import multiprocessing as mp
import subprocess
import os
import re

from .helper import make_path_safe, thirdparty_binary

from .textgrid import ctm_to_textgrid, parse_ctm

from .config import *

from .exceptions import CorpusError

def mfcc_func(mfcc_directory, log_directory, job_name, mfcc_config_path):  # pragma: no cover
    """
    Compute MFCC features for a single job with the Kaldi command-line tools

    ``compute-mfcc-feats`` is piped into ``copy-feats``, which writes
    ``raw_mfcc.<job>.ark`` and ``raw_mfcc.<job>.scp`` into ``mfcc_directory``.
    When ``segments.<job>`` exists in ``log_directory``, the audio is first
    passed through ``extract-segments``; otherwise the features are computed
    directly from ``wav.<job>.scp``.

    Parameters
    ----------
    mfcc_directory : str
        Directory to save MFCC feature matrices
    log_directory : str
        Directory holding ``wav.<job>.scp`` (and optionally ``segments.<job>``);
        the ``make_mfcc.<job>.log`` file is written here as well
    job_name : int
        Identifier of the job, used in every file name
    mfcc_config_path : str
        Path passed to ``compute-mfcc-feats`` via ``--config``
    """
    raw_mfcc_path = os.path.join(mfcc_directory, 'raw_mfcc.{}.ark'.format(job_name))
    raw_scp_path = os.path.join(mfcc_directory, 'raw_mfcc.{}.scp'.format(job_name))
    log_path = os.path.join(log_directory, 'make_mfcc.{}.log'.format(job_name))
    segment_path = os.path.join(log_directory, 'segments.{}'.format(job_name))
    scp_path = os.path.join(log_directory, 'wav.{}.scp'.format(job_name))

    with open(log_path, 'w', encoding='utf8') as f:
        seg_proc = None
        if os.path.exists(segment_path):
            seg_proc = subprocess.Popen([thirdparty_binary('extract-segments'),
                                         'scp,p:' + scp_path, segment_path, 'ark:-'],
                                        stdout=subprocess.PIPE, stderr=f)
            comp_proc = subprocess.Popen([thirdparty_binary('compute-mfcc-feats'), '--verbose=2',
                                          '--config=' + mfcc_config_path,
                                          'ark:-', 'ark:-'],
                                         stdout=subprocess.PIPE, stderr=f, stdin=seg_proc.stdout)
            # Drop the parent's copy of the pipe so the upstream process
            # receives SIGPIPE if the downstream one exits early.
            seg_proc.stdout.close()
        else:
            comp_proc = subprocess.Popen([thirdparty_binary('compute-mfcc-feats'), '--verbose=2',
                                          '--config=' + mfcc_config_path,
                                          'scp,p:' + scp_path, 'ark:-'],
                                         stdout=subprocess.PIPE, stderr=f)
        copy_proc = subprocess.Popen([thirdparty_binary('copy-feats'),
                                      '--compress=true', 'ark:-',
                                      'ark,scp:{},{}'.format(raw_mfcc_path, raw_scp_path)],
                                     stdin=comp_proc.stdout, stderr=f)
        comp_proc.stdout.close()
        # Wait for the whole pipeline before the log file is closed and the
        # caller assumes the feature files are complete.
        copy_proc.communicate()
        comp_proc.wait()
        if seg_proc is not None:
            seg_proc.wait()

def mfcc(mfcc_directory, log_directory, num_jobs, mfcc_configs):
    """
    Multiprocessing function that converts wav files into MFCCs

    One :func:`mfcc_func` worker is started per job; each worker runs the
    Kaldi ``compute-mfcc-feats`` / ``copy-feats`` pipeline.

    Parameters
    ----------
    mfcc_directory : str
        Directory to save MFCC feature matrices
    log_directory : str
        Directory to store log files
    num_jobs : int
        The number of processes to use in calculation
    mfcc_configs : list of :class:`~aligner.config.MfccConfig`
        Configuration object for generating MFCCs

    Raises
    ------
    CorpusError
        If the files per speaker exceeds the number of files that are
        allowed to be open on the computer (for Unix-based systems)
    """
    jobs = [(mfcc_directory, log_directory, x, mfcc_configs[x].path)
            for x in range(num_jobs)]
    with mp.Pool(processes=num_jobs) as pool:
        try:
            results = [pool.apply_async(mfcc_func, args=i) for i in jobs]
            for p in results:
                p.get()
        except OSError as e:
            # errno 24 is EMFILE ("Too many open files"); anything else is
            # re-raised untouched.
            if e.errno != 24:
                raise
            raise CorpusError(
                'There were too many files per speaker to process based on your OS settings.  Please try to split your data into more speakers.') from e
def acc_stats_func(directory, iteration, job_name, feat_path):  # pragma: no cover
    """
    Accumulate GMM statistics for one job by running ``gmm-acc-stats-ali``

    Parameters
    ----------
    directory : str
        Training directory containing ``<iteration>.mdl`` and ``ali.<job>``
    iteration : int
        Iteration whose model is used
    job_name : int
        Identifier of the job
    feat_path : str
        Path to the feature archive for this job

    The accumulator is written to ``<iteration>.<job>.acc`` in ``directory``
    and stderr of the binary goes to ``log/acc.<iteration>.<job>.log``.
    """
    log_path = os.path.join(directory, 'log', 'acc.{}.{}.log'.format(iteration, job_name))
    model_path = os.path.join(directory, '{}.mdl'.format(iteration))
    acc_path = os.path.join(directory, '{}.{}.acc'.format(iteration, job_name))
    ali_path = os.path.join(directory, 'ali.{}'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf:
        acc_proc = subprocess.Popen([thirdparty_binary('gmm-acc-stats-ali'), model_path,
                                     "ark:" + feat_path, "ark,t:" + ali_path, acc_path],
                                    stderr=logf)
        acc_proc.communicate()
def acc_stats(iteration, directory, split_directory, num_jobs, fmllr=False):
    """
    Multiprocessing function that computes stats for GMM training

    Dispatches one :func:`acc_stats_func` worker per job, each of which runs
    the Kaldi ``gmm-acc-stats-ali`` binary.

    Parameters
    ----------
    iteration : int
        Iteration to calculate stats for
    directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    split_directory : str
        Directory of training data split into the number of jobs
    num_jobs : int
        The number of processes to use in calculation
    fmllr : bool, optional
        Whether the current training session is using fMLLR
        (speaker-adaptation), defaults to False
    """
    if fmllr:
        feat_template = 'cmvndeltafeats_fmllr.{}'
    else:
        feat_template = 'cmvndeltafeats.{}'

    job_args = []
    for job in range(num_jobs):
        feat_path = os.path.join(split_directory, feat_template.format(job))
        job_args.append((directory, iteration, job, feat_path))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(acc_stats_func, args=args) for args in job_args]
        # Block until every worker is done; .get() re-raises worker errors.
        for result in pending:
            result.get()
def parse_transitions(path, phones_path):
    """
    Convert ``show-transitions`` text output into a symbol table

    Every ``Transition-id`` line is written to ``phones_path`` as
    ``<phone>_<n> <transition-id>``, where ``<n>`` counts the transition ids
    seen for the current phone and restarts at 0 whenever the phone named on a
    ``Transition-state`` line changes. The first line written is ``<eps> 0``.

    Parameters
    ----------
    path : str
        Path of the text file to parse
    phones_path : str
        Path of the symbol table to write

    Raises
    ------
    ValueError
        If a ``Transition-state`` line does not have the expected format
    """
    state_extract_pattern = re.compile(r'Transition-state (\d+): phone = (\w+)')
    id_extract_pattern = re.compile(r'Transition-id = (\d+)')
    cur_phone = None
    current = 0
    with open(path, encoding='utf8') as f, open(phones_path, 'w', encoding='utf8') as outf:
        outf.write('{} {}\n'.format('<eps>', 0))
        for line in f:
            line = line.strip()
            if line.startswith('Transition-state'):
                m = state_extract_pattern.match(line)
                if m is None:
                    raise ValueError('Could not parse transition state line: {!r}'.format(line))
                phone = m.group(2)
                if phone != cur_phone:
                    current = 0
                    cur_phone = phone
            else:
                m = id_extract_pattern.match(line)
                # Blank lines and anything that is not a transition id (or that
                # precedes the first state) carry no symbol and are skipped.
                if m is None or cur_phone is None:
                    continue
                outf.write('{}_{} {}\n'.format(cur_phone, current, m.group(1)))
                current += 1


def compile_train_graphs_func(directory, lang_directory, split_directory, job_name, debug=False):  # pragma: no cover
    """
    Compile training graphs for one job by running ``compile-train-graphs``

    The integer transcripts ``text.<job>.int`` from ``split_directory`` are fed
    to the binary on stdin and the resulting archive is written to
    ``fsts.<job>`` in ``directory``.

    Parameters
    ----------
    directory : str
        Training directory containing ``tree`` and ``0.mdl``
    lang_directory : str
        Directory containing ``L.fst``, ``phones.txt``, ``words.txt`` and the
        ``phones`` sub-directory
    split_directory : str
        Directory of training data split into the number of jobs
    job_name : int
        Identifier of the job
    debug : bool, optional
        If True, additionally dump the model transitions and draw every
        utterance FST with ``fstdraw`` (and ``dot`` when it is available),
        defaults to False
    """
    fst_path = os.path.join(directory, 'fsts.{}'.format(job_name))
    tree_path = os.path.join(directory, 'tree')
    mdl_path = os.path.join(directory, '0.mdl')
    log_path = os.path.join(directory, 'log', 'show_transition.log')
    transition_path = os.path.join(directory, 'transitions.txt')
    phones_file_path = os.path.join(lang_directory, 'phones.txt')
    triphones_file_path = os.path.join(directory, 'triphones.txt')
    if debug:
        with open(log_path, 'w', encoding='utf8') as logf:
            with open(transition_path, 'w', encoding='utf8') as f:
                subprocess.call([thirdparty_binary('show-transitions'), phones_file_path, mdl_path],
                                stdout=f, stderr=logf)
            parse_transitions(transition_path, triphones_file_path)
    log_path = os.path.join(directory, 'log', 'compile-graphs.0.{}.log'.format(job_name))

    # Prefer the transition-level symbol table when a debug run produced one.
    if os.path.exists(triphones_file_path):
        phones_file_path = triphones_file_path
    words_file_path = os.path.join(lang_directory, 'words.txt')

    # NOTE(review): the file name was empty in the source, which made the
    # option point at a directory; 'disambig.int' is the conventional Kaldi
    # lang-directory name -- confirm against the dictionary writer.
    disambig_path = os.path.join(lang_directory, 'phones', 'disambig.int')

    with open(os.path.join(split_directory, 'text.{}.int'.format(job_name)), 'r') as inf, \
            open(fst_path, 'wb') as outf, \
            open(log_path, 'w', encoding='utf8') as logf:
        proc = subprocess.Popen([thirdparty_binary('compile-train-graphs'),
                                 '--read-disambig-syms={}'.format(disambig_path),
                                 tree_path, mdl_path,
                                 os.path.join(lang_directory, 'L.fst'),
                                 "ark:-", "ark:-"],
                                stdin=inf, stdout=outf, stderr=logf)
        proc.communicate()

    if debug:
        with open(log_path, 'a', encoding='utf8') as logf:
            fst_ark_path = os.path.join(directory, 'fsts.{}.ark'.format(job_name))
            fst_scp_path = os.path.join(directory, 'fsts.{}.scp'.format(job_name))
            proc = subprocess.Popen([thirdparty_binary('fstcopy'),
                                     'ark:{}'.format(fst_path),
                                     'ark,scp:{},{}'.format(fst_ark_path, fst_scp_path)],
                                    stderr=logf)
            proc.communicate()

            temp_fst_path = os.path.join(directory, 'temp.fst.{}'.format(job_name))

            with open(fst_scp_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    utt = line.split()[0]
                    print(utt)
                    dot_path = os.path.join(directory, '{}.dot'.format(utt))
                    # Extract the single utterance FST into a temporary file.
                    fst_proc = subprocess.Popen([thirdparty_binary('fstcopy'),
                                                 'scp:-',
                                                 'scp:echo {} {}|'.format(utt, temp_fst_path)],
                                                stdin=subprocess.PIPE, stderr=logf)
                    fst_proc.communicate(input=line.encode())

                    draw_proc = subprocess.Popen([thirdparty_binary('fstdraw'), '--portrait=true',
                                                  '--isymbols={}'.format(phones_file_path),
                                                  '--osymbols={}'.format(words_file_path),
                                                  temp_fst_path, dot_path],
                                                 stderr=logf)
                    draw_proc.communicate()
                    try:
                        dot_proc = subprocess.Popen([thirdparty_binary('dot'), '-Tpdf', '-O', dot_path],
                                                    stderr=logf)
                        dot_proc.communicate()
                    except FileNotFoundError:
                        # Rendering is best-effort: the .dot file is kept even
                        # when the ``dot`` executable is not installed.
                        pass
def compile_train_graphs(directory, lang_directory, split_directory, num_jobs, debug=False):
    """
    Multiprocessing function that compiles training graphs for utterances

    Creates the ``log`` sub-directory of ``directory`` if needed and then runs
    one :func:`compile_train_graphs_func` worker per job.

    Parameters
    ----------
    directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    lang_directory : str
        Directory of the language model used
    split_directory : str
        Directory of training data split into the number of jobs
    num_jobs : int
        The number of processes to use
    debug : bool, optional
        Forwarded to every worker, defaults to False
    """
    log_directory = os.path.join(directory, 'log')
    os.makedirs(log_directory, exist_ok=True)

    job_args = []
    for job in range(num_jobs):
        job_args.append((directory, lang_directory, split_directory, job, debug))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(compile_train_graphs_func, args=args) for args in job_args]
        # Block until every worker is done; .get() re-raises worker errors.
        for result in pending:
            result.get()
def mono_align_equal_func(mono_directory, split_directory, job_name, feat_path):  # pragma: no cover
    """
    Accumulate initial monophone statistics from equally spaced alignments

    ``align-equal-compiled`` is piped into ``gmm-acc-stats-ali`` and the
    resulting accumulator is written to ``0.<job>.acc`` in ``mono_directory``.

    Parameters
    ----------
    mono_directory : str
        Directory of monophone training, containing ``fsts.<job>`` and ``0.mdl``
    split_directory : str
        Directory of training data split into the number of jobs (not read
        here; kept for interface compatibility)
    job_name : int
        Identifier of the job
    feat_path : str
        Path to the feature archive for this job
    """
    fst_path = os.path.join(mono_directory, 'fsts.{}'.format(job_name))
    mdl_path = os.path.join(mono_directory, '0.mdl')
    log_path = os.path.join(mono_directory, 'log', 'align.0.{}.log'.format(job_name))
    ali_path = os.path.join(mono_directory, '0.{}.acc'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf, \
            open(ali_path, 'wb') as outf:
        align_proc = subprocess.Popen([thirdparty_binary('align-equal-compiled'), "ark:" + fst_path,
                                       'ark:' + feat_path, 'ark,t:-'],
                                      stdout=subprocess.PIPE, stderr=logf)
        stats_proc = subprocess.Popen([thirdparty_binary('gmm-acc-stats-ali'), '--binary=true',
                                       mdl_path, 'ark:' + feat_path, 'ark:-', '-'],
                                      stdin=align_proc.stdout, stderr=logf, stdout=outf)
        # Drop the parent's copy of the pipe so the upstream process receives
        # SIGPIPE if the downstream one exits early.
        align_proc.stdout.close()
        stats_proc.communicate()
        align_proc.wait()
def mono_align_equal(mono_directory, split_directory, num_jobs):
    """
    Multiprocessing function that creates equal alignments for base monophone training

    Runs one :func:`mono_align_equal_func` worker per job on the
    ``cmvndeltafeats.<job>`` features found in ``split_directory``.

    Parameters
    ----------
    mono_directory : str
        Directory of monophone training
    split_directory : str
        Directory of training data split into the number of jobs
    num_jobs : int
        The number of processes to use
    """
    job_args = []
    for job in range(num_jobs):
        feat_path = os.path.join(split_directory, 'cmvndeltafeats.{}'.format(job))
        job_args.append((mono_directory, split_directory, job, feat_path))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(mono_align_equal_func, args=args) for args in job_args]
        # Block until every worker is done; .get() re-raises worker errors.
        for result in pending:
            result.get()
def compile_utterance_train_graphs_func(directory, lang_directory, split_directory, job_name, debug=False):  # pragma: no cover
    """
    Compile per-utterance decoding graphs by running ``compile-train-graphs-fsts``

    The text FSTs in ``utt2fst.<job>`` are streamed to the binary on stdin and
    the compiled graphs are written to ``utterance_graphs.<job>.fst``.

    Parameters
    ----------
    directory : str
        Model directory containing ``tree`` and ``final.mdl``
    lang_directory : str
        Directory containing ``L_disambig.fst`` and the ``phones`` sub-directory
    split_directory : str
        Directory of data split into the number of jobs
    job_name : int
        Identifier of the job
    debug : bool, optional
        Unused; kept for interface compatibility
    """
    # NOTE(review): the file name was empty in the source, which made the
    # option point at a directory; 'disambig.int' is the conventional Kaldi
    # lang-directory name -- confirm against the dictionary writer.
    disambig_int_path = os.path.join(lang_directory, 'phones', 'disambig.int')
    tree_path = os.path.join(directory, 'tree')
    mdl_path = os.path.join(directory, 'final.mdl')
    lexicon_fst_path = os.path.join(lang_directory, 'L_disambig.fst')
    fsts_path = os.path.join(split_directory, 'utt2fst.{}'.format(job_name))
    graphs_path = os.path.join(directory, 'utterance_graphs.{}.fst'.format(job_name))

    log_path = os.path.join(directory, 'log', 'compile-graphs-fst.0.{}.log'.format(job_name))

    with open(log_path, 'w', encoding='utf8') as logf, open(fsts_path, 'r', encoding='utf8') as f:
        proc = subprocess.Popen([thirdparty_binary('compile-train-graphs-fsts'),
                                 '--transition-scale=1.0', '--self-loop-scale=0.1',
                                 '--read-disambig-syms={}'.format(disambig_int_path),
                                 tree_path, mdl_path,
                                 lexicon_fst_path,
                                 "ark:-", "ark:" + graphs_path],
                                stdin=subprocess.PIPE, stderr=logf)
        # Forward the input one blank-line-terminated group at a time.
        group = []
        for line in f:
            group.append(line)
            if line.strip() == '':
                proc.stdin.write(''.join(group).encode('utf8'))
                group = []
                proc.stdin.flush()
        # Forward trailing lines that are not followed by a blank line instead
        # of silently dropping them.
        if group:
            proc.stdin.write(''.join(group).encode('utf8'))
            proc.stdin.flush()

        proc.communicate()


def test_utterances_func(directory, lang_directory, split_directory, job_name):  # pragma: no cover
    """
    Decode the utterances of one job and compare them with their transcripts

    Runs ``gmm-latgen-faster`` on the utterance graphs of the job and then
    ``lattice-oracle`` against ``text.<job>.int``, which writes
    ``aligned.<job>.int`` and ``edits.<job>.txt`` into ``directory``.

    Parameters
    ----------
    directory : str
        Model directory containing ``final.mdl`` and ``utterance_graphs.<job>.fst``
    lang_directory : str
        Directory containing ``words.txt``
    split_directory : str
        Directory of data split into the number of jobs
    job_name : int
        Identifier of the job
    """
    log_path = os.path.join(directory, 'log', 'decode.0.{}.log'.format(job_name))
    words_path = os.path.join(lang_directory, 'words.txt')
    mdl_path = os.path.join(directory, 'final.mdl')
    feat_path = os.path.join(split_directory, 'cmvndeltafeats.{}'.format(job_name))
    graphs_path = os.path.join(directory, 'utterance_graphs.{}.fst'.format(job_name))

    text_int_path = os.path.join(split_directory, 'text.{}.int'.format(job_name))
    edits_path = os.path.join(directory, 'edits.{}.txt'.format(job_name))
    out_int_path = os.path.join(directory, 'aligned.{}.int'.format(job_name))
    acoustic_scale = 0.1
    beam = 15.0
    lattice_beam = 8.0
    max_active = 750
    lat_path = os.path.join(directory, 'lat.{}'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf:
        latgen_proc = subprocess.Popen([thirdparty_binary('gmm-latgen-faster'),
                                        '--acoustic-scale={}'.format(acoustic_scale),
                                        '--beam={}'.format(beam),
                                        '--max-active={}'.format(max_active),
                                        '--lattice-beam={}'.format(lattice_beam),
                                        '--word-symbol-table=' + words_path,
                                        mdl_path, 'ark:' + graphs_path, 'ark:' + feat_path,
                                        'ark:' + lat_path],
                                       stderr=logf)
        latgen_proc.communicate()

        oracle_proc = subprocess.Popen([thirdparty_binary('lattice-oracle'),
                                        'ark:' + lat_path, 'ark,t:' + text_int_path,
                                        'ark,t:' + out_int_path, 'ark,t:' + edits_path],
                                       stderr=logf)
        oracle_proc.communicate()


def test_utterances(aligner):
    """
    Decode every utterance and report those that disagree with their transcript

    Utterance graphs are compiled and decoded in parallel, the decoded word
    ids are mapped back to words and written to ``aligned.<job>`` in the model
    directory, and each decoded utterance is compared with its reference text.
    Utterances with at least one insertion or deletion are listed in
    ``transcription_problems.txt`` in the aligner's output directory, sorted
    by decreasing number of insertions plus deletions.

    Parameters
    ----------
    aligner
        Object providing ``corpus``, ``dictionary``, ``tri_directory``,
        ``num_jobs`` and ``output_directory``

    Returns
    -------
    bool
        True if no utterance had a transcription issue, False otherwise
    """
    print('Checking utterance transcriptions...')

    from alignment.sequence import Sequence
    from alignment.vocabulary import Vocabulary
    from alignment.sequencealigner import SimpleScoring, GlobalSequenceAligner

    from .corpus import load_scp

    split_directory = aligner.corpus.split_directory
    model_directory = aligner.tri_directory
    lang_directory = aligner.dictionary.output_directory
    with mp.Pool(processes=aligner.num_jobs) as pool:
        jobs = [(model_directory, lang_directory, split_directory, x)
                for x in range(aligner.num_jobs)]
        results = [pool.apply_async(compile_utterance_train_graphs_func, args=i) for i in jobs]
        for p in results:
            p.get()
        print('Utterance FSTs compiled!')
        print('Decoding utterances (this will take some time)...')
        results = [pool.apply_async(test_utterances_func, args=i) for i in jobs]
        for p in results:
            p.get()
        print('Finished decoding utterances!')

    word_mapping = aligner.dictionary.reversed_word_mapping
    v = Vocabulary()
    errors = {}

    for job in range(aligner.num_jobs):
        text_path = os.path.join(split_directory, 'text.{}'.format(job))
        texts = load_scp(text_path)
        aligned_int = load_scp(os.path.join(model_directory, 'aligned.{}.int'.format(job)))
        with open(os.path.join(model_directory, 'aligned.{}'.format(job)), 'w', encoding='utf8') as outf:
            for utt, line in sorted(aligned_int.items()):
                text = [word_mapping[int(t)] for t in line]
                outf.write('{} {}\n'.format(utt, ' '.join(text)))
                ref_text = texts[utt]
                if len(text) < len(ref_text) - 7:
                    # Decoded output is much shorter than the reference: fall
                    # back to a membership comparison instead of aligning.
                    insertions = [x for x in text if x not in ref_text]
                    deletions = [x for x in ref_text if x not in text]
                else:
                    aligned_seq = Sequence(text)
                    ref_seq = Sequence(ref_text)

                    alignedEncoded = v.encodeSequence(aligned_seq)
                    refEncoded = v.encodeSequence(ref_seq)
                    scoring = SimpleScoring(2, -1)
                    a = GlobalSequenceAligner(scoring, -2)
                    score, encodeds = a.align(refEncoded, alignedEncoded, backtrace=True)
                    insertions = []
                    deletions = []
                    for encoded in encodeds:
                        alignment = v.decodeSequenceAlignment(encoded)
                        for i, ref_item in enumerate(alignment.first):
                            decoded_item = alignment.second[i]
                            # '-' marks a gap on that side of the alignment.
                            if ref_item == '-':
                                insertions.append(decoded_item)
                            if decoded_item == '-':
                                deletions.append(ref_item)
                if insertions or deletions:
                    errors[utt] = (insertions, deletions, ref_text, text)
    if not errors:
        print('There were no utterances with transcription issues.')
        return True
    out_path = os.path.join(aligner.output_directory, 'transcription_problems.txt')
    with open(out_path, 'w', encoding='utf8') as problemf:
        problemf.write('Utterance\tInsertions\tDeletions\tReference\tDecoded\n')
        # Worst utterances first: most insertions plus deletions.
        for utt, (insertions, deletions, ref_text, text) in sorted(
                errors.items(), key=lambda x: -1 * (len(x[1][0]) + len(x[1][1]))):
            problemf.write('{}\t{}\t{}\t{}\t{}\n'.format(utt, ', '.join(insertions), ', '.join(deletions),
                                                         ' '.join(ref_text), ' '.join(text)))
    print(
        'There were {} of {} utterances with at least one transcription issue. Please see the outputted csv file {}.'.format(
            len(errors), aligner.corpus.num_utterances, out_path))
    return False


def align_func(directory, iteration, job_name, mdl, config, feat_path):  # pragma: no cover
    """
    Align the utterances of one job by running ``gmm-align-compiled``

    Parameters
    ----------
    directory : str
        Training directory containing ``fsts.<job>``
    iteration : int
        Iteration being aligned, used in the log file name
    job_name : int
        Identifier of the job
    mdl : str
        Model argument handed to the binary as-is
    config
        Configuration object providing ``scale_opts`` and ``beam``; the retry
        beam is four times ``beam``
    feat_path : str
        Path to the feature archive for this job

    The alignments are written to ``ali.<job>`` in ``directory``.
    """
    fst_path = os.path.join(directory, 'fsts.{}'.format(job_name))
    log_path = os.path.join(directory, 'log', 'align.{}.{}.log'.format(iteration, job_name))
    ali_path = os.path.join(directory, 'ali.{}'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf, \
            open(ali_path, 'wb') as outf:
        align_proc = subprocess.Popen([thirdparty_binary('gmm-align-compiled')] + config.scale_opts +
                                      ['--beam={}'.format(config.beam),
                                       '--retry-beam={}'.format(config.beam * 4),
                                       '--careful=false',
                                       mdl,
                                       "ark:" + fst_path, "ark:" + feat_path, "ark:-"],
                                      stderr=logf,
                                      stdout=outf)
        align_proc.communicate()
def align(iteration, directory, split_directory, optional_silence, num_jobs, config):
    """
    Multiprocessing function that aligns based on the current model

    The model of the given iteration is passed through ``gmm-boost-silence``
    (as a piped model specifier) and one :func:`align_func` worker per job
    then runs ``gmm-align-compiled`` with it.

    Parameters
    ----------
    iteration : int
        Iteration to align
    directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    split_directory : str
        Directory of training data split into the number of jobs
    optional_silence : str
        Colon-separated list of silence phones to boost
    num_jobs : int
        The number of processes to use in calculation
    config : :class:`~aligner.config.MonophoneConfig`, :class:`~aligner.config.TriphoneConfig` or :class:`~aligner.config.TriphoneFmllrConfig`
        Configuration object for training
    """
    mdl_path = os.path.join(directory, '{}.mdl'.format(iteration))
    mdl = "{} --boost={} {} {} - |".format(thirdparty_binary('gmm-boost-silence'),
                                           config.boost_silence, optional_silence,
                                           make_path_safe(mdl_path))

    if config.do_fmllr:
        feat_template = 'cmvndeltafeats_fmllr.{}'
    else:
        feat_template = 'cmvndeltafeats.{}'

    job_args = []
    for job in range(num_jobs):
        feat_path = os.path.join(split_directory, feat_template.format(job))
        job_args.append((directory, iteration, job, mdl, config, feat_path))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(align_func, args=args) for args in job_args]
        # Block until every worker is done; .get() re-raises worker errors.
        for result in pending:
            result.get()
def ali_to_textgrid_func(output_directory, model_directory, dictionary, corpus, job_name):  # pragma: no cover
    """
    Produce word and phone CTM files for the alignments of one job

    ``linear-to-nbest`` is piped into ``lattice-align-words`` to create
    ``aligned.<job>``; ``nbest-to-ctm`` then writes ``word_ctm.<job>``, and
    ``lattice-to-phone-lattice`` piped into ``nbest-to-ctm`` writes
    ``phone_ctm.<job>``. All files are placed in ``model_directory``.

    Parameters
    ----------
    output_directory : str
        Unused here; kept for interface compatibility
    model_directory : str
        Directory containing ``ali.<job>`` and ``final.mdl``
    dictionary
        Object providing ``phones_dir``
    corpus
        Object providing ``split_directory`` and ``mfcc_configs``; the
        ``frame-shift`` entry of the first MFCC config is divided by 1000
        before being passed to ``nbest-to-ctm``
    job_name : int
        Identifier of the job
    """
    text_int_path = os.path.join(corpus.split_directory, 'text.{}.int'.format(job_name))
    log_path = os.path.join(model_directory, 'log', 'get_ctm_align.{}.log'.format(job_name))
    ali_path = os.path.join(model_directory, 'ali.{}'.format(job_name))
    model_path = os.path.join(model_directory, 'final.mdl')
    aligned_path = os.path.join(model_directory, 'aligned.{}'.format(job_name))
    word_ctm_path = os.path.join(model_directory, 'word_ctm.{}'.format(job_name))
    phone_ctm_path = os.path.join(model_directory, 'phone_ctm.{}'.format(job_name))
    # NOTE(review): the file name was empty in the source, which made the
    # argument point at a directory; 'word_boundary.int' is the conventional
    # Kaldi name for this file -- confirm against the dictionary writer.
    word_boundary_path = os.path.join(dictionary.phones_dir, 'word_boundary.int')

    frame_shift = corpus.mfcc_configs[0].config_dict['frame-shift'] / 1000
    with open(log_path, 'w', encoding='utf8') as logf:
        lin_proc = subprocess.Popen([thirdparty_binary('linear-to-nbest'), "ark:" + ali_path,
                                     "ark:" + text_int_path,
                                     '', '', 'ark:-'],
                                    stdout=subprocess.PIPE, stderr=logf)
        align_proc = subprocess.Popen([thirdparty_binary('lattice-align-words'),
                                       word_boundary_path, model_path,
                                       'ark:-', 'ark:' + aligned_path],
                                      stdin=lin_proc.stdout, stderr=logf)
        # Drop the parent's copy of the pipe so the upstream process receives
        # SIGPIPE if the downstream one exits early.
        lin_proc.stdout.close()
        align_proc.communicate()
        lin_proc.wait()

        subprocess.call([thirdparty_binary('nbest-to-ctm'),
                         '--frame-shift={}'.format(frame_shift),
                         'ark:' + aligned_path,
                         word_ctm_path],
                        stderr=logf)
        phone_proc = subprocess.Popen([thirdparty_binary('lattice-to-phone-lattice'), model_path,
                                       'ark:' + aligned_path, "ark:-"],
                                      stdout=subprocess.PIPE, stderr=logf)
        nbest_proc = subprocess.Popen([thirdparty_binary('nbest-to-ctm'),
                                       '--frame-shift={}'.format(frame_shift),
                                       "ark:-", phone_ctm_path],
                                      stdin=phone_proc.stdout, stderr=logf)
        phone_proc.stdout.close()
        nbest_proc.communicate()
        phone_proc.wait()
def convert_ali_to_textgrids(output_directory, model_directory, dictionary, corpus, num_jobs):
    """
    Multiprocessing function that converts alignments into TextGrid files

    One :func:`ali_to_textgrid_func` worker per job produces word and phone
    CTM files; these are then parsed, merged across jobs and handed to
    ``ctm_to_textgrid``. Jobs without a ``word_ctm.<job>`` file are skipped.

    Parameters
    ----------
    output_directory : str
        Directory to write TextGrid files to
    model_directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    dictionary : :class:`~aligner.dictionary.Dictionary`
        Dictionary object that has information about pronunciations
    corpus : :class:`~aligner.corpus.Corpus`
        Corpus object that has information about the dataset
    num_jobs : int
        The number of processes to use in calculation

    Raises
    ------
    CorpusError
        If the files per speaker exceeds the number of files that are
        allowed to be open on the computer (for Unix-based systems)
    """
    jobs = [(output_directory, model_directory, dictionary, corpus, x)
            for x in range(num_jobs)]

    with mp.Pool(processes=num_jobs) as pool:
        try:
            results = [pool.apply_async(ali_to_textgrid_func, args=i) for i in jobs]
            for p in results:
                p.get()
        except OSError as e:
            # errno 24 is EMFILE ("Too many open files"); anything else is
            # re-raised untouched.
            if e.errno != 24:
                raise
            raise CorpusError(
                'There were too many files per speaker to process based on your OS settings.  Please try to split your data into more speakers.') from e

    word_ctm = {}
    phone_ctm = {}
    for i in range(num_jobs):
        word_ctm_path = os.path.join(model_directory, 'word_ctm.{}'.format(i))
        phone_ctm_path = os.path.join(model_directory, 'phone_ctm.{}'.format(i))
        if not os.path.exists(word_ctm_path):
            continue
        for mode, ctm_path, merged in (('word', word_ctm_path, word_ctm),
                                       ('phone', phone_ctm_path, phone_ctm)):
            parsed = parse_ctm(ctm_path, corpus, dictionary, mode=mode)
            for k, v in parsed.items():
                if k not in merged:
                    merged[k] = v
                else:
                    merged[k].update(v)
    ctm_to_textgrid(word_ctm, phone_ctm, output_directory, corpus, dictionary)
def tree_stats_func(directory, ci_phones, mdl, feat_path, ali_path, job_name):  # pragma: no cover
    """
    Accumulate decision-tree statistics for one job by running ``acc-tree-stats``

    Parameters
    ----------
    directory : str
        Training directory; the statistics are written to ``<job>.treeacc``
        there and the log to ``log/acc_tree.<job>.log``
    ci_phones : str
        Value passed to the binary via ``--ci-phones``
    mdl : str
        Path of the model to use
    feat_path : str
        Path to the feature archive for this job
    ali_path : str
        Path to the alignment archive for this job
    job_name : int
        Identifier of the job
    """
    context_opts = []
    log_path = os.path.join(directory, 'log', 'acc_tree.{}.log'.format(job_name))

    treeacc_path = os.path.join(directory, '{}.treeacc'.format(job_name))

    with open(log_path, 'w', encoding='utf8') as logf:
        subprocess.call([thirdparty_binary('acc-tree-stats')] + context_opts +
                        ['--ci-phones=' + ci_phones, mdl, "ark:" + feat_path,
                         "ark:" + ali_path,
                         treeacc_path],
                        stderr=logf)
def tree_stats(directory, align_directory, split_directory, ci_phones, num_jobs, fmllr=False):
    """
    Multiprocessing function that computes stats for decision tree training

    One :func:`tree_stats_func` worker per job runs ``acc-tree-stats``; the
    per-job results are then combined into ``treeacc`` with
    ``sum-tree-stats`` and the per-job files are deleted.

    Parameters
    ----------
    directory : str
        Directory of training (triphone, speaker-adapted triphone training
        directories)
    align_directory : str
        Directory of previous alignment
    split_directory : str
        Directory of training data split into the number of jobs
    ci_phones : str
        Colon-separated list of context-independent phones
    num_jobs : int
        The number of processes to use in calculation
    fmllr : bool, optional
        Whether the current training session is using fMLLR
        (speaker-adaptation), defaults to False
    """
    feat_name = 'cmvndeltafeats'
    if fmllr:
        feat_name += '_fmllr'
    feat_name += '.{}'
    mdl_path = os.path.join(align_directory, 'final.mdl')

    jobs = [(directory, ci_phones, mdl_path,
             os.path.join(split_directory, feat_name.format(x)),
             os.path.join(align_directory, 'ali.{}'.format(x)), x)
            for x in range(num_jobs)]

    with mp.Pool(processes=num_jobs) as pool:
        results = [pool.apply_async(tree_stats_func, args=i) for i in jobs]
        for p in results:
            p.get()

    tree_accs = [os.path.join(directory, '{}.treeacc'.format(x)) for x in range(num_jobs)]
    log_path = os.path.join(directory, 'log', 'sum_tree_acc.log')
    with open(log_path, 'w', encoding='utf8') as logf:
        subprocess.call([thirdparty_binary('sum-tree-stats'),
                         os.path.join(directory, 'treeacc')] + tree_accs,
                        stderr=logf)
    # The per-job accumulators are no longer needed once summed.
    for f in tree_accs:
        os.remove(f)
def convert_alignments_func(directory, align_directory, job_name):  # pragma: no cover
    """
    Convert the alignments of one job to a new model by running ``convert-ali``

    Parameters
    ----------
    directory : str
        Training directory containing ``1.mdl`` and ``tree``; the converted
        alignments are written to ``ali.<job>`` here
    align_directory : str
        Directory of the previous alignment, containing ``final.mdl`` and
        ``ali.<job>``
    job_name : int
        Identifier of the job
    """
    mdl_path = os.path.join(directory, '1.mdl')
    tree_path = os.path.join(directory, 'tree')
    ali_mdl_path = os.path.join(align_directory, 'final.mdl')
    ali_path = os.path.join(align_directory, 'ali.{}'.format(job_name))
    new_ali_path = os.path.join(directory, 'ali.{}'.format(job_name))

    log_path = os.path.join(directory, 'log', 'convert.{}.log'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf:
        subprocess.call([thirdparty_binary('convert-ali'), ali_mdl_path,
                         mdl_path, tree_path, "ark:" + ali_path,
                         "ark:" + new_ali_path],
                        stderr=logf)
def convert_alignments(directory, align_directory, num_jobs):
    """
    Multiprocessing function that converts alignments from previous training

    Runs one :func:`convert_alignments_func` worker per job, each of which
    calls the Kaldi ``convert-ali`` binary.

    Parameters
    ----------
    directory : str
        Directory of training (triphone, speaker-adapted triphone training
        directories)
    align_directory : str
        Directory of previous alignment
    num_jobs : int
        The number of processes to use in calculation
    """
    job_args = []
    for job in range(num_jobs):
        job_args.append((directory, align_directory, job))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(convert_alignments_func, args=args) for args in job_args]
        # Block until every worker is done; .get() re-raises worker errors.
        for result in pending:
            result.get()
def calc_fmllr_func(directory, split_directory, sil_phones, job_name, config, initial,
                    model_name='final'):  # pragma: no cover
    """
    Estimate fMLLR transforms for one job and apply them to its features

    Runs, in order, ``ali-to-post``, ``weight-silence-post`` and
    ``gmm-est-fmllr``. On the initial pass the estimated transforms become
    ``trans.<job>``; on later passes they are composed with the existing
    ``trans.<job>`` via ``compose-transforms`` and replace it. Finally
    ``transform-feats`` applies ``trans.<job>`` to ``cmvndeltafeats.<job>``
    and writes ``cmvndeltafeats_fmllr.<job>``.

    Parameters
    ----------
    directory : str
        Training directory containing ``ali.<job>`` and ``<model_name>.mdl``
    split_directory : str
        Directory of training data split into the number of jobs
    sil_phones : str
        Silence phones passed to ``weight-silence-post``
    job_name : int
        Identifier of the job
    config
        Configuration object providing ``fmllr_update_type``
    initial : bool
        Whether this is the first estimation; if not, the estimation uses the
        already transformed features
    model_name : str or int, optional
        Name of the model file without extension, defaults to ``'final'``
    """
    feat_path = os.path.join(split_directory, 'cmvndeltafeats')
    if not initial:
        feat_path += '_fmllr'
    feat_path += '.{}'.format(job_name)
    feat_fmllr_path = os.path.join(split_directory, 'cmvndeltafeats_fmllr.{}'.format(job_name))
    log_path = os.path.join(directory, 'log', 'fmllr.{}.log'.format(job_name))
    ali_path = os.path.join(directory, 'ali.{}'.format(job_name))
    mdl_path = os.path.join(directory, '{}.mdl'.format(model_name))
    spk2utt_path = os.path.join(split_directory, 'spk2utt.{}'.format(job_name))
    utt2spk_path = os.path.join(split_directory, 'utt2spk.{}'.format(job_name))
    if not initial:
        tmp_trans_path = os.path.join(directory, 'trans.temp.{}'.format(job_name))
        trans_path = os.path.join(directory, 'trans.{}'.format(job_name))
        cmp_trans_path = os.path.join(directory, 'trans.cmp.{}'.format(job_name))
    else:
        tmp_trans_path = os.path.join(directory, 'trans.{}'.format(job_name))
    post_path = os.path.join(directory, 'post.{}'.format(job_name))
    weight_path = os.path.join(directory, 'weight.{}'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf:
        subprocess.call([thirdparty_binary('ali-to-post'),
                         "ark:" + ali_path, 'ark:' + post_path],
                        stderr=logf)

        subprocess.call([thirdparty_binary('weight-silence-post'), '0.0',
                         sil_phones, mdl_path, 'ark:' + post_path,
                         'ark:' + weight_path],
                        stderr=logf)

        subprocess.call([thirdparty_binary('gmm-est-fmllr'),
                         '--verbose=4',
                         '--fmllr-update-type={}'.format(config.fmllr_update_type),
                         '--spk2utt=ark:' + spk2utt_path, mdl_path, "ark,s,cs:" + feat_path,
                         'ark,s,cs:' + weight_path, 'ark:' + tmp_trans_path],
                        stderr=logf)

        if not initial:
            subprocess.call([thirdparty_binary('compose-transforms'),
                             '--b-is-affine=true',
                             'ark:' + tmp_trans_path, 'ark:' + trans_path,
                             'ark:' + cmp_trans_path],
                            stderr=logf)
            # Replace the previous transforms with the composed ones.
            os.remove(tmp_trans_path)
            os.remove(trans_path)
            os.rename(cmp_trans_path, trans_path)
            # The composed transforms are applied to the untransformed features.
            feat_path = os.path.join(split_directory, 'cmvndeltafeats.{}'.format(job_name))
        else:
            trans_path = tmp_trans_path
        subprocess.call([thirdparty_binary('transform-feats'),
                         '--utt2spk=ark:' + utt2spk_path,
                         'ark:' + trans_path, 'ark:' + feat_path,
                         'ark:' + feat_fmllr_path],
                        stderr=logf)
def calc_fmllr(directory, split_directory, sil_phones, num_jobs, config,
               initial=False, iteration=None):
    """
    Multiprocessing function that computes speaker adaptation (fMLLR)

    Runs one :func:`calc_fmllr_func` worker per job. The model used is
    ``final.mdl`` unless an iteration is given, in which case it is
    ``<iteration>.mdl``.

    Parameters
    ----------
    directory : str
        Directory of training (triphone, speaker-adapted triphone training
        directories)
    split_directory : str
        Directory of training data split into the number of jobs
    sil_phones : str
        Colon-separated list of silence phones
    num_jobs : int
        The number of processes to use in calculation
    config : :class:`~aligner.config.TriphoneFmllrConfig`
        Configuration object for training
    initial : bool, optional
        Whether this is the first computation of speaker-adaptation,
        defaults to False
    iteration : int
        Specifies the current iteration, defaults to None
    """
    model_name = 'final' if iteration is None else iteration

    job_args = []
    for job in range(num_jobs):
        job_args.append((directory, split_directory, sil_phones, job, config, initial, model_name))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(calc_fmllr_func, args=args) for args in job_args]
        # Block until every worker is done; .get() re-raises worker errors.
        for result in pending:
            result.get()