import multiprocessing as mp
import subprocess
import os
import re
from .helper import make_path_safe, thirdparty_binary
from .textgrid import ctm_to_textgrid, parse_ctm
from .config import *
from .exceptions import CorpusError
def mfcc_func(mfcc_directory, log_directory, job_name, mfcc_config_path):  # pragma: no cover
    """
    Run the MFCC extraction pipeline for a single job.

    ``compute-mfcc-feats`` is piped into ``copy-feats``, which writes the
    compressed ``raw_mfcc.<job_name>.ark``/``.scp`` pair into
    ``mfcc_directory``.  When a ``segments.<job_name>`` file exists in
    ``log_directory``, ``extract-segments`` is run first and feeds
    ``compute-mfcc-feats``; otherwise the ``wav.<job_name>.scp`` file is
    read directly.  stderr of every process is written to
    ``make_mfcc.<job_name>.log``.
    """
    ark_out = os.path.join(mfcc_directory, 'raw_mfcc.{}.ark'.format(job_name))
    scp_out = os.path.join(mfcc_directory, 'raw_mfcc.{}.scp'.format(job_name))
    job_log = os.path.join(log_directory, 'make_mfcc.{}.log'.format(job_name))
    segments = os.path.join(log_directory, 'segments.{}'.format(job_name))
    wav_scp = os.path.join(log_directory, 'wav.{}.scp'.format(job_name))

    with open(job_log, 'w', encoding='utf8') as log_file:
        if os.path.exists(segments):
            # Segmented audio: cut the segments out first and stream them on.
            seg_proc = subprocess.Popen(
                [thirdparty_binary('extract-segments'),
                 'scp,p:' + wav_scp, segments, 'ark:-'],
                stdout=subprocess.PIPE, stderr=log_file)
            feature_input = 'ark:-'
            upstream = seg_proc.stdout
        else:
            feature_input = 'scp,p:' + wav_scp
            upstream = None  # same as not passing stdin at all

        comp_proc = subprocess.Popen(
            [thirdparty_binary('compute-mfcc-feats'), '--verbose=2',
             '--config=' + mfcc_config_path,
             feature_input, 'ark:-'],
            stdout=subprocess.PIPE, stderr=log_file, stdin=upstream)
        copy_proc = subprocess.Popen(
            [thirdparty_binary('copy-feats'),
             '--compress=true', 'ark:-',
             'ark,scp:{},{}'.format(ark_out, scp_out)],
            stdin=comp_proc.stdout, stderr=log_file)
        copy_proc.wait()
def mfcc(mfcc_directory, log_directory, num_jobs, mfcc_configs):
    """
    Multiprocessing function that converts wav files into MFCCs

    See http://kaldi-asr.org/doc/feat.html and
    http://kaldi-asr.org/doc/compute-mfcc-feats_8cc.html for more details on how
    MFCCs are computed.

    Also see https://github.com/kaldi-asr/kaldi/blob/master/egs/wsj/s5/steps/make_mfcc.sh
    for the bash script this function was based on.

    Parameters
    ----------
    mfcc_directory : str
        Directory to save MFCC feature matrices
    log_directory : str
        Directory to store log files
    num_jobs : int
        The number of processes to use in calculation
    mfcc_configs : list of :class:`~aligner.config.MfccConfig`
        Configuration objects for generating MFCCs, indexed by job number;
        only the ``path`` attribute of each is used here

    Raises
    ------
    CorpusError
        If the files per speaker exceeds the number of files that are
        allowed to be open on the computer (for Unix-based systems)
    """
    jobs = [(mfcc_directory, log_directory, x, mfcc_configs[x].path)
            for x in range(num_jobs)]
    too_many_open_files = False
    with mp.Pool(processes=num_jobs) as pool:
        try:
            results = [pool.apply_async(mfcc_func, args=i) for i in jobs]
            for p in results:
                p.get()
        except OSError as e:
            # errno 24 is EMFILE ("Too many open files").  The attribute is
            # ``errno``; the previous ``errorno`` spelling raised
            # AttributeError and hid the real problem.
            if e.errno == 24:
                too_many_open_files = True
            else:
                raise
    # Raised outside the ``except`` block so the CorpusError is reported
    # without the OSError chained onto it.
    if too_many_open_files:
        raise CorpusError(
            'There were too many files per speaker to process based on your OS settings.  Please try to split your data into more speakers.')
def acc_stats_func(directory, iteration, job_name, feat_path):  # pragma: no cover
    """
    Accumulate GMM statistics for one job by running ``gmm-acc-stats-ali``.

    Reads ``<iteration>.mdl`` and ``ali.<job_name>`` from ``directory``
    together with the features at ``feat_path`` and writes
    ``<iteration>.<job_name>.acc`` into ``directory``.  stderr is written to
    ``log/acc.<iteration>.<job_name>.log``.
    """
    job_log = os.path.join(directory, 'log', 'acc.{}.{}.log'.format(iteration, job_name))
    current_model = os.path.join(directory, '{}.mdl'.format(iteration))
    stats_out = os.path.join(directory, '{}.{}.acc'.format(iteration, job_name))
    alignments = os.path.join(directory, 'ali.{}'.format(job_name))

    with open(job_log, 'w', encoding='utf8') as log_file:
        command = [thirdparty_binary('gmm-acc-stats-ali'), current_model,
                   "ark:" + feat_path, "ark,t:" + alignments, stats_out]
        subprocess.Popen(command, stderr=log_file).communicate()
def acc_stats(iteration, directory, split_directory, num_jobs, fmllr=False):
    """
    Multiprocessing function that computes stats for GMM training

    See http://kaldi-asr.org/doc/gmm-acc-stats-ali_8cc.html for more details
    on the Kaldi binary this runs.

    Also see https://github.com/kaldi-asr/kaldi/blob/master/egs/wsj/s5/steps/train_mono.sh
    for the bash script this function was extracted from

    Parameters
    ----------
    iteration : int
        Iteration to calculate stats for
    directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    split_directory : str
        Directory of training data split into the number of jobs
    num_jobs : int
        The number of processes to use in calculation
    fmllr : bool, optional
        Whether the current training session is using fMLLR (speaker-adaptation),
        defaults to False
    """
    # Speaker-adapted training reads the fMLLR-transformed feature files.
    feat_template = 'cmvndeltafeats_fmllr.{}' if fmllr else 'cmvndeltafeats.{}'
    job_args = []
    for job in range(num_jobs):
        feats = os.path.join(split_directory, feat_template.format(job))
        job_args.append((directory, iteration, job, feats))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(acc_stats_func, args=a) for a in job_args]
        # Block until every job is done; get() re-raises worker exceptions.
        for handle in pending:
            handle.get()
def parse_transitions(path, phones_path):
    """
    Convert ``show-transitions`` output into a symbol table of transition ids.

    The output file starts with ``<eps> 0`` and then gets one line per
    ``Transition-id`` entry of the form ``<phone>_<n> <transition-id>``,
    where ``n`` counts the transition ids seen so far for the current phone
    and restarts at 0 whenever a ``Transition-state`` line names a
    different phone than the previous one.

    Lines that match neither pattern (blank lines, for example) and
    transition ids that appear before any ``Transition-state`` line are
    skipped rather than raising.

    Parameters
    ----------
    path : str
        Path of the text file to read
    phones_path : str
        Path of the symbol table to write
    """
    state_extract_pattern = re.compile(r'Transition-state (\d+): phone = (\w+)')
    id_extract_pattern = re.compile(r'Transition-id = (\d+)')
    cur_phone = None
    current = 0
    with open(path, encoding='utf8') as f, open(phones_path, 'w', encoding='utf8') as outf:
        outf.write('{} {}\n'.format('<eps>', 0))
        for line in f:
            line = line.strip()
            state_match = state_extract_pattern.match(line)
            if state_match is not None:
                phone = state_match.group(2)
                if phone != cur_phone:
                    # New phone: restart the per-phone counter.
                    current = 0
                    cur_phone = phone
                continue
            id_match = id_extract_pattern.match(line)
            if id_match is None or cur_phone is None:
                continue
            outf.write('{}_{} {}\n'.format(cur_phone, current, id_match.group(1)))
            current += 1
def compile_train_graphs_func(directory, lang_directory, split_directory, job_name, debug=False):  # pragma: no cover
    """
    Compile the training graphs for one job with ``compile-train-graphs``.

    Reads ``text.<job_name>.int`` from ``split_directory`` on stdin and
    writes the compiled graphs to ``fsts.<job_name>`` in ``directory``,
    using ``tree`` and ``0.mdl`` from ``directory`` and ``L.fst`` plus
    ``phones/disambig.int`` from ``lang_directory``.

    When ``debug`` is true the function additionally dumps the model's
    transitions to ``transitions.txt``/``triphones.txt`` before compiling
    and, afterwards, renders every utterance graph to a ``.dot`` file
    (and to PDF when the ``dot`` executable can be launched).
    """
    fst_path = os.path.join(directory, 'fsts.{}'.format(job_name))
    tree_path = os.path.join(directory, 'tree')
    mdl_path = os.path.join(directory, '0.mdl')
    log_path = os.path.join(directory, 'log', 'show_transition.log')
    transition_path = os.path.join(directory, 'transitions.txt')
    phones_file_path = os.path.join(lang_directory, 'phones.txt')
    triphones_file_path = os.path.join(directory, 'triphones.txt')
    if debug:
        with open(log_path, 'w', encoding='utf8') as logf:
            with open(transition_path, 'w', encoding='utf8') as f:
                subprocess.call([thirdparty_binary('show-transitions'), phones_file_path, mdl_path],
                                stdout=f, stderr=logf)
            parse_transitions(transition_path, triphones_file_path)
    log_path = os.path.join(directory, 'log', 'compile-graphs.0.{}.log'.format(job_name))
    # Prefer the transition-level symbol table for drawing when one exists.
    if os.path.exists(triphones_file_path):
        phones_file_path = triphones_file_path
    words_file_path = os.path.join(lang_directory, 'words.txt')
    with open(os.path.join(split_directory, 'text.{}.int'.format(job_name)), 'r') as inf, \
            open(fst_path, 'wb') as outf, \
            open(log_path, 'w', encoding='utf8') as logf:
        proc = subprocess.Popen([thirdparty_binary('compile-train-graphs'),
                                 '--read-disambig-syms={}'.format(
                                     os.path.join(lang_directory, 'phones', 'disambig.int')),
                                 tree_path, mdl_path,
                                 os.path.join(lang_directory, 'L.fst'),
                                 "ark:-", "ark:-"],
                                stdin=inf, stdout=outf, stderr=logf)
        proc.communicate()

    if debug:
        # NOTE(review): ``utterances`` is collected but not used below.
        utterances = []
        with open(os.path.join(split_directory, 'utt2spk.{}'.format(job_name)), 'r', encoding='utf8') as f:
            for line in f:
                fields = line.split()
                # Check for an empty line *before* indexing; the previous
                # ``line.split()[0]`` raised IndexError on blank lines.
                if not fields:
                    continue
                utterances.append(fields[0])

        with open(log_path, 'a') as logf:
            fst_ark_path = os.path.join(directory, 'fsts.{}.ark'.format(job_name))
            fst_scp_path = os.path.join(directory, 'fsts.{}.scp'.format(job_name))
            proc = subprocess.Popen([thirdparty_binary('fstcopy'),
                                     'ark:{}'.format(fst_path),
                                     'ark,scp:{},{}'.format(fst_ark_path, fst_scp_path)], stderr=logf)
            proc.communicate()

            temp_fst_path = os.path.join(directory, 'temp.fst.{}'.format(job_name))

            with open(fst_scp_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        # Blank scp lines carry no utterance to draw.
                        continue
                    utt = line.split()[0]
                    print(utt)
                    dot_path = os.path.join(directory, '{}.dot'.format(utt))
                    # Copy this single utterance's FST out to a temp file.
                    fst_proc = subprocess.Popen([thirdparty_binary('fstcopy'),
                                                 'scp:-',
                                                 'scp:echo {} {}|'.format(utt, temp_fst_path)],
                                                stdin=subprocess.PIPE, stderr=logf)
                    fst_proc.communicate(input=line.encode())

                    draw_proc = subprocess.Popen([thirdparty_binary('fstdraw'), '--portrait=true',
                                                  '--isymbols={}'.format(phones_file_path),
                                                  '--osymbols={}'.format(words_file_path), temp_fst_path, dot_path],
                                                 stderr=logf)
                    draw_proc.communicate()
                    try:
                        dot_proc = subprocess.Popen([thirdparty_binary('dot'), '-Tpdf', '-O', dot_path], stderr=logf)
                        dot_proc.communicate()
                    except FileNotFoundError:
                        # PDF rendering is best-effort: skip it when the
                        # ``dot`` executable cannot be launched.
                        pass
def compile_train_graphs(directory, lang_directory, split_directory, num_jobs, debug=False):
    """
    Multiprocessing function that compiles training graphs for utterances

    See http://kaldi-asr.org/doc/compile-train-graphs_8cc.html for more details
    on the Kaldi binary this function calls.

    Also see https://github.com/kaldi-asr/kaldi/blob/master/egs/wsj/s5/steps/train_mono.sh
    for the bash script that this function was extracted from.

    Parameters
    ----------
    directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    lang_directory : str
        Directory of the language model used
    split_directory : str
        Directory of training data split into the number of jobs
    num_jobs : int
        The number of processes to use
    debug : bool, optional
        Passed through to each per-job worker, defaults to False
    """
    # Workers write their logs here, so make sure it exists up front.
    log_directory = os.path.join(directory, 'log')
    os.makedirs(log_directory, exist_ok=True)

    job_args = []
    for job in range(num_jobs):
        job_args.append((directory, lang_directory, split_directory, job, debug))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(compile_train_graphs_func, args=a) for a in job_args]
        for handle in pending:
            handle.get()
def mono_align_equal_func(mono_directory, split_directory, job_name, feat_path):  # pragma: no cover
    """
    Create equal alignments for one job and accumulate stats from them.

    ``align-equal-compiled`` reads ``fsts.<job_name>`` and the features at
    ``feat_path``; its text-mode output is piped into ``gmm-acc-stats-ali``
    (using ``0.mdl``), whose binary stdout is saved as
    ``0.<job_name>.acc`` in ``mono_directory``.  stderr of both processes is
    written to ``log/align.0.<job_name>.log``.
    """
    graphs = os.path.join(mono_directory, 'fsts.{}'.format(job_name))
    initial_model = os.path.join(mono_directory, '0.mdl')
    job_log = os.path.join(mono_directory, 'log', 'align.0.{}.log'.format(job_name))
    stats_out = os.path.join(mono_directory, '0.{}.acc'.format(job_name))

    with open(job_log, 'w', encoding='utf8') as log_file, \
            open(stats_out, 'wb') as stats_file:
        equal_proc = subprocess.Popen(
            [thirdparty_binary('align-equal-compiled'), "ark:" + graphs,
             'ark:' + feat_path, 'ark,t:-'],
            stdout=subprocess.PIPE, stderr=log_file)
        acc_proc = subprocess.Popen(
            [thirdparty_binary('gmm-acc-stats-ali'), '--binary=true',
             initial_model, 'ark:' + feat_path, 'ark:-', '-'],
            stdin=equal_proc.stdout, stderr=log_file, stdout=stats_file)
        acc_proc.communicate()
def mono_align_equal(mono_directory, split_directory, num_jobs):
    """
    Multiprocessing function that creates equal alignments for base monophone training

    See http://kaldi-asr.org/doc/align-equal-compiled_8cc.html for more details
    on the Kaldi binary this function calls.

    Also see https://github.com/kaldi-asr/kaldi/blob/master/egs/wsj/s5/steps/train_mono.sh
    for the bash script that this function was extracted from.

    Parameters
    ----------
    mono_directory : str
        Directory of monophone training
    split_directory : str
        Directory of training data split into the number of jobs
    num_jobs : int
        The number of processes to use
    """
    job_args = []
    for job in range(num_jobs):
        feats = os.path.join(split_directory, 'cmvndeltafeats.{}'.format(job))
        job_args.append((mono_directory, split_directory, job, feats))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(mono_align_equal_func, args=a) for a in job_args]
        for handle in pending:
            handle.get()
def compile_utterance_train_graphs_func(directory, lang_directory, split_directory, job_name, debug=False):  # pragma: no cover
    """
    Compile per-utterance graphs for one job with ``compile-train-graphs-fsts``.

    The contents of ``utt2fst.<job_name>`` in ``split_directory`` are streamed
    to the binary's stdin, and the compiled graphs are written to
    ``utterance_graphs.<job_name>.fst`` in ``directory``.  stderr is written
    to ``log/compile-graphs-fst.0.<job_name>.log``.

    ``debug`` is accepted but not used by this function.
    """
    disambig_int_path = os.path.join(lang_directory, 'phones', 'disambig.int')
    tree_path = os.path.join(directory, 'tree')
    mdl_path = os.path.join(directory, 'final.mdl')
    lexicon_fst_path = os.path.join(lang_directory, 'L_disambig.fst')
    fsts_path = os.path.join(split_directory, 'utt2fst.{}'.format(job_name))
    graphs_path = os.path.join(directory, 'utterance_graphs.{}.fst'.format(job_name))
    log_path = os.path.join(directory, 'log', 'compile-graphs-fst.0.{}.log'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf, open(fsts_path, 'r', encoding='utf8') as f:
        proc = subprocess.Popen([thirdparty_binary('compile-train-graphs-fsts'),
                                 '--transition-scale=1.0', '--self-loop-scale=0.1',
                                 '--read-disambig-syms={}'.format(disambig_int_path),
                                 tree_path, mdl_path,
                                 lexicon_fst_path,
                                 "ark:-", "ark:" + graphs_path],
                                stdin=subprocess.PIPE, stderr=logf)
        # Lines are buffered and forwarded one blank-line-terminated group at
        # a time.  NOTE(review): a trailing group that is not followed by a
        # blank line is never written -- confirm the input always ends with one.
        group = []
        for line in f:
            group.append(line)
            if line.strip() == '':
                for l in group:
                    proc.stdin.write(l.encode('utf8'))
                group = []
                proc.stdin.flush()
        # communicate() closes stdin, which lets the binary finish.
        proc.communicate()
def test_utterances_func(directory, lang_directory, split_directory, job_name):  # pragma: no cover
    """
    Decode one job's utterances and compare them with the reference text.

    ``gmm-latgen-faster`` decodes the features in
    ``cmvndeltafeats.<job_name>`` against ``utterance_graphs.<job_name>.fst``
    with ``final.mdl``, writing lattices to ``lat.<job_name>``.
    ``lattice-oracle`` then compares those lattices with
    ``text.<job_name>.int`` and writes ``aligned.<job_name>.int`` and
    ``edits.<job_name>.txt``.  stderr of both binaries is written to
    ``log/decode.0.<job_name>.log``.
    """
    job_log = os.path.join(directory, 'log', 'decode.0.{}.log'.format(job_name))
    word_table = os.path.join(lang_directory, 'words.txt')
    model = os.path.join(directory, 'final.mdl')
    feats = os.path.join(split_directory, 'cmvndeltafeats.{}'.format(job_name))
    graphs = os.path.join(directory, 'utterance_graphs.{}.fst'.format(job_name))
    reference_int = os.path.join(split_directory, 'text.{}.int'.format(job_name))
    edits_out = os.path.join(directory, 'edits.{}.txt'.format(job_name))
    decoded_int = os.path.join(directory, 'aligned.{}.int'.format(job_name))
    lattices = os.path.join(directory, 'lat.{}'.format(job_name))

    # Fixed decoding options.
    decode_options = [
        '--acoustic-scale={}'.format(0.1),
        '--beam={}'.format(15.0),
        '--max-active={}'.format(750),
        '--lattice-beam={}'.format(8.0),
    ]

    with open(job_log, 'w', encoding='utf8') as log_file:
        decode_cmd = [thirdparty_binary('gmm-latgen-faster')]
        decode_cmd += decode_options
        decode_cmd += ['--word-symbol-table=' + word_table,
                       model, 'ark:' + graphs, 'ark:' + feats, 'ark:' + lattices]
        subprocess.Popen(decode_cmd, stderr=log_file).communicate()

        oracle_cmd = [thirdparty_binary('lattice-oracle'),
                      'ark:' + lattices, 'ark,t:' + reference_int,
                      'ark,t:' + decoded_int, 'ark,t:' + edits_out]
        subprocess.Popen(oracle_cmd, stderr=log_file).communicate()
def test_utterances(aligner):
    """
    Decode every utterance and report those that differ from their transcript.

    Per-utterance graphs are compiled and decoded in a worker pool, then the
    decoded word ids in ``aligned.<job>.int`` are mapped back to words,
    written to ``aligned.<job>`` and compared with the reference text in
    ``text.<job>``.  Utterances with at least one inserted or deleted word
    are written to ``transcription_problems.txt`` in
    ``aligner.output_directory``, ordered by descending number of
    insertions plus deletions.

    Parameters
    ----------
    aligner : object
        Must provide ``corpus`` (``split_directory``, ``num_utterances``),
        ``dictionary`` (``output_directory``, ``reversed_word_mapping``),
        ``tri_directory``, ``num_jobs`` and ``output_directory``

    Returns
    -------
    bool
        True if no utterance had a transcription issue, False otherwise
    """
    print('Checking utterance transcriptions...')
    from alignment.sequence import Sequence
    from alignment.vocabulary import Vocabulary
    from alignment.sequencealigner import SimpleScoring, GlobalSequenceAligner
    from .corpus import load_scp

    split_directory = aligner.corpus.split_directory
    model_directory = aligner.tri_directory
    lang_directory = aligner.dictionary.output_directory
    with mp.Pool(processes=aligner.num_jobs) as pool:
        jobs = [(model_directory, lang_directory, split_directory, x)
                for x in range(aligner.num_jobs)]
        results = [pool.apply_async(compile_utterance_train_graphs_func, args=i) for i in jobs]
        for p in results:
            p.get()
        print('Utterance FSTs compiled!')
        print('Decoding utterances (this will take some time)...')
        results = [pool.apply_async(test_utterances_func, args=i) for i in jobs]
        for p in results:
            p.get()
        print('Finished decoding utterances!')

    word_mapping = aligner.dictionary.reversed_word_mapping
    v = Vocabulary()
    errors = {}

    for job in range(aligner.num_jobs):
        text_path = os.path.join(split_directory, 'text.{}'.format(job))
        texts = load_scp(text_path)
        aligned_int = load_scp(os.path.join(model_directory, 'aligned.{}.int'.format(job)))
        with open(os.path.join(model_directory, 'aligned.{}'.format(job)), 'w', encoding='utf8') as outf:
            for utt, line in sorted(aligned_int.items()):
                text = [word_mapping[int(t)] for t in line]
                outf.write('{} {}\n'.format(utt, ' '.join(text)))
                ref_text = texts[utt]
                if len(text) < len(ref_text) - 7:
                    # Decoded output is much shorter than the reference:
                    # use plain membership tests instead of a full alignment.
                    insertions = [x for x in text if x not in ref_text]
                    deletions = [x for x in ref_text if x not in text]
                else:
                    aligned_seq = Sequence(text)
                    ref_seq = Sequence(ref_text)
                    aligned_encoded = v.encodeSequence(aligned_seq)
                    ref_encoded = v.encodeSequence(ref_seq)
                    scoring = SimpleScoring(2, -1)
                    a = GlobalSequenceAligner(scoring, -2)
                    score, encodeds = a.align(ref_encoded, aligned_encoded, backtrace=True)
                    insertions = []
                    deletions = []
                    for encoded in encodeds:
                        alignment = v.decodeSequenceAlignment(encoded)
                        # ``first`` is the reference side, ``second`` the
                        # decoded side; '-' marks a gap.
                        for f, s in zip(alignment.first, alignment.second):
                            if f == '-':
                                insertions.append(s)
                            if s == '-':
                                deletions.append(f)
                if insertions or deletions:
                    errors[utt] = (insertions, deletions, ref_text, text)

    if not errors:
        print('There were no utterances with transcription issues.')
        return True

    out_path = os.path.join(aligner.output_directory, 'transcription_problems.txt')
    with open(out_path, 'w', encoding='utf8') as problemf:
        problemf.write('Utterance\tInsertions\tDeletions\tReference\tDecoded\n')
        # Worst utterances first.  The tuple is (insertions, deletions,
        # ref_text, text), so the error count is indices 0 and 1; the
        # previous key summed indices 1 and 2 (deletions + reference length).
        ranked = sorted(errors.items(),
                        key=lambda x: -1 * (len(x[1][0]) + len(x[1][1])))
        for utt, (insertions, deletions, ref_text, text) in ranked:
            problemf.write('{}\t{}\t{}\t{}\t{}\n'.format(utt, ', '.join(insertions), ', '.join(deletions),
                                                         ' '.join(ref_text), ' '.join(text)))
    print(
        'There were {} of {} utterances with at least one transcription issue. Please see the outputted csv file {}.'.format(
            len(errors), aligner.corpus.num_utterances, out_path))
    return False
def align_func(directory, iteration, job_name, mdl, config, feat_path):  # pragma: no cover
    """
    Align one job's utterances with ``gmm-align-compiled``.

    Uses the graphs in ``fsts.<job_name>`` and the features at ``feat_path``
    with the model argument ``mdl``; the binary's stdout is saved as
    ``ali.<job_name>`` in ``directory`` and stderr is written to
    ``log/align.<iteration>.<job_name>.log``.  The retry beam is four times
    ``config.beam``.
    """
    graphs = os.path.join(directory, 'fsts.{}'.format(job_name))
    job_log = os.path.join(directory, 'log', 'align.{}.{}.log'.format(iteration, job_name))
    ali_out = os.path.join(directory, 'ali.{}'.format(job_name))

    with open(job_log, 'w', encoding='utf8') as log_file, \
            open(ali_out, 'wb') as ali_file:
        command = [thirdparty_binary('gmm-align-compiled')] + config.scale_opts
        command = command + [
            '--beam={}'.format(config.beam),
            '--retry-beam={}'.format(config.beam * 4),
            '--careful=false',
            mdl,
            "ark:" + graphs, "ark:" + feat_path, "ark:-",
        ]
        subprocess.Popen(command, stderr=log_file, stdout=ali_file).communicate()
def align(iteration, directory, split_directory, optional_silence, num_jobs, config):
    """
    Multiprocessing function that aligns based on the current model

    See http://kaldi-asr.org/doc/gmm-align-compiled_8cc.html and
    http://kaldi-asr.org/doc/gmm-boost-silence_8cc.html for more details
    on the Kaldi binary this function calls.

    Also see https://github.com/kaldi-asr/kaldi/blob/master/egs/wsj/s5/steps/align_si.sh
    for the bash script this function was based on.

    Parameters
    ----------
    iteration : int
        Iteration to align
    directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    split_directory : str
        Directory of training data split into the number of jobs
    optional_silence : str
        Colon-separated list of silence phones to boost
    num_jobs : int
        The number of processes to use in calculation
    config : :class:`~aligner.config.MonophoneConfig`, :class:`~aligner.config.TriphoneConfig` or :class:`~aligner.config.TriphoneFmllrConfig`
        Configuration object for training
    """
    model_file = os.path.join(directory, '{}.mdl'.format(iteration))
    # The model is handed to the aligner as a piped command (trailing "|")
    # that runs gmm-boost-silence on the current model.
    boosted_model = "{} --boost={} {} {} - |".format(
        thirdparty_binary('gmm-boost-silence'),
        config.boost_silence, optional_silence, make_path_safe(model_file))

    feat_template = 'cmvndeltafeats_fmllr.{}' if config.do_fmllr else 'cmvndeltafeats.{}'
    job_args = []
    for job in range(num_jobs):
        feats = os.path.join(split_directory, feat_template.format(job))
        job_args.append((directory, iteration, job, boosted_model, config, feats))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(align_func, args=a) for a in job_args]
        for handle in pending:
            handle.get()
def ali_to_textgrid_func(output_directory, model_directory, dictionary, corpus, job_name):  # pragma: no cover
    """
    Produce word- and phone-level CTM files for one job.

    ``linear-to-nbest`` is piped into ``lattice-align-words`` to write
    ``aligned.<job_name>``; ``nbest-to-ctm`` turns that into
    ``word_ctm.<job_name>``; and ``lattice-to-phone-lattice`` piped into
    ``nbest-to-ctm`` produces ``phone_ctm.<job_name>``.  All outputs go to
    ``model_directory`` and stderr is written to
    ``log/get_ctm_align.<job_name>.log``.  ``output_directory`` is accepted
    but not used by this function.
    """
    reference_int = os.path.join(corpus.split_directory, 'text.{}.int'.format(job_name))
    job_log = os.path.join(model_directory, 'log', 'get_ctm_align.{}.log'.format(job_name))
    alignments = os.path.join(model_directory, 'ali.{}'.format(job_name))
    final_model = os.path.join(model_directory, 'final.mdl')
    word_aligned = os.path.join(model_directory, 'aligned.{}'.format(job_name))
    word_ctm_out = os.path.join(model_directory, 'word_ctm.{}'.format(job_name))
    phone_ctm_out = os.path.join(model_directory, 'phone_ctm.{}'.format(job_name))
    # The configured frame shift is divided by 1000 before being passed on.
    frame_shift = corpus.mfcc_configs[0].config_dict['frame-shift'] / 1000
    shift_option = '--frame-shift={}'.format(frame_shift)

    with open(job_log, 'w', encoding='utf8') as log_file:
        nbest_source = subprocess.Popen(
            [thirdparty_binary('linear-to-nbest'), "ark:" + alignments,
             "ark:" + reference_int,
             '', '', 'ark:-'],
            stdout=subprocess.PIPE, stderr=log_file)
        word_align = subprocess.Popen(
            [thirdparty_binary('lattice-align-words'),
             os.path.join(dictionary.phones_dir, 'word_boundary.int'), final_model,
             'ark:-', 'ark:' + word_aligned],
            stdin=nbest_source.stdout, stderr=log_file)
        word_align.communicate()

        subprocess.call(
            [thirdparty_binary('nbest-to-ctm'),
             shift_option,
             'ark:' + word_aligned,
             word_ctm_out],
            stderr=log_file)

        phone_lattice = subprocess.Popen(
            [thirdparty_binary('lattice-to-phone-lattice'), final_model,
             'ark:' + word_aligned, "ark:-"],
            stdout=subprocess.PIPE,
            stderr=log_file)
        phone_ctm = subprocess.Popen(
            [thirdparty_binary('nbest-to-ctm'),
             shift_option,
             "ark:-", phone_ctm_out],
            stdin=phone_lattice.stdout,
            stderr=log_file)
        phone_ctm.communicate()
def convert_ali_to_textgrids(output_directory, model_directory, dictionary, corpus, num_jobs):
    """
    Multiprocessing function that converts alignments into TextGrid files

    See:

    - http://kaldi-asr.org/doc/linear-to-nbest_8cc.html
    - http://kaldi-asr.org/doc/lattice-align-words_8cc.html
    - http://kaldi-asr.org/doc/lattice-to-phone-lattice_8cc.html
    - http://kaldi-asr.org/doc/nbest-to-ctm_8cc.html

    for more details on the Kaldi binaries this function calls.

    Also see https://github.com/kaldi-asr/kaldi/blob/master/egs/wsj/s5/steps/get_train_ctm.sh
    for the bash script that this function was based on.

    Parameters
    ----------
    output_directory : str
        Directory to write TextGrid files to
    model_directory : str
        Directory of training (monophone, triphone, speaker-adapted triphone
        training directories)
    dictionary : :class:`~aligner.dictionary.Dictionary`
        Dictionary object that has information about pronunciations
    corpus : :class:`~aligner.corpus.Corpus`
        Corpus object that has information about the dataset
    num_jobs : int
        The number of processes to use in calculation

    Raises
    ------
    CorpusError
        If the files per speaker exceeds the number of files that are
        allowed to be open on the computer (for Unix-based systems)
    """
    jobs = [(output_directory, model_directory, dictionary, corpus, x)
            for x in range(num_jobs)]
    too_many_open_files = False
    with mp.Pool(processes=num_jobs) as pool:
        try:
            results = [pool.apply_async(ali_to_textgrid_func, args=i) for i in jobs]
            for p in results:
                p.get()
        except OSError as e:
            # errno 24 is EMFILE ("Too many open files").  The attribute is
            # ``errno``; the previous ``errorno`` spelling raised
            # AttributeError and hid the real problem.
            if e.errno == 24:
                too_many_open_files = True
            else:
                raise
    # Raised outside the ``except`` block so the CorpusError is reported
    # without the OSError chained onto it.
    if too_many_open_files:
        raise CorpusError(
            'There were too many files per speaker to process based on your OS settings.  Please try to split your data into more speakers.')

    word_ctm = {}
    phone_ctm = {}
    for i in range(num_jobs):
        word_ctm_path = os.path.join(model_directory, 'word_ctm.{}'.format(i))
        phone_ctm_path = os.path.join(model_directory, 'phone_ctm.{}'.format(i))
        # Jobs that produced no word CTM are skipped entirely.
        if not os.path.exists(word_ctm_path):
            continue
        for merged, ctm_path, mode in ((word_ctm, word_ctm_path, 'word'),
                                       (phone_ctm, phone_ctm_path, 'phone')):
            parsed = parse_ctm(ctm_path, corpus, dictionary, mode=mode)
            # Merge this job's entries into the combined mapping.
            for k, v in parsed.items():
                if k not in merged:
                    merged[k] = v
                else:
                    merged[k].update(v)
    ctm_to_textgrid(word_ctm, phone_ctm, output_directory, corpus, dictionary)
def tree_stats_func(directory, ci_phones, mdl, feat_path, ali_path, job_name):  # pragma: no cover
    """
    Accumulate decision-tree statistics for one job with ``acc-tree-stats``.

    Writes ``<job_name>.treeacc`` into ``directory`` from the model ``mdl``,
    the features at ``feat_path`` and the alignments at ``ali_path``.
    stderr is written to ``log/acc_tree.<job_name>.log``.
    """
    job_log = os.path.join(directory, 'log', 'acc_tree.{}.log'.format(job_name))
    stats_out = os.path.join(directory, '{}.treeacc'.format(job_name))

    with open(job_log, 'w', encoding='utf8') as log_file:
        # No extra context options are passed.
        command = [
            thirdparty_binary('acc-tree-stats'),
            '--ci-phones=' + ci_phones,
            mdl,
            "ark:" + feat_path,
            "ark:" + ali_path,
            stats_out,
        ]
        subprocess.call(command, stderr=log_file)
def tree_stats(directory, align_directory, split_directory, ci_phones, num_jobs, fmllr=False):
    """
    Multiprocessing function that computes stats for decision tree training

    See http://kaldi-asr.org/doc/acc-tree-stats_8cc.html for more details
    on the Kaldi binary this runs.

    Parameters
    ----------
    directory : str
        Directory of training (triphone, speaker-adapted triphone
        training directories)
    align_directory : str
        Directory of previous alignment
    split_directory : str
        Directory of training data split into the number of jobs
    ci_phones : str
        Colon-separated list of context-independent phones
    num_jobs : int
        The number of processes to use in calculation
    fmllr : bool, optional
        Whether the current training session is using fMLLR (speaker-adaptation),
        defaults to False
    """
    feat_template = 'cmvndeltafeats_fmllr.{}' if fmllr else 'cmvndeltafeats.{}'
    previous_model = os.path.join(align_directory, 'final.mdl')

    job_args = []
    for job in range(num_jobs):
        feats = os.path.join(split_directory, feat_template.format(job))
        alignments = os.path.join(align_directory, 'ali.{}'.format(job))
        job_args.append((directory, ci_phones, previous_model, feats, alignments, job))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(tree_stats_func, args=a) for a in job_args]
        for handle in pending:
            handle.get()

    # Sum the per-job accumulators into one ``treeacc`` file, then delete
    # the per-job files.
    per_job_stats = []
    for job in range(num_jobs):
        per_job_stats.append(os.path.join(directory, '{}.treeacc'.format(job)))
    sum_log = os.path.join(directory, 'log', 'sum_tree_acc.log')
    with open(sum_log, 'w', encoding='utf8') as log_file:
        command = [thirdparty_binary('sum-tree-stats'), os.path.join(directory, 'treeacc')]
        subprocess.call(command + per_job_stats, stderr=log_file)
    for stats_file in per_job_stats:
        os.remove(stats_file)
def convert_alignments_func(directory, align_directory, job_name):  # pragma: no cover
    """
    Convert one job's alignments to a new model and tree with ``convert-ali``.

    Reads ``ali.<job_name>`` and ``final.mdl`` from ``align_directory`` and
    writes ``ali.<job_name>`` into ``directory`` using that directory's
    ``1.mdl`` and ``tree``.  stderr is written to
    ``log/convert.<job_name>.log``.
    """
    new_model = os.path.join(directory, '1.mdl')
    new_tree = os.path.join(directory, 'tree')
    old_model = os.path.join(align_directory, 'final.mdl')
    old_alignments = os.path.join(align_directory, 'ali.{}'.format(job_name))
    new_alignments = os.path.join(directory, 'ali.{}'.format(job_name))
    job_log = os.path.join(directory, 'log', 'convert.{}.log'.format(job_name))

    with open(job_log, 'w', encoding='utf8') as log_file:
        command = [
            thirdparty_binary('convert-ali'),
            old_model,
            new_model,
            new_tree,
            "ark:" + old_alignments,
            "ark:" + new_alignments,
        ]
        subprocess.call(command, stderr=log_file)
def convert_alignments(directory, align_directory, num_jobs):
    """
    Multiprocessing function that converts alignments from previous training

    See http://kaldi-asr.org/doc/convert-ali_8cc.html for more details
    on the Kaldi binary this runs.

    Parameters
    ----------
    directory : str
        Directory of training (triphone, speaker-adapted triphone
        training directories)
    align_directory : str
        Directory of previous alignment
    num_jobs : int
        The number of processes to use in calculation
    """
    job_args = []
    for job in range(num_jobs):
        job_args.append((directory, align_directory, job))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(convert_alignments_func, args=a) for a in job_args]
        for handle in pending:
            handle.get()
def calc_fmllr_func(directory, split_directory, sil_phones, job_name, config, initial,
                    model_name='final'):  # pragma: no cover
    """
    Estimate fMLLR transforms for one job and write the transformed features.

    Runs, in order: ``ali-to-post``, ``weight-silence-post``,
    ``gmm-est-fmllr`` and ``transform-feats``; when ``initial`` is false,
    ``compose-transforms`` is run before ``transform-feats`` to combine the
    new estimate with the existing ``trans.<job_name>``.  The transformed
    features are written to ``cmvndeltafeats_fmllr.<job_name>`` in
    ``split_directory`` and stderr of every call is written to
    ``log/fmllr.<job_name>.log``.
    """
    # Later passes estimate on the already-transformed features.
    feat_path = os.path.join(split_directory, 'cmvndeltafeats')
    if not initial:
        feat_path += '_fmllr'
    feat_path += '.{}'.format(job_name)
    feat_fmllr_path = os.path.join(split_directory, 'cmvndeltafeats_fmllr.{}'.format(job_name))
    log_path = os.path.join(directory, 'log', 'fmllr.{}.log'.format(job_name))
    ali_path = os.path.join(directory, 'ali.{}'.format(job_name))
    mdl_path = os.path.join(directory, '{}.mdl'.format(model_name))
    spk2utt_path = os.path.join(split_directory, 'spk2utt.{}'.format(job_name))
    utt2spk_path = os.path.join(split_directory, 'utt2spk.{}'.format(job_name))
    if not initial:
        # New estimate goes to a temp file so it can be composed with the
        # existing transform below.
        tmp_trans_path = os.path.join(directory, 'trans.temp.{}'.format(job_name))
        trans_path = os.path.join(directory, 'trans.{}'.format(job_name))
        cmp_trans_path = os.path.join(directory, 'trans.cmp.{}'.format(job_name))
    else:
        # First pass: the estimate is written straight to trans.<job_name>.
        tmp_trans_path = os.path.join(directory, 'trans.{}'.format(job_name))
    post_path = os.path.join(directory, 'post.{}'.format(job_name))
    weight_path = os.path.join(directory, 'weight.{}'.format(job_name))
    with open(log_path, 'w', encoding='utf8') as logf:
        subprocess.call([thirdparty_binary('ali-to-post'),
                         "ark:" + ali_path, 'ark:' + post_path], stderr=logf)
        subprocess.call([thirdparty_binary('weight-silence-post'), '0.0',
                         sil_phones, mdl_path, 'ark:' + post_path,
                         'ark:' + weight_path], stderr=logf)
        subprocess.call([thirdparty_binary('gmm-est-fmllr'),
                         '--verbose=4',
                         '--fmllr-update-type={}'.format(config.fmllr_update_type),
                         '--spk2utt=ark:' + spk2utt_path, mdl_path, "ark,s,cs:" + feat_path,
                         'ark,s,cs:' + weight_path, 'ark:' + tmp_trans_path],
                        stderr=logf)
        if not initial:
            subprocess.call([thirdparty_binary('compose-transforms'),
                             '--b-is-affine=true',
                             'ark:' + tmp_trans_path, 'ark:' + trans_path,
                             'ark:' + cmp_trans_path], stderr=logf)
            # Replace the old transform with the composed one.
            os.remove(tmp_trans_path)
            os.remove(trans_path)
            os.rename(cmp_trans_path, trans_path)
            # The composed transform is applied to the untransformed features.
            feat_path = os.path.join(split_directory, 'cmvndeltafeats.{}'.format(job_name))
        else:
            trans_path = tmp_trans_path
        subprocess.call([thirdparty_binary('transform-feats'),
                         '--utt2spk=ark:' + utt2spk_path,
                         'ark:' + trans_path, 'ark:' + feat_path,
                         'ark:' + feat_fmllr_path],
                        stderr=logf)
def calc_fmllr(directory, split_directory, sil_phones, num_jobs, config,
               initial=False, iteration=None):
    """
    Multiprocessing function that computes speaker adaptation (fMLLR)

    See:

    - http://kaldi-asr.org/doc/gmm-est-fmllr_8cc.html
    - http://kaldi-asr.org/doc/ali-to-post_8cc.html
    - http://kaldi-asr.org/doc/weight-silence-post_8cc.html
    - http://kaldi-asr.org/doc/compose-transforms_8cc.html
    - http://kaldi-asr.org/doc/transform-feats_8cc.html

    for more details on the Kaldi binaries this runs.

    Also see https://github.com/kaldi-asr/kaldi/blob/master/egs/wsj/s5/steps/align_fmllr.sh
    for the original bash script that this function was based on.

    Parameters
    ----------
    directory : str
        Directory of training (triphone, speaker-adapted triphone
        training directories)
    split_directory : str
        Directory of training data split into the number of jobs
    sil_phones : str
        Colon-separated list of silence phones
    num_jobs : int
        The number of processes to use in calculation
    config : :class:`~aligner.config.TriphoneFmllrConfig`
        Configuration object for training
    initial : bool, optional
        Whether this is the first computation of speaker-adaptation,
        defaults to False
    iteration : int
        Specifies the current iteration, defaults to None
    """
    # With no iteration given, the workers use ``final.mdl``.
    model_name = 'final' if iteration is None else iteration

    job_args = []
    for job in range(num_jobs):
        job_args.append((directory, split_directory, sil_phones, job,
                         config, initial, model_name))

    with mp.Pool(processes=num_jobs) as pool:
        pending = [pool.apply_async(calc_fmllr_func, args=a) for a in job_args]
        for handle in pending:
            handle.get()