Source code for montreal_forced_aligner.transcription.multiprocessing

"""
Transcription functions
-----------------------

"""
from __future__ import annotations

import os
import re
import subprocess
import typing
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, TextIO

import pynini
from sqlalchemy.orm import Session, joinedload, subqueryload

from montreal_forced_aligner.abc import KaldiFunction, MetaDict
from montreal_forced_aligner.data import MfaArguments
from montreal_forced_aligner.db import Job, Phone, Utterance
from montreal_forced_aligner.helper import mfa_open
from montreal_forced_aligner.utils import thirdparty_binary

if TYPE_CHECKING:
    from dataclasses import dataclass
else:
    from dataclassy import dataclass


__all__ = [
    "compose_g",
    "compose_lg",
    "compose_clg",
    "compose_hclg",
    "compose_g_carpa",
    "FmllrRescoreFunction",
    "FinalFmllrFunction",
    "InitialFmllrFunction",
    "LatGenFmllrFunction",
    "CarpaLmRescoreFunction",
    "DecodeFunction",
    "LmRescoreFunction",
    "CreateHclgFunction",
]


@dataclass
class CreateHclgArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.CreateHclgFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    working_directory: :class:`~pathlib.Path`
        Directory the job works in
    words_path: :class:`~pathlib.Path`
        Location of the words symbol table
    carpa_path: :class:`~pathlib.Path`
        Location of the .carpa file
    small_arpa_path: :class:`~pathlib.Path`
        Location of the small ARPA file
    medium_arpa_path: :class:`~pathlib.Path`
        Location of the medium ARPA file
    big_arpa_path: :class:`~pathlib.Path`
        Location of the big ARPA file
    model_path: :class:`~pathlib.Path`
        Location of the acoustic model
    disambig_L_path: :class:`~pathlib.Path`
        Location of the disambiguated lexicon file
    disambig_int_path: :class:`~pathlib.Path`
        Location of the disambiguation symbol integer file
    hclg_options: dict[str, Any]
        Options controlling HCLG creation
    words_mapping: dict[str, int]
        Mapping of words to their integer symbols
    """

    working_directory: Path
    words_path: Path
    carpa_path: Path
    small_arpa_path: Path
    medium_arpa_path: Path
    big_arpa_path: Path
    model_path: Path
    disambig_L_path: Path
    disambig_int_path: Path
    hclg_options: MetaDict
    words_mapping: Dict[str, int]
@dataclass
class DecodeArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.DecodeFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    dictionaries: list[int]
        Dictionary ids to process
    feature_strings: dict[int, str]
        Feature generation string for each dictionary
    decode_options: dict[str, Any]
        Options controlling decoding
    model_path: :class:`~pathlib.Path`
        Location of the model file
    lat_paths: dict[int, Path]
        Lattice path for each dictionary
    word_symbol_paths: dict[int, Path]
        Word symbol table path for each dictionary
    hclg_paths: dict[int, Path]
        HCLG.fst path for each dictionary
    """

    dictionaries: List[int]
    feature_strings: Dict[int, str]
    decode_options: MetaDict
    model_path: Path
    lat_paths: Dict[int, Path]
    word_symbol_paths: Dict[int, Path]
    hclg_paths: Dict[int, Path]
@dataclass
class DecodePhoneArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.validation.corpus_validator.DecodePhoneFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    dictionaries: list[int]
        Dictionary ids to process
    feature_strings: dict[int, str]
        Feature generation string for each dictionary
    decode_options: dict[str, Any]
        Options controlling decoding
    model_path: :class:`~pathlib.Path`
        Location of the model file
    lat_paths: dict[int, Path]
        Lattice path for each dictionary
    phone_symbol_path: :class:`~pathlib.Path`
        Location of the phone symbol table
    hclg_path: :class:`~pathlib.Path`
        Location of the HCLG.fst file
    """

    dictionaries: List[int]
    feature_strings: Dict[int, str]
    decode_options: MetaDict
    model_path: Path
    lat_paths: Dict[int, Path]
    phone_symbol_path: Path
    hclg_path: Path
@dataclass
class LmRescoreArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.LmRescoreFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    dictionaries: list[int]
        Dictionary ids to process
    lm_rescore_options: dict[str, Any]
        Options controlling rescoring
    lat_paths: dict[int, Path]
        Input lattice path for each dictionary
    rescored_lat_paths: dict[int, Path]
        Rescored lattice path for each dictionary
    old_g_paths: dict[int, Path]
        Small G.fst path for each dictionary
    new_g_paths: dict[int, Path]
        Medium G.fst path for each dictionary
    """

    dictionaries: List[int]
    lm_rescore_options: MetaDict
    lat_paths: Dict[int, Path]
    rescored_lat_paths: Dict[int, Path]
    old_g_paths: Dict[int, Path]
    new_g_paths: Dict[int, Path]
@dataclass
class CarpaLmRescoreArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.CarpaLmRescoreFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    dictionaries: list[int]
        Dictionary ids to process
    lat_paths: dict[int, Path]
        Input lattice path for each dictionary
    rescored_lat_paths: dict[int, Path]
        Rescored lattice path for each dictionary
    old_g_paths: dict[int, Path]
        Medium G.fst path for each dictionary
    new_g_paths: dict[int, Path]
        G.carpa path for each dictionary
    """

    dictionaries: List[int]
    lat_paths: Dict[int, Path]
    rescored_lat_paths: Dict[int, Path]
    old_g_paths: Dict[int, Path]
    new_g_paths: Dict[int, Path]
@dataclass
class InitialFmllrArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.InitialFmllrFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    dictionaries: list[int]
        Dictionary ids to process
    feature_strings: dict[int, str]
        Feature generation string for each dictionary
    model_path: :class:`~pathlib.Path`
        Location of the model file
    fmllr_options: dict[str, Any]
        Options controlling fMLLR estimation
    pre_trans_paths: dict[int, Path]
        Pre-fMLLR lattice path for each dictionary
    lat_paths: dict[int, Path]
        Lattice path for each dictionary
    spk2utt_paths: dict[int, Path]
        Speaker to utterance mapping path for each dictionary
    """

    dictionaries: List[int]
    feature_strings: Dict[int, str]
    model_path: Path
    fmllr_options: MetaDict
    pre_trans_paths: Dict[int, Path]
    lat_paths: Dict[int, Path]
    spk2utt_paths: Dict[int, Path]
@dataclass
class LatGenFmllrArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.LatGenFmllrFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    dictionaries: list[int]
        List of dictionary ids
    feature_strings: dict[int, str]
        Mapping of dictionaries to feature generation strings
    model_path: :class:`~pathlib.Path`
        Path to model file
    decode_options: dict[str, Any]
        Decoding options
    word_symbol_paths: dict[int, Path]
        Per dictionary word symbol table paths
    hclg_paths: dict[int, Path] or :class:`~pathlib.Path`
        Per dictionary HCLG.fst paths, or a single HCLG.fst path
    tmp_lat_paths: dict[int, Path]
        Per dictionary temporary lattice paths
    """

    dictionaries: List[int]
    feature_strings: Dict[int, str]
    model_path: Path
    decode_options: MetaDict
    word_symbol_paths: Dict[int, Path]
    # Either one graph per dictionary or one graph shared by every dictionary
    hclg_paths: typing.Union[Dict[int, Path], Path]
    tmp_lat_paths: Dict[int, Path]
@dataclass
class FinalFmllrArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.FinalFmllrFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    dictionaries: list[int]
        Dictionary ids to process
    feature_strings: dict[int, str]
        Feature generation string for each dictionary
    model_path: :class:`~pathlib.Path`
        Location of the model file
    fmllr_options: dict[str, Any]
        Options controlling fMLLR estimation
    trans_paths: dict[int, Path]
        Transform path for each dictionary
    spk2utt_paths: dict[int, Path]
        Speaker to utterance mapping path for each dictionary
    tmp_lat_paths: dict[int, Path]
        Temporary lattice path for each dictionary
    """

    dictionaries: List[int]
    feature_strings: Dict[int, str]
    model_path: Path
    fmllr_options: MetaDict
    trans_paths: Dict[int, Path]
    spk2utt_paths: Dict[int, Path]
    tmp_lat_paths: Dict[int, Path]
@dataclass
class FmllrRescoreArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.FmllrRescoreFunction`

    Parameters
    ----------
    job_name: int
        Integer identifier of the job
    db_string: str
        Connection string for the database
    log_path: :class:`~pathlib.Path`
        File that receives logging output for the run
    dictionaries: list[int]
        Dictionary ids to process
    feature_strings: dict[int, str]
        Feature generation string for each dictionary
    model_path: :class:`~pathlib.Path`
        Location of the model file
    fmllr_options: dict[str, Any]
        Options controlling fMLLR estimation
    tmp_lat_paths: dict[int, Path]
        Temporary lattice path for each dictionary
    final_lat_paths: dict[int, Path]
        Final lattice path for each dictionary
    """

    dictionaries: List[int]
    feature_strings: Dict[int, str]
    model_path: Path
    fmllr_options: MetaDict
    tmp_lat_paths: Dict[int, Path]
    final_lat_paths: Dict[int, Path]
def compose_lg(dictionary_path: Path, small_g_path: Path, lg_path: Path, log_file: TextIO) -> None:
    """
    Build an LG.fst from a lexicon FST and the small G.fst

    The composition is piped through ``fstdeterminizestar``, ``fstminimizeencoded`` and
    ``fstpushspecial``, which writes the result to ``lg_path``.  Nothing is run when
    ``lg_path`` already exists.

    See Also
    --------
    :kaldi_src:`fsttablecompose`
        Relevant Kaldi binary
    :kaldi_src:`fstdeterminizestar`
        Relevant Kaldi binary
    :kaldi_src:`fstminimizeencoded`
        Relevant Kaldi binary
    :kaldi_src:`fstpushspecial`
        Relevant Kaldi binary

    Parameters
    ----------
    dictionary_path: :class:`~pathlib.Path`
        Lexicon fst file to compose
    small_g_path: :class:`~pathlib.Path`
        G.fst of the small language model
    lg_path: :class:`~pathlib.Path`
        Where the resulting LG.fst is written
    log_file: TextIO
        Open log file that receives stderr of every stage
    """
    if os.path.exists(lg_path):
        return
    # Every stage logs to the same file and runs in the current environment
    shared = {"stderr": log_file, "env": os.environ}
    composed = subprocess.Popen(
        [thirdparty_binary("fsttablecompose"), dictionary_path, small_g_path],
        stdout=subprocess.PIPE,
        **shared,
    )
    determinized = subprocess.Popen(
        [thirdparty_binary("fstdeterminizestar"), "--use-log=true"],
        stdin=composed.stdout,
        stdout=subprocess.PIPE,
        **shared,
    )
    minimized = subprocess.Popen(
        [thirdparty_binary("fstminimizeencoded")],
        stdin=determinized.stdout,
        stdout=subprocess.PIPE,
        **shared,
    )
    pushed = subprocess.Popen(
        [thirdparty_binary("fstpushspecial"), "-", lg_path],
        stdin=minimized.stdout,
        **shared,
    )
    pushed.communicate()
def compose_clg(
    in_disambig: typing.Optional[Path],
    out_disambig: typing.Optional[Path],
    context_width: int,
    central_pos: int,
    ilabels_temp: Path,
    lg_path: Path,
    clg_path: Path,
    log_file: TextIO,
) -> None:
    """
    Build a CLG.fst from an LG.fst

    The output of ``fstcomposecontext`` is piped into ``fstarcsort``, which writes ``clg_path``.

    See Also
    --------
    :kaldi_src:`fstcomposecontext`
        Relevant Kaldi binary
    :openfst_src:`fstarcsort`
        Relevant OpenFst binary

    Parameters
    ----------
    in_disambig: :class:`~pathlib.Path`
        Disambiguation symbols file to read; the option is omitted when falsy
    out_disambig: :class:`~pathlib.Path`
        Disambiguation symbols file to write; the option is omitted when falsy
    context_width: int
        Context width of the acoustic model
    central_pos: int
        Central position of the acoustic model
    ilabels_temp: :class:`~pathlib.Path`
        Temporary ilabels file
    lg_path: :class:`~pathlib.Path`
        Input LG.fst file
    clg_path: :class:`~pathlib.Path`
        Where the resulting CLG.fst is written
    log_file: TextIO
        Open log file that receives stderr of both stages
    """
    context_command = [
        thirdparty_binary("fstcomposecontext"),
        f"--context-size={context_width}",
        f"--central-position={central_pos}",
    ]
    if in_disambig:
        context_command.append(f"--read-disambig-syms={in_disambig}")
    if out_disambig:
        context_command.append(f"--write-disambig-syms={out_disambig}")
    context_command += [ilabels_temp, lg_path]
    context_proc = subprocess.Popen(context_command, stdout=subprocess.PIPE, stderr=log_file)
    arcsort_command = [thirdparty_binary("fstarcsort"), "--sort_type=ilabel", "-", clg_path]
    arcsort_proc = subprocess.Popen(
        arcsort_command,
        stdin=context_proc.stdout,
        stderr=log_file,
        env=os.environ,
    )
    arcsort_proc.communicate()
def compose_hclg(
    model_path: Path,
    ilabels_temp: Path,
    transition_scale: float,
    clg_path: Path,
    hclga_path: Path,
    log_file: TextIO,
) -> None:
    """
    Compose HCLGa.fst for a dictionary

    ``make-h-transducer`` is run to completion first; its output is then composed with the
    CLG.fst and piped through the remaining binaries, the last of which writes ``hclga_path``.

    See Also
    --------
    :kaldi_src:`make-h-transducer`
        Relevant Kaldi binary
    :kaldi_src:`fsttablecompose`
        Relevant Kaldi binary
    :kaldi_src:`fstdeterminizestar`
        Relevant Kaldi binary
    :kaldi_src:`fstrmsymbols`
        Relevant Kaldi binary
    :kaldi_src:`fstrmepslocal`
        Relevant Kaldi binary
    :kaldi_src:`fstminimizeencoded`
        Relevant Kaldi binary
    :openfst_src:`fstarcsort`
        Relevant OpenFst binary

    Parameters
    ----------
    model_path: :class:`~pathlib.Path`
        Acoustic model file; the ``tree`` file is looked up next to it
    ilabels_temp: :class:`~pathlib.Path`
        Temporary ilabels file
    transition_scale: float
        Transition scale for the fst
    clg_path: :class:`~pathlib.Path`
        Input CLG.fst file
    hclga_path: :class:`~pathlib.Path`
        Where the resulting HCLGa.fst is written
    log_file: TextIO
        Open log file that receives output of every stage
    """
    # The H transducer and its disambiguation ids sit next to the output and
    # reuse the last dot-separated piece of its stem
    stem_tail = hclga_path.stem.split(".")[-1]
    tree_path = model_path.with_name("tree")
    ha_path = hclga_path.with_stem("Ha" + stem_tail)
    ha_out_disambig = hclga_path.with_stem("disambig_tid" + stem_tail)

    h_transducer_command = [
        thirdparty_binary("make-h-transducer"),
        f"--disambig-syms-out={ha_out_disambig}",
        f"--transition-scale={transition_scale}",
        ilabels_temp,
        tree_path,
        model_path,
        ha_path,
    ]
    subprocess.Popen(
        h_transducer_command,
        stderr=log_file,
        stdout=log_file,
        env=os.environ,
    ).communicate()

    shared = {"stderr": log_file, "env": os.environ}
    composed = subprocess.Popen(
        [thirdparty_binary("fsttablecompose"), ha_path, clg_path],
        stdout=subprocess.PIPE,
        **shared,
    )
    determinized = subprocess.Popen(
        [thirdparty_binary("fstdeterminizestar"), "--use-log=true"],
        stdin=composed.stdout,
        stdout=subprocess.PIPE,
        **shared,
    )
    without_symbols = subprocess.Popen(
        [thirdparty_binary("fstrmsymbols"), ha_out_disambig],
        stdin=determinized.stdout,
        stdout=subprocess.PIPE,
        **shared,
    )
    without_epsilons = subprocess.Popen(
        [thirdparty_binary("fstrmepslocal")],
        stdin=without_symbols.stdout,
        stdout=subprocess.PIPE,
        **shared,
    )
    minimized = subprocess.Popen(
        [thirdparty_binary("fstminimizeencoded"), "-", hclga_path],
        stdin=without_epsilons.stdout,
        **shared,
    )
    minimized.communicate()
def compose_g(arpa_path: Path, words_path: Path, g_path: Path, log_file: TextIO) -> None:
    """
    Convert an ARPA formatted language model into a G.fst

    See Also
    --------
    :kaldi_src:`arpa2fst`
        Relevant Kaldi binary

    Parameters
    ----------
    arpa_path: :class:`~pathlib.Path`
        Input ARPA file
    words_path: :class:`~pathlib.Path`
        Words symbols file
    g_path: :class:`~pathlib.Path`
        Where the resulting G.fst is written
    log_file: TextIO
        Open log file that receives stdout and stderr of the binary
    """
    command = [
        thirdparty_binary("arpa2fst"),
        "--disambig-symbol=#0",
        f"--read-symbol-table={words_path}",
        arpa_path,
        g_path,
    ]
    converter = subprocess.Popen(command, stderr=log_file, stdout=log_file)
    converter.communicate()
def compose_g_carpa(
    in_carpa_path: Path,
    temp_carpa_path: Path,
    words_mapping: Dict[str, int],
    carpa_path: Path,
    log_file: TextIO,
):
    """
    Compose a large ARPA model into a G.carpa file

    The ARPA file is first rewritten to ``temp_carpa_path`` with every word replaced by its
    integer symbol; n-gram lines containing a word missing from ``words_mapping`` are dropped.
    The temporary file is then passed to ``arpa-to-const-arpa`` and removed afterwards.

    See Also
    --------
    :kaldi_src:`arpa-to-const-arpa`
        Relevant Kaldi binary

    Parameters
    ----------
    in_carpa_path: :class:`~pathlib.Path`
        Input ARPA model path
    temp_carpa_path: :class:`~pathlib.Path`
        Temporary CARPA model path
    words_mapping: dict[str, int]
        Words symbols mapping; must contain ``<s>``, ``</s>`` and ``<unk>``
    carpa_path: :class:`~pathlib.Path`
        Path to save output G.carpa
    log_file: TextIO
        Log file handler to output logging info to

    Raises
    ------
    KeyError
        If ``<s>``, ``</s>`` or ``<unk>`` is missing from ``words_mapping``
    Exception
        If an n-gram line has an unexpected number of columns
    """
    bos_symbol = words_mapping["<s>"]
    eos_symbol = words_mapping["</s>"]
    unk_symbol = words_mapping["<unk>"]
    # Section markers of the ARPA format, compiled once rather than once per line
    data_pattern = re.compile(r"^\\data\\$")
    ngram_pattern = re.compile(r"^\\([0-9]*)-grams:$")
    end_pattern = re.compile(r"^\\end\\$")
    with mfa_open(in_carpa_path, "r") as f, mfa_open(temp_carpa_path, "w") as outf:
        current_order = -1  # -1 until the \data\ header has been seen
        num_oov_lines = 0
        for line in f:
            line = line.strip()
            is_data_header = data_pattern.match(line) is not None
            if current_order == -1 and not is_data_header:
                # Ignore anything that precedes the \data\ header
                continue
            if is_data_header:
                # A regular (non-raw) string, so the log line ends with a real newline
                log_file.write("Processing data...\n")
                current_order = 0
                outf.write(line + "\n")
                continue
            ngram_match = ngram_pattern.match(line)
            if ngram_match:
                current_order = int(ngram_match.group(1))
                log_file.write(f"Processing {current_order} grams...\n")
                outf.write(line + "\n")
            elif end_pattern.match(line):
                outf.write(line + "\n")
            elif not line:
                # Blank separators are only kept once the n-gram sections have started
                if current_order >= 1:
                    outf.write("\n")
            elif current_order == 0:
                # Lines of the \data\ section are copied through unchanged
                outf.write(line + "\n")
            else:
                col = line.split()
                # Expected layout: probability, `current_order` words, optional backoff
                if len(col) > 2 + current_order or len(col) < 1 + current_order:
                    raise Exception(f'Bad line in arpa lm "{line}"')
                prob = col.pop(0)
                is_oov = False
                for i in range(current_order):
                    try:
                        col[i] = str(words_mapping[col[i]])
                    except KeyError:
                        is_oov = True
                        num_oov_lines += 1
                        break
                if not is_oov:
                    rest_of_line = " ".join(col)
                    outf.write(f"{prob}\t{rest_of_line}\n")
        log_file.write(f"Skipped {num_oov_lines} n-gram lines with out-of-vocabulary words\n")
    carpa_proc = subprocess.Popen(
        [
            thirdparty_binary("arpa-to-const-arpa"),
            f"--bos-symbol={bos_symbol}",
            f"--eos-symbol={eos_symbol}",
            f"--unk-symbol={unk_symbol}",
            temp_carpa_path,
            carpa_path,
        ],
        stdin=subprocess.PIPE,
        stderr=log_file,
        stdout=log_file,
        env=os.environ,
    )
    carpa_proc.communicate()
    os.remove(temp_carpa_path)
class CreateHclgFunction(KaldiFunction):
    """
    Build the HCLG.fst decoding graph for a job

    See Also
    --------
    :meth:`.Transcriber.create_hclgs`
        Main function that calls this function in parallel
    :meth:`.Transcriber.create_hclgs_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`add-self-loops`
        Relevant Kaldi binary
    :openfst_src:`fstconvert`
        Relevant OpenFst binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.CreateHclgArguments`
        Arguments for the function
    """

    def __init__(self, args: CreateHclgArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.words_path = args.words_path
        self.carpa_path = args.carpa_path
        self.small_arpa_path = args.small_arpa_path
        self.medium_arpa_path = args.medium_arpa_path
        self.big_arpa_path = args.big_arpa_path
        self.model_path = args.model_path
        self.disambig_L_path = args.disambig_L_path
        self.disambig_int_path = args.disambig_int_path
        self.hclg_options = args.hclg_options
        self.words_mapping = args.words_mapping

    def _run(self) -> typing.Generator[typing.Tuple[bool, str]]:
        """
        Run the function

        Yields ``1`` after each intermediate file that had to be generated, and finally a
        tuple of (whether HCLG.fst exists, path to HCLG.fst).  Yields nothing when HCLG.fst
        already exists.
        """
        # All intermediate graphs live next to the final HCLG.fst
        hclg_path = self.working_directory.joinpath(f"HCLG.{self.job_name}.fst")
        small_g_path = hclg_path.with_stem(f"G_small.{self.job_name}")
        medium_g_path = hclg_path.with_stem(f"G_med.{self.job_name}")
        lg_path = hclg_path.with_stem(f"LG.{self.job_name}")
        hclga_path = hclg_path.with_stem(f"HCLGa.{self.job_name}")
        if os.path.exists(hclg_path):
            return
        with mfa_open(self.log_path, "w") as log_file:
            context_width = self.hclg_options["context_width"]
            central_pos = self.hclg_options["central_pos"]
            context_tag = f"{context_width}_{central_pos}"
            clg_path = hclg_path.with_stem(f"CLG_{context_tag}.{self.job_name}")
            ilabels_temp = hclg_path.with_name(f"ilabels_{context_tag}.{self.job_name}")
            out_disambig = hclg_path.with_name(
                f"disambig_ilabels_{context_tag}_{self.job_name}.int"
            )

            log_file.write("Generating decoding graph...\n")

            # Each stage is skipped when its output is already on disk
            if not os.path.exists(small_g_path):
                log_file.write("Generating small_G.fst...")
                compose_g(self.small_arpa_path, self.words_path, small_g_path, log_file)
                yield 1

            if not os.path.exists(medium_g_path):
                log_file.write("Generating med_G.fst...")
                compose_g(self.medium_arpa_path, self.words_path, medium_g_path, log_file)
                yield 1

            if not os.path.exists(self.carpa_path):
                log_file.write("Generating G.carpa...")
                temp_carpa_path = self.carpa_path.with_suffix(".temp")
                compose_g_carpa(
                    self.big_arpa_path,
                    temp_carpa_path,
                    self.words_mapping,
                    self.carpa_path,
                    log_file,
                )
                yield 1

            if not os.path.exists(lg_path):
                log_file.write("Generating LG.fst...")
                compose_lg(self.disambig_L_path, small_g_path, lg_path, log_file)
                yield 1

            if not os.path.exists(clg_path):
                log_file.write("Generating CLG.fst...")
                compose_clg(
                    self.disambig_int_path,
                    out_disambig,
                    context_width,
                    central_pos,
                    ilabels_temp,
                    lg_path,
                    clg_path,
                    log_file,
                )
                yield 1

            if not os.path.exists(hclga_path):
                log_file.write("Generating HCLGa.fst...")
                compose_hclg(
                    self.model_path,
                    ilabels_temp,
                    self.hclg_options["transition_scale"],
                    clg_path,
                    hclga_path,
                    log_file,
                )
                yield 1

            log_file.write("Generating HCLG.fst...")
            add_loops_command = [
                thirdparty_binary("add-self-loops"),
                f"--self-loop-scale={self.hclg_options['self_loop_scale']}",
                "--reorder=true",
                self.model_path,
                hclga_path,
            ]
            add_loops_proc = subprocess.Popen(
                add_loops_command,
                stderr=log_file,
                stdout=subprocess.PIPE,
                env=os.environ,
            )
            convert_command = [
                thirdparty_binary("fstconvert"),
                "--v=100",
                "--fst_type=const",
                "-",
                hclg_path,
            ]
            convert_proc = subprocess.Popen(
                convert_command,
                stdin=add_loops_proc.stdout,
                stderr=log_file,
                env=os.environ,
            )
            convert_proc.communicate()
            self.check_call(convert_proc)
            # Report whether the final graph actually made it to disk
            yield hclg_path.exists(), hclg_path
class DecodeFunction(KaldiFunction):
    """
    Multiprocessing function that decodes utterances into lattices

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.decode_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-latgen-faster`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.DecodeArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(
        r"^LOG.*Log-like per frame for utterance (?P<utterance>.*) is (?P<loglike>[-\d.]+) over (?P<num_frames>\d+) frames."
    )

    def __init__(self, args: DecodeArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.feature_strings = args.feature_strings
        self.lat_paths = args.lat_paths
        self.word_symbol_paths = args.word_symbol_paths
        self.hclg_paths = args.hclg_paths
        self.decode_options = args.decode_options
        self.model_path = args.model_path

    def _run(self) -> typing.Generator[typing.Tuple[str, float, int]]:
        """
        Run the function

        Yields a tuple of (utterance, log-likelihood, number of frames) for every progress
        line the decoder reports.  Dictionaries whose lattice file already exists are skipped.
        """
        options = self.decode_options
        with mfa_open(self.log_path, "w") as log_file:
            log_file.write(f"{self.decode_options}\n")
            for dict_id in self.dictionaries:
                feature_string = self.feature_strings[dict_id]
                lat_path = self.lat_paths[dict_id]
                word_symbol_path = self.word_symbol_paths[dict_id]
                hclg_path = self.hclg_paths[dict_id]
                if os.path.exists(lat_path):
                    continue
                # With speaker adaptation, the first-pass settings win when they are set
                use_first_beam = (
                    options["uses_speaker_adaptation"] and options["first_beam"] is not None
                )
                beam = options["first_beam"] if use_first_beam else options["beam"]
                use_first_max_active = (
                    options["uses_speaker_adaptation"]
                    and options["first_max_active"] is not None
                )
                max_active = (
                    options["first_max_active"] if use_first_max_active else options["max_active"]
                )
                command = [
                    thirdparty_binary("gmm-latgen-faster"),
                    f"--max-active={max_active}",
                    f"--beam={beam}",
                    f"--lattice-beam={options['lattice_beam']}",
                    "--allow-partial=true",
                    f"--word-symbol-table={word_symbol_path}",
                    f"--acoustic-scale={options['acoustic_scale']}",
                    self.model_path,
                    hclg_path,
                    feature_string,
                    f"ark:{lat_path}",
                ]
                decode_proc = subprocess.Popen(
                    command,
                    stderr=subprocess.PIPE,
                    env=os.environ,
                    encoding="utf8",
                )
                for line in decode_proc.stderr:
                    log_file.write(line)
                    progress = self.progress_pattern.match(line.strip())
                    if not progress:
                        continue
                    yield (
                        progress.group("utterance"),
                        float(progress.group("loglike")),
                        int(progress.group("num_frames")),
                    )
                self.check_call(decode_proc)
class LmRescoreFunction(KaldiFunction):
    """
    Multiprocessing function that rescores lattices, swapping the small G.fst for the medium G.fst

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.lm_rescore_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`lattice-lmrescore-pruned`
        Relevant Kaldi binary
    :openfst_src:`fstproject`
        Relevant OpenFst binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.LmRescoreArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(
        r"^LOG .* Overall, succeeded for (?P<succeeded>\d+) lattices, failed for (?P<failed>\d+)"
    )

    def __init__(self, args: LmRescoreArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.lat_paths = args.lat_paths
        self.rescored_lat_paths = args.rescored_lat_paths
        self.old_g_paths = args.old_g_paths
        self.new_g_paths = args.new_g_paths
        self.lm_rescore_options = args.lm_rescore_options

    def _run(self) -> typing.Generator[typing.Tuple[int, int]]:
        """
        Run the function

        Yields a tuple of (succeeded, failed) lattice counts whenever the rescoring binary
        reports its summary line.  Dictionaries whose rescored lattice file already exists
        are skipped.
        """
        project_type_arg = "--project_type=output"
        with mfa_open(self.log_path, "w") as log_file:
            for dict_id in self.dictionaries:
                lat_path = self.lat_paths[dict_id]
                rescored_lat_path = self.rescored_lat_paths[dict_id]
                old_g_path = self.old_g_paths[dict_id]
                new_g_path = self.new_g_paths[dict_id]
                if os.path.exists(rescored_lat_path):
                    continue
                # The projected old grammar is streamed in on stdin; the new grammar
                # is handed over as a piped command argument
                project_proc = subprocess.Popen(
                    [thirdparty_binary("fstproject"), project_type_arg, old_g_path],
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                rescore_command = [
                    thirdparty_binary("lattice-lmrescore-pruned"),
                    f"--acoustic-scale={self.lm_rescore_options['acoustic_scale']}",
                    "-",
                    f'fstproject {project_type_arg} "{new_g_path}" |',
                    f"ark,s,cs:{lat_path}",
                    f"ark:{rescored_lat_path}",
                ]
                rescore_proc = subprocess.Popen(
                    rescore_command,
                    stdin=project_proc.stdout,
                    stderr=subprocess.PIPE,
                    env=os.environ,
                    encoding="utf8",
                )
                for line in rescore_proc.stderr:
                    log_file.write(line)
                    summary = self.progress_pattern.match(line.strip())
                    if summary:
                        yield int(summary.group("succeeded")), int(summary.group("failed"))
                self.check_call(rescore_proc)
class CarpaLmRescoreFunction(KaldiFunction):
    """
    Multiprocessing function that rescores lattices, swapping the medium G.fst for the large G.carpa

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.carpa_lm_rescore_arguments`
        Job method for generating arguments for this function
    :openfst_src:`fstproject`
        Relevant OpenFst binary
    :kaldi_src:`lattice-lmrescore`
        Relevant Kaldi binary
    :kaldi_src:`lattice-lmrescore-const-arpa`
        Relevant Kaldi binary

    Parameters
    ----------
    args: CarpaLmRescoreArguments
        Arguments
    """

    progress_pattern = re.compile(
        r"^LOG .* Overall, succeeded for (?P<succeeded>\d+) lattices, failed for (?P<failed>\d+)"
    )

    def __init__(self, args: CarpaLmRescoreArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.lat_paths = args.lat_paths
        self.rescored_lat_paths = args.rescored_lat_paths
        self.old_g_paths = args.old_g_paths
        self.new_g_paths = args.new_g_paths

    def _run(self) -> typing.Generator[typing.Tuple[int, int]]:
        """
        Run the function

        Yields a tuple of (succeeded, failed) lattice counts whenever the final rescoring
        binary reports its summary line.  Dictionaries whose rescored lattice file already
        exists are skipped.  The log file is opened in append mode.
        """
        project_type_arg = "--project_type=output"
        with mfa_open(self.log_path, "a") as log_file:
            for dict_id in self.dictionaries:
                lat_path = self.lat_paths[dict_id]
                rescored_lat_path = self.rescored_lat_paths[dict_id]
                old_g_path = self.old_g_paths[dict_id]
                new_g_path = self.new_g_paths[dict_id]
                if os.path.exists(rescored_lat_path):
                    continue
                project_proc = subprocess.Popen(
                    [thirdparty_binary("fstproject"), project_type_arg, old_g_path],
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                # First pass runs with --lm-scale=-1.0 against the old grammar...
                remove_old_command = [
                    thirdparty_binary("lattice-lmrescore"),
                    "--lm-scale=-1.0",
                    f"ark,s,cs:{lat_path}",
                    "-",
                    "ark:-",
                ]
                remove_old_proc = subprocess.Popen(
                    remove_old_command,
                    stdout=subprocess.PIPE,
                    stdin=project_proc.stdout,
                    stderr=log_file,
                    env=os.environ,
                )
                # ...second pass runs with --lm-scale=1.0 against the G.carpa
                add_new_command = [
                    thirdparty_binary("lattice-lmrescore-const-arpa"),
                    "--lm-scale=1.0",
                    "ark,s,cs:-",
                    new_g_path,
                    f"ark:{rescored_lat_path}",
                ]
                add_new_proc = subprocess.Popen(
                    add_new_command,
                    stdin=remove_old_proc.stdout,
                    stderr=subprocess.PIPE,
                    env=os.environ,
                    encoding="utf8",
                )
                for line in add_new_proc.stderr:
                    log_file.write(line)
                    summary = self.progress_pattern.match(line.strip())
                    if summary:
                        yield int(summary.group("succeeded")), int(summary.group("failed"))
                self.check_call(add_new_proc)
class InitialFmllrFunction(KaldiFunction):
    """
    Multiprocessing function that runs the initial fMLLR calculation

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_fmllr`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.initial_fmllr_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`lattice-to-post`
        Relevant Kaldi binary
    :kaldi_src:`weight-silence-post`
        Relevant Kaldi binary
    :kaldi_src:`gmm-post-to-gpost`
        Relevant Kaldi binary
    :kaldi_src:`gmm-est-fmllr-gpost`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.InitialFmllrArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(
        r"^LOG.*For speaker \w+, auxf-impr from fMLLR is [\d.]+, over [\d.]+ frames."
    )

    def __init__(self, args: InitialFmllrArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.feature_strings = args.feature_strings
        self.model_path = args.model_path
        self.fmllr_options = args.fmllr_options
        self.pre_trans_paths = args.pre_trans_paths
        self.lat_paths = args.lat_paths
        self.spk2utt_paths = args.spk2utt_paths

    def _run(self) -> typing.Generator[int]:
        """
        Run the function

        Yields ``1`` for every per-speaker progress line reported by ``gmm-est-fmllr-gpost``.
        """
        with mfa_open(self.log_path, "w") as log_file:
            for dict_id in self.dictionaries:
                lat_path = self.lat_paths[dict_id]
                feature_string = self.feature_strings[dict_id]
                spk2utt_path = self.spk2utt_paths[dict_id]
                trans_path = self.pre_trans_paths[dict_id]

                # lattice-to-post | weight-silence-post | gmm-post-to-gpost | gmm-est-fmllr-gpost
                post_command = [
                    thirdparty_binary("lattice-to-post"),
                    f"--acoustic-scale={self.fmllr_options['acoustic_scale']}",
                    f"ark,s,cs:{lat_path}",
                    "ark:-",
                ]
                post_proc = subprocess.Popen(
                    post_command,
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                silence_command = [
                    thirdparty_binary("weight-silence-post"),
                    f"{self.fmllr_options['silence_weight']}",
                    self.fmllr_options["sil_phones"],
                    self.model_path,
                    "ark,s,cs:-",
                    "ark:-",
                ]
                silence_proc = subprocess.Popen(
                    silence_command,
                    stdin=post_proc.stdout,
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                gpost_command = [
                    thirdparty_binary("gmm-post-to-gpost"),
                    self.model_path,
                    feature_string,
                    "ark,s,cs:-",
                    "ark:-",
                ]
                gpost_proc = subprocess.Popen(
                    gpost_command,
                    stdin=silence_proc.stdout,
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                estimate_command = [
                    thirdparty_binary("gmm-est-fmllr-gpost"),
                    f"--fmllr-update-type={self.fmllr_options['fmllr_update_type']}",
                    f"--spk2utt=ark,s,cs:{spk2utt_path}",
                    self.model_path,
                    feature_string,
                    "ark,s,cs:-",
                    f"ark:{trans_path}",
                ]
                estimate_proc = subprocess.Popen(
                    estimate_command,
                    stdin=gpost_proc.stdout,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    env=os.environ,
                    encoding="utf8",
                )
                for line in estimate_proc.stderr:
                    log_file.write(line)
                    if self.progress_pattern.match(line.strip()):
                        yield 1
                self.check_call(estimate_proc)
class LatGenFmllrFunction(KaldiFunction):
    """
    Multiprocessing function that regenerates lattices using the initial fMLLR transforms

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_fmllr`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.lat_gen_fmllr_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-latgen-faster`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.LatGenFmllrArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(
        r"^LOG.*Log-like per frame for utterance (?P<utterance>.*) is (?P<loglike>[-\d.]+) over (?P<num_frames>\d+) frames."
    )

    def __init__(self, args: LatGenFmllrArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.feature_strings = args.feature_strings
        self.tmp_lat_paths = args.tmp_lat_paths
        self.word_symbol_paths = args.word_symbol_paths
        self.hclg_paths = args.hclg_paths
        self.decode_options = args.decode_options
        self.model_path = args.model_path

    def _run(self) -> typing.Generator[typing.Tuple[str, float, int]]:
        """
        Run the function

        Yields a tuple of (utterance, log-likelihood, number of frames) for every progress
        line the decoder reports.
        """
        options = self.decode_options
        with mfa_open(self.log_path, "w") as log_file:
            for dict_id in self.dictionaries:
                feature_string = self.feature_strings[dict_id]
                # A dict means per-dictionary graphs; otherwise the same graph and
                # symbol table are used for every dictionary
                per_dictionary = isinstance(self.hclg_paths, dict)
                words_path = (
                    self.word_symbol_paths[dict_id] if per_dictionary else self.word_symbol_paths
                )
                hclg_path = self.hclg_paths[dict_id] if per_dictionary else self.hclg_paths
                tmp_lat_path = self.tmp_lat_paths[dict_id]
                command = [
                    thirdparty_binary("gmm-latgen-faster"),
                    f"--max-active={options['max_active']}",
                    f"--beam={options['beam']}",
                    f"--lattice-beam={options['lattice_beam']}",
                    f"--acoustic-scale={options['acoustic_scale']}",
                    "--determinize-lattice=false",
                    "--allow-partial=true",
                    f"--word-symbol-table={words_path}",
                    self.model_path,
                    hclg_path,
                    feature_string,
                    f"ark:{tmp_lat_path}",
                ]
                lat_gen_proc = subprocess.Popen(
                    command,
                    stderr=subprocess.PIPE,
                    env=os.environ,
                    encoding="utf8",
                )
                for line in lat_gen_proc.stderr:
                    log_file.write(line)
                    log_file.flush()
                    progress = self.progress_pattern.match(line.strip())
                    if not progress:
                        continue
                    yield (
                        progress.group("utterance"),
                        float(progress.group("loglike")),
                        int(progress.group("num_frames")),
                    )
                self.check_call(lat_gen_proc)
class FinalFmllrFunction(KaldiFunction):
    """
    Multiprocessing function for running final fMLLR estimation

    For each dictionary, the temporary lattices are determinized, converted to
    posteriors, silence-weighted and fed to ``gmm-est-fmllr``. The newly estimated
    transforms are then composed with the transforms already stored at the
    dictionary's transform path, and the composed result replaces that file.

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_fmllr`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.final_fmllr_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`lattice-determinize-pruned`
        Relevant Kaldi binary
    :kaldi_src:`lattice-to-post`
        Relevant Kaldi binary
    :kaldi_src:`weight-silence-post`
        Relevant Kaldi binary
    :kaldi_src:`gmm-est-fmllr`
        Relevant Kaldi binary
    :kaldi_src:`compose-transforms`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.FinalFmllrArguments`
        Arguments for the function
    """

    # Matches the per-speaker summary line written to stderr by gmm-est-fmllr.
    progress_pattern = re.compile(
        r"^LOG.*For speaker \w+, auxf-impr from fMLLR is [\d.]+, over [\d.]+ frames."
    )

    def __init__(self, args: FinalFmllrArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.feature_strings = args.feature_strings
        self.model_path = args.model_path
        self.fmllr_options = args.fmllr_options
        self.trans_paths = args.trans_paths
        self.tmp_lat_paths = args.tmp_lat_paths
        self.spk2utt_paths = args.spk2utt_paths

    def _run(self) -> typing.Generator[int]:
        """
        Run the estimation pipeline for every dictionary

        Yields
        ------
        int
            ``1`` for every log line of ``gmm-est-fmllr`` that matches
            :attr:`progress_pattern`
        """
        with mfa_open(self.log_path, "w") as log_file:
            for dict_id in self.dictionaries:
                feature_string = self.feature_strings[dict_id]
                trans_path = self.trans_paths[dict_id]
                temp_trans_path = trans_path.with_suffix(".temp")
                temp_composed_trans_path = trans_path.with_suffix(".temp_composed")
                spk2utt_path = self.spk2utt_paths[dict_id]
                tmp_lat_path = self.tmp_lat_paths[dict_id]
                determinize_proc = subprocess.Popen(
                    [
                        thirdparty_binary("lattice-determinize-pruned"),
                        f"--acoustic-scale={self.fmllr_options['acoustic_scale']}",
                        "--beam=4.0",
                        f"ark,s,cs:{tmp_lat_path}",
                        "ark:-",
                    ],
                    stderr=log_file,
                    stdout=subprocess.PIPE,
                    env=os.environ,
                )
                latt_post_proc = subprocess.Popen(
                    [
                        thirdparty_binary("lattice-to-post"),
                        f"--acoustic-scale={self.fmllr_options['acoustic_scale']}",
                        "ark,s,cs:-",
                        "ark:-",
                    ],
                    stdin=determinize_proc.stdout,
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                # Drop the parent's copy of the pipe so the upstream process gets
                # SIGPIPE instead of blocking forever if its consumer exits early.
                determinize_proc.stdout.close()
                weight_silence_proc = subprocess.Popen(
                    [
                        thirdparty_binary("weight-silence-post"),
                        f"{self.fmllr_options['silence_weight']}",
                        self.fmllr_options["sil_phones"],
                        self.model_path,
                        "ark,s,cs:-",
                        "ark:-",
                    ],
                    stdin=latt_post_proc.stdout,
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                latt_post_proc.stdout.close()
                fmllr_proc = subprocess.Popen(
                    [
                        thirdparty_binary("gmm-est-fmllr"),
                        f"--fmllr-update-type={self.fmllr_options['fmllr_update_type']}",
                        f"--spk2utt=ark,s,cs:{spk2utt_path}",
                        self.model_path,
                        feature_string,
                        "ark,s,cs:-",
                        f"ark:{temp_trans_path}",
                    ],
                    stdin=weight_silence_proc.stdout,
                    stderr=subprocess.PIPE,
                    env=os.environ,
                    encoding="utf8",
                )
                weight_silence_proc.stdout.close()
                for line in fmllr_proc.stderr:
                    log_file.write(line)
                    m = self.progress_pattern.match(line.strip())
                    if m:
                        yield 1
                # NOTE(review): check_call is defined on KaldiFunction (not visible
                # here); presumably it waits for the process and raises on failure.
                self.check_call(fmllr_proc)

                # compose-transforms reads both inputs from files, so it needs no
                # stdin. The original passed fmllr_proc.stdout, which is always None
                # because fmllr_proc is created without stdout=PIPE.
                compose_proc = subprocess.Popen(
                    [
                        thirdparty_binary("compose-transforms"),
                        "--b-is-affine=true",
                        f"ark:{temp_trans_path}",
                        f"ark:{trans_path}",
                        f"ark:{temp_composed_trans_path}",
                    ],
                    stderr=log_file,
                    env=os.environ,
                )
                compose_proc.communicate()
                self.check_call(compose_proc)

                # Replace the previous transforms with the composed ones.
                os.remove(trans_path)
                os.remove(temp_trans_path)
                os.rename(temp_composed_trans_path, trans_path)
class FmllrRescoreFunction(KaldiFunction):
    """
    Multiprocessing function to rescore lattices following fMLLR estimation

    For each dictionary, the temporary lattices are rescored with
    ``gmm-rescore-lattice`` and piped into ``lattice-determinize-pruned``, which
    writes the final lattices.

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_fmllr`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.fmllr_rescore_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-rescore-lattice`
        Relevant Kaldi binary
    :kaldi_src:`lattice-determinize-pruned`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.FmllrRescoreArguments`
        Arguments for the function
    """

    # Matches the summary line logged by lattice-determinize-pruned. The
    # parentheses around "or output was empty" are literal text in that message
    # and must be escaped; unescaped they form a regex group and the pattern can
    # never match the real log line.
    progress_pattern = re.compile(
        r"^LOG.*Done (?P<done>\d+) lattices, determinization finished earlier than specified by the beam \(or output was empty\) on (?P<errors>\d+) of these."
    )

    def __init__(self, args: FmllrRescoreArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.feature_strings = args.feature_strings
        self.model_path = args.model_path
        self.fmllr_options = args.fmllr_options
        self.tmp_lat_paths = args.tmp_lat_paths
        self.final_lat_paths = args.final_lat_paths

    def _run(self) -> typing.Generator[typing.Tuple[int, int]]:
        """
        Rescore and determinize the lattices of every dictionary

        Yields
        ------
        tuple[int, int]
            Number of lattices done and number of lattices flagged in the
            determinization summary line, as parsed by :attr:`progress_pattern`
        """
        with mfa_open(self.log_path, "w") as log_file:
            for dict_id in self.dictionaries:
                feature_string = self.feature_strings[dict_id]
                tmp_lat_path = self.tmp_lat_paths[dict_id]
                final_lat_path = self.final_lat_paths[dict_id]
                rescore_proc = subprocess.Popen(
                    [
                        thirdparty_binary("gmm-rescore-lattice"),
                        self.model_path,
                        f"ark,s,cs:{tmp_lat_path}",
                        feature_string,
                        "ark:-",
                    ],
                    stdout=subprocess.PIPE,
                    stderr=log_file,
                    env=os.environ,
                )
                determinize_proc = subprocess.Popen(
                    [
                        thirdparty_binary("lattice-determinize-pruned"),
                        f"--acoustic-scale={self.fmllr_options['acoustic_scale']}",
                        f"--beam={self.fmllr_options['lattice_beam']}",
                        "ark,s,cs:-",
                        f"ark:{final_lat_path}",
                    ],
                    stdin=rescore_proc.stdout,
                    stderr=subprocess.PIPE,
                    encoding="utf8",
                    env=os.environ,
                )
                # Drop the parent's copy of the pipe so the rescoring process gets
                # SIGPIPE instead of blocking if determinization exits early.
                rescore_proc.stdout.close()
                for line in determinize_proc.stderr:
                    log_file.write(line)
                    m = self.progress_pattern.match(line.strip())
                    if m:
                        yield int(m.group("done")), int(m.group("errors"))
                self.check_call(determinize_proc)
@dataclass
class PerSpeakerDecodeArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.PerSpeakerDecodeFunction`

    Parameters
    ----------
    model_directory: :class:`~pathlib.Path`
        Model directory (stored by the function; not read by its ``_run`` method)
    feature_strings: dict[int, str]
        Mapping of dictionary ids to feature strings
    lat_paths: dict[int, :class:`~pathlib.Path`]
        Mapping of dictionary ids to paths where lattices are written
    model_path: :class:`~pathlib.Path`
        Acoustic model path passed to the decoder
    disambiguation_symbols_int_path: :class:`~pathlib.Path`
        Path to disambiguation symbol integer file (stored by the function; not read by ``_run``)
    decode_options: dict[str, Any]
        Decoding options; ``acoustic_scale``, ``beam``, ``max_active`` and ``lattice_beam`` are read
    tree_path: :class:`~pathlib.Path`
        Path to the tree file (stored by the function; not read by ``_run``)
    order: int
        Stored by the function; not read by ``_run``. Presumably the ngram order; confirm with caller
    method: str
        Stored by the function; not read by ``_run``. Presumably the ngram smoothing method; confirm with caller
    """

    model_directory: Path
    feature_strings: Dict[int, str]
    lat_paths: Dict[int, Path]
    model_path: Path
    disambiguation_symbols_int_path: Path
    decode_options: MetaDict
    tree_path: Path
    order: int
    method: str


class PerSpeakerDecodeFunction(KaldiFunction):
    """
    Multiprocessing function to test utterance transcriptions with utterance and speaker ngram models

    A single ``gmm-latgen-faster`` process is started per dictionary. For every
    utterance of the job, the speaker's FST is written to the decoder's standard
    input and the decoder's log output is read until the utterance is reported.

    See Also
    --------
    :kaldi_src:`compile-train-graphs-fsts`
        Relevant Kaldi binary
    :kaldi_src:`gmm-latgen-faster`
        Relevant Kaldi binary
    :kaldi_src:`lattice-oracle`
        Relevant Kaldi binary
    :openfst_src:`farcompilestrings`
        Relevant OpenFst binary
    :ngram_src:`ngramcount`
        Relevant OpenGrm-Ngram binary
    :ngram_src:`ngrammake`
        Relevant OpenGrm-Ngram binary
    :ngram_src:`ngramshrink`
        Relevant OpenGrm-Ngram binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.PerSpeakerDecodeArguments`
        Arguments for the function
    """

    # Matches the per-utterance summary line that the decoder writes to stderr.
    progress_pattern = re.compile(
        r"^LOG.*Log-like per frame for utterance (?P<utterance>.*) is (?P<loglike>[-\d.]+) over (?P<num_frames>\d+) frames."
    )

    def __init__(self, args: PerSpeakerDecodeArguments):
        super().__init__(args)
        self.feature_strings = args.feature_strings
        self.disambiguation_symbols_int_path = args.disambiguation_symbols_int_path
        self.model_directory = args.model_directory
        self.model_path = args.model_path
        self.decode_options = args.decode_options
        self.lat_paths = args.lat_paths
        self.tree_path = args.tree_path
        self.order = args.order
        self.method = args.method
        # Filled in by _run with each dictionary's word symbol table path.
        self.word_symbols_paths = {}

    def _run(self) -> typing.Generator[typing.Tuple[str, float, int]]:
        """
        Decode every utterance of the job against its speaker's language model FST

        Yields
        ------
        tuple[str, float, int]
            Utterance identifier, log-likelihood per frame and number of frames, as
            parsed from the decoder's log output
        """
        with mfa_open(self.log_path, "w") as log_file, Session(self.db_engine()) as session:
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )

            for d in job.dictionaries:
                self.oov_word = d.oov_word
                self.word_symbols_paths[d.id] = d.words_symbol_path
                feature_string = self.feature_strings[d.id]
                lat_path = self.lat_paths[d.id]
                # The decoding graphs are streamed through stdin ("ark,s,cs:-"),
                # one per utterance, so the pipes are kept in binary mode.
                latgen_proc = subprocess.Popen(
                    [
                        thirdparty_binary("gmm-latgen-faster"),
                        f"--acoustic-scale={self.decode_options['acoustic_scale']}",
                        f"--beam={self.decode_options['beam']}",
                        f"--max-active={self.decode_options['max_active']}",
                        f"--lattice-beam={self.decode_options['lattice_beam']}",
                        f"--word-symbol-table={d.words_symbol_path}",
                        self.model_path,
                        "ark,s,cs:-",
                        feature_string,
                        f"ark:{lat_path}",
                    ],
                    stderr=subprocess.PIPE,
                    stdin=subprocess.PIPE,
                    env=os.environ,
                )
                current_speaker = None
                fst_string = None
                for utt_id, speaker_id in (
                    session.query(Utterance.kaldi_id, Utterance.speaker_id)
                    .filter(Utterance.job_id == job.id)
                    .order_by(Utterance.kaldi_id)
                ):
                    # Only re-read the speaker FST when the speaker changes. The
                    # original never updated current_speaker, so the file was
                    # loaded again for every utterance.
                    if fst_string is None or speaker_id != current_speaker:
                        lm_path = os.path.join(d.temp_directory, f"{speaker_id}.fst")
                        fst = pynini.Fst.read(lm_path)
                        fst_string = fst.write_to_string()
                        del fst
                        current_speaker = speaker_id
                    latgen_proc.stdin.write(utt_id.encode("utf8") + b" " + fst_string)
                    latgen_proc.stdin.flush()
                    # Read log lines until this utterance is reported. An empty
                    # line (or end of stream) also ends the wait.
                    while True:
                        line = latgen_proc.stderr.readline().decode("utf8").strip()
                        if not line:
                            break
                        log_file.write(line + "\n")
                        log_file.flush()
                        m = self.progress_pattern.match(line)
                        if m:
                            yield m.group("utterance"), float(m.group("loglike")), int(
                                m.group("num_frames")
                            )
                            break
                latgen_proc.stdin.close()
                self.check_call(latgen_proc)


class DecodePhoneFunction(KaldiFunction):
    """
    Multiprocessing function for performing decoding with a phone symbol table

    One ``gmm-latgen-faster`` process is launched per dictionary, using the phone
    symbol table as the decoder's word symbol table. Dictionaries whose lattice
    file already exists are skipped.

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.decode_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-latgen-faster`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.DecodePhoneArguments`
        Arguments for the function
    """

    # Matches the per-utterance summary line that the decoder writes to stderr.
    progress_pattern = re.compile(
        r"^LOG.*Log-like per frame for utterance (?P<utterance>.*) is (?P<loglike>[-\d.]+) over (?P<num_frames>\d+) frames."
    )

    def __init__(self, args: DecodePhoneArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.feature_strings = args.feature_strings
        self.lat_paths = args.lat_paths
        self.phone_symbol_path = args.phone_symbol_path
        self.hclg_path = args.hclg_path
        self.decode_options = args.decode_options
        self.model_path = args.model_path

    def _run(self) -> typing.Generator[typing.Tuple[str, float, int]]:
        """
        Decode the features of every dictionary and report per-utterance progress

        Yields
        ------
        tuple[str, float, int]
            Utterance identifier, log-likelihood per frame and number of frames, as
            parsed from the decoder's log output
        """
        # The original also opened a database session here to build a phone
        # mapping that was never used; that dead query has been removed.
        with mfa_open(self.log_path, "w") as log_file:
            for dict_id in self.dictionaries:
                feature_string = self.feature_strings[dict_id]
                lat_path = self.lat_paths[dict_id]
                if os.path.exists(lat_path):
                    # Lattices already exist for this dictionary; nothing to do.
                    continue
                # With speaker adaptation, the first-pass beam / max-active
                # settings take precedence when they are configured.
                if (
                    self.decode_options["uses_speaker_adaptation"]
                    and self.decode_options["first_beam"] is not None
                ):
                    beam = self.decode_options["first_beam"]
                else:
                    beam = self.decode_options["beam"]
                if (
                    self.decode_options["uses_speaker_adaptation"]
                    and self.decode_options["first_max_active"] is not None
                ):
                    max_active = self.decode_options["first_max_active"]
                else:
                    max_active = self.decode_options["max_active"]
                decode_proc = subprocess.Popen(
                    [
                        thirdparty_binary("gmm-latgen-faster"),
                        f"--max-active={max_active}",
                        f"--beam={beam}",
                        f"--lattice-beam={self.decode_options['lattice_beam']}",
                        "--allow-partial=true",
                        f"--word-symbol-table={self.phone_symbol_path}",
                        f"--acoustic-scale={self.decode_options['acoustic_scale']}",
                        self.model_path,
                        self.hclg_path,
                        feature_string,
                        f"ark:{lat_path}",
                    ],
                    stderr=subprocess.PIPE,
                    env=os.environ,
                    encoding="utf8",
                )
                for line in decode_proc.stderr:
                    log_file.write(line)
                    m = self.progress_pattern.match(line.strip())
                    if m:
                        yield m.group("utterance"), float(m.group("loglike")), int(
                            m.group("num_frames")
                        )
                self.check_call(decode_proc)