Source code for montreal_forced_aligner.transcription.multiprocessing

"""
Transcription functions
-----------------------

"""
from __future__ import annotations

import os
import typing
from pathlib import Path
from typing import TYPE_CHECKING, Dict

from _kalpy.fstext import ConstFst, VectorFst
from _kalpy.lat import CompactLatticeWriter
from _kalpy.lm import ConstArpaLm
from _kalpy.util import BaseFloatMatrixWriter, Int32VectorWriter, ReadKaldiObject
from kalpy.data import KaldiMapping, MatrixArchive
from kalpy.decoder.decode_graph import DecodeGraphCompiler
from kalpy.feat.data import FeatureArchive
from kalpy.feat.fmllr import FmllrComputer
from kalpy.fstext.lexicon import LexiconCompiler
from kalpy.gmm.data import LatticeArchive
from kalpy.gmm.decode import GmmDecoder, GmmRescorer
from kalpy.lm.rescore import LmRescorer
from kalpy.utils import generate_write_specifier
from sqlalchemy.orm import joinedload, subqueryload

from montreal_forced_aligner.abc import KaldiFunction, MetaDict
from montreal_forced_aligner.data import MfaArguments, PhoneType
from montreal_forced_aligner.db import Job, Phone, Utterance
from montreal_forced_aligner.utils import thread_logger

if TYPE_CHECKING:
    from dataclasses import dataclass
else:
    from dataclassy import dataclass


__all__ = [
    "FmllrRescoreFunction",
    "FinalFmllrFunction",
    "InitialFmllrFunction",
    "CarpaLmRescoreFunction",
    "DecodeFunction",
    "LmRescoreFunction",
    "CreateHclgFunction",
]


@dataclass
class CreateHclgArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.CreateHclgFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    lexicon_compiler: :class:`~kalpy.fstext.lexicon.LexiconCompiler`
        Lexicon compiler for the dictionary
    working_directory: :class:`~pathlib.Path`
        Current working directory
    small_arpa_path: :class:`~pathlib.Path`
        Path to small ARPA file
    medium_arpa_path: :class:`~pathlib.Path`
        Path to medium ARPA file
    big_arpa_path: :class:`~pathlib.Path`
        Path to big ARPA file
    model_path: :class:`~pathlib.Path`
        Acoustic model path
    tree_path: :class:`~pathlib.Path`
        Path to decision tree file
    hclg_options: dict[str, Any]
        HCLG options
    """

    lexicon_compiler: LexiconCompiler
    working_directory: Path
    small_arpa_path: Path
    medium_arpa_path: Path
    big_arpa_path: Path
    model_path: Path
    tree_path: Path
    hclg_options: MetaDict

@dataclass
class DecodeArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.DecodeFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    working_directory: :class:`~pathlib.Path`
        Current working directory
    model_path: :class:`~pathlib.Path`
        Path to model file
    decode_options: dict[str, Any]
        Decoding options
    hclg_paths: dict[int, Path]
        Per dictionary HCLG.fst paths
    """

    working_directory: Path
    model_path: Path
    decode_options: MetaDict
    hclg_paths: Dict[int, Path]

@dataclass
class DecodePhoneArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.validation.corpus_validator.DecodePhoneFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    working_directory: :class:`~pathlib.Path`
        Current working directory
    model_path: :class:`~pathlib.Path`
        Path to model file
    hclg_path: :class:`~pathlib.Path`
        Path to the phone-level HCLG.fst
    decode_options: dict[str, Any]
        Decoding options
    """

    working_directory: Path
    model_path: Path
    hclg_path: Path
    decode_options: MetaDict

@dataclass
class LmRescoreArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.LmRescoreFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    working_directory: :class:`~pathlib.Path`
        Current working directory
    lm_rescore_options: dict[str, Any]
        Rescoring options
    old_g_paths: dict[int, Path]
        Mapping of dictionaries to small G.fst paths
    new_g_paths: dict[int, Path]
        Mapping of dictionaries to medium G.fst paths
    """

    working_directory: Path
    lm_rescore_options: MetaDict
    old_g_paths: Dict[int, Path]
    new_g_paths: Dict[int, Path]

@dataclass
class CarpaLmRescoreArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.CarpaLmRescoreFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    working_directory: :class:`~pathlib.Path`
        Current working directory
    lm_rescore_options: dict[str, Any]
        Rescoring options
    old_g_paths: dict[int, Path]
        Mapping of dictionaries to medium G.fst paths
    new_g_paths: dict[int, Path]
        Mapping of dictionaries to G.carpa paths
    """

    working_directory: Path
    lm_rescore_options: MetaDict
    old_g_paths: Dict[int, Path]
    new_g_paths: Dict[int, Path]

@dataclass
class InitialFmllrArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.InitialFmllrFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    working_directory: :class:`~pathlib.Path`
        Current working directory
    ali_model_path: :class:`~pathlib.Path`
        Path to speaker-independent alignment model file
    model_path: :class:`~pathlib.Path`
        Path to model file
    fmllr_options: dict[str, Any]
        fMLLR options
    """

    working_directory: Path
    ali_model_path: Path
    model_path: Path
    fmllr_options: MetaDict

@dataclass
class FinalFmllrArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.FinalFmllrFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    working_directory: :class:`~pathlib.Path`
        Working directory
    ali_model_path: :class:`~pathlib.Path`
        Path to speaker-independent alignment model file
    model_path: :class:`~pathlib.Path`
        Path to model file
    fmllr_options: dict[str, Any]
        fMLLR options
    """

    working_directory: Path
    ali_model_path: Path
    model_path: Path
    fmllr_options: MetaDict

@dataclass
class FmllrRescoreArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.transcription.multiprocessing.FmllrRescoreFunction`

    Parameters
    ----------
    job_name: int
        Integer ID of the job
    db_string: str
        String for database connections
    log_path: :class:`~pathlib.Path`
        Path to save logging information during the run
    working_directory: :class:`~pathlib.Path`
        Current working directory
    model_path: :class:`~pathlib.Path`
        Path to model file
    rescore_options: dict[str, Any]
        Rescoring options
    """

    working_directory: Path
    model_path: Path
    rescore_options: MetaDict

class CreateHclgFunction(KaldiFunction):
    """
    Create HCLG.fst file

    See Also
    --------
    :meth:`.Transcriber.create_hclgs`
        Main function that calls this function in parallel
    :meth:`.Transcriber.create_hclgs_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`add-self-loops`
        Relevant Kaldi binary
    :openfst_src:`fstconvert`
        Relevant OpenFst binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.CreateHclgArguments`
        Arguments for the function
    """

    def __init__(self, args: CreateHclgArguments):
        super().__init__(args)
        self.lexicon_compiler = args.lexicon_compiler
        self.working_directory = args.working_directory
        self.small_arpa_path = args.small_arpa_path
        self.medium_arpa_path = args.medium_arpa_path
        self.big_arpa_path = args.big_arpa_path
        self.model_path = args.model_path
        self.tree_path = args.tree_path
        self.hclg_options = args.hclg_options

    def _run(self) -> typing.Generator[typing.Tuple[bool, str]]:
        """Run the function"""
        with thread_logger("kalpy.decode_graph", self.log_path, job_name=self.job_name):
            hclg_path = self.working_directory.joinpath(f"HCLG.{self.job_name}.fst")
            small_g_path = self.working_directory.joinpath(f"G_small.{self.job_name}.fst")
            medium_g_path = self.working_directory.joinpath(f"G_med.{self.job_name}.fst")
            carpa_path = self.working_directory.joinpath(f"G.{self.job_name}.carpa")
            small_compiler = DecodeGraphCompiler(
                self.model_path, self.tree_path, self.lexicon_compiler, **self.hclg_options
            )
            small_compiler.export_hclg(self.small_arpa_path, hclg_path)
            small_compiler.export_g(small_g_path)
            del small_compiler
            medium_compiler = DecodeGraphCompiler(
                self.model_path, self.tree_path, self.lexicon_compiler, **self.hclg_options
            )
            medium_compiler.compile_g_fst(self.medium_arpa_path)
            medium_compiler.export_g(medium_g_path)
            del medium_compiler
            carpa_compiler = DecodeGraphCompiler(
                self.model_path, self.tree_path, self.lexicon_compiler, **self.hclg_options
            )
            carpa_compiler.compile_g_carpa(self.big_arpa_path, carpa_path)
            del carpa_compiler
            if hclg_path.exists():
                self.callback((True, hclg_path))
            else:
                self.callback((False, hclg_path))

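# ---------------------------------------------------------------------------
# Usage sketch (not part of the module): how the arguments/function pair above
# is typically wired together. All paths, the database string, and the option
# dictionary below are hypothetical placeholders; in practice the transcriber's
# ``create_hclgs_arguments`` job method builds ``CreateHclgArguments`` and MFA's
# multiprocessing utilities execute the function and consume its callbacks.
# The base ``MfaArguments`` fields are shown in the order documented above
# (job_name, db_string, log_path).
#
#     args = CreateHclgArguments(
#         1,                                  # job_name
#         "postgresql://...",                 # db_string (placeholder)
#         Path("log/create_hclg.1.log"),      # log_path
#         lexicon_compiler,                   # kalpy LexiconCompiler for the dictionary
#         Path("work"),                       # working_directory
#         Path("lm/small.arpa"),
#         Path("lm/medium.arpa"),
#         Path("lm/big.arpa"),
#         Path("model/final.mdl"),
#         Path("model/tree"),
#         {},                                 # hclg_options
#     )
#     function = CreateHclgFunction(args)
#     # _run() writes HCLG.1.fst, G_small.1.fst, G_med.1.fst and G.1.carpa
#     # into the working directory and reports success via the callback.
# ---------------------------------------------------------------------------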
class DecodeFunction(KaldiFunction):
    """
    Multiprocessing function for performing decoding

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.decode_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-latgen-faster`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.DecodeArguments`
        Arguments for the function
    """

    def __init__(self, args: DecodeArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.hclg_paths = args.hclg_paths
        self.decode_options = args.decode_options
        self.model_path = args.model_path

    def _run(self) -> None:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.decode", self.log_path, job_name=self.job_name
        ) as decode_logger:
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            silence_phones = [
                x
                for x, in session.query(Phone.mapping_id).filter(
                    Phone.phone_type.in_([PhoneType.silence])
                )
            ]
            for d in job.dictionaries:
                decode_logger.debug(f"Decoding for dictionary {d.name} ({d.id})")
                decode_logger.debug(f"Decoding with model: {self.model_path}")
                dict_id = d.id
                feature_archive = job.construct_feature_archive(self.working_directory, dict_id)
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                alignment_file_name = job.construct_path(
                    self.working_directory, "ali", "ark", dict_id
                )
                words_path = job.construct_path(self.working_directory, "words", "ark", dict_id)
                hclg_fst = ConstFst.Read(str(self.hclg_paths[dict_id]))
                boost_silence = self.decode_options.pop("boost_silence", 1.0)
                decoder = GmmDecoder(self.model_path, hclg_fst, **self.decode_options)
                if boost_silence != 1.0:
                    decoder.boost_silence(boost_silence, silence_phones)
                decoder.export_lattices(
                    lat_path,
                    feature_archive,
                    word_file_name=words_path,
                    alignment_file_name=alignment_file_name,
                    callback=self.callback,
                )

class LmRescoreFunction(KaldiFunction):
    """
    Multiprocessing function to rescore lattices by replacing the small G.fst with the medium G.fst

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.lm_rescore_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`lattice-lmrescore-pruned`
        Relevant Kaldi binary
    :openfst_src:`fstproject`
        Relevant OpenFst binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.LmRescoreArguments`
        Arguments for the function
    """

    def __init__(self, args: LmRescoreArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.old_g_paths = args.old_g_paths
        self.new_g_paths = args.new_g_paths
        self.lm_rescore_options = args.lm_rescore_options

    def _run(self) -> None:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.lm", self.log_path, job_name=self.job_name
        ):
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            for d in job.dictionaries:
                dict_id = d.id
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                tmp_lat_path = job.construct_path(
                    self.working_directory, "lat.tmp", "ark", dict_id
                )
                # Move the current lattices aside so the rescored ones can be
                # written back to the original path
                os.rename(lat_path, tmp_lat_path)
                old_g_path = self.old_g_paths[dict_id]
                new_g_path = self.new_g_paths[dict_id]
                old_g = VectorFst.Read(str(old_g_path))
                new_lm = VectorFst.Read(str(new_g_path))
                rescorer = LmRescorer(old_g, **self.lm_rescore_options)
                lattice_archive = LatticeArchive(tmp_lat_path, determinized=True)
                rescorer.export_lattices(lat_path, lattice_archive, new_lm, callback=self.callback)
                lattice_archive.close()
                os.remove(tmp_lat_path)

class CarpaLmRescoreFunction(KaldiFunction):
    """
    Multiprocessing function to rescore lattices by replacing medium G.fst with large G.carpa

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.carpa_lm_rescore_arguments`
        Job method for generating arguments for this function
    :openfst_src:`fstproject`
        Relevant OpenFst binary
    :kaldi_src:`lattice-lmrescore`
        Relevant Kaldi binary
    :kaldi_src:`lattice-lmrescore-const-arpa`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.CarpaLmRescoreArguments`
        Arguments for the function
    """

    def __init__(self, args: CarpaLmRescoreArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.lm_rescore_options = args.lm_rescore_options
        self.old_g_paths = args.old_g_paths
        self.new_g_paths = args.new_g_paths

    def _run(self) -> None:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.lm", self.log_path, job_name=self.job_name
        ):
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            for d in job.dictionaries:
                dict_id = d.id
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                tmp_lat_path = job.construct_path(
                    self.working_directory, "lat.tmp", "ark", dict_id
                )
                os.rename(lat_path, tmp_lat_path)
                old_g_path = self.old_g_paths[dict_id]
                new_g_path = self.new_g_paths[dict_id]
                old_g = VectorFst.Read(str(old_g_path))
                new_lm = ConstArpaLm()
                ReadKaldiObject(str(new_g_path), new_lm)
                rescorer = LmRescorer(old_g, **self.lm_rescore_options)
                lattice_archive = LatticeArchive(tmp_lat_path, determinized=True)
                rescorer.export_lattices(lat_path, lattice_archive, new_lm, callback=self.callback)
                lattice_archive.close()
                os.remove(tmp_lat_path)

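# ---------------------------------------------------------------------------
# Pipeline note (sketch, not part of the module): lattices are rescored in
# place. Each rescoring stage renames the job's ``lat`` archive to a ``lat.tmp``
# archive, reads it back as a LatticeArchive, and writes a rescored ``lat``
# archive. The stages run in this order:
#
#     DecodeFunction           # decode against HCLG.fst (built from the small ARPA)
#     LmRescoreFunction        # swap small G.fst scores for medium G.fst scores
#     CarpaLmRescoreFunction   # swap medium G.fst scores for G.carpa scores
#
# The exact archive file names come from ``Job.construct_path`` and are shown
# here only schematically.
# ---------------------------------------------------------------------------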
class InitialFmllrFunction(KaldiFunction):
    """
    Multiprocessing function for running initial fMLLR calculation

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_fmllr`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.initial_fmllr_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`lattice-to-post`
        Relevant Kaldi binary
    :kaldi_src:`weight-silence-post`
        Relevant Kaldi binary
    :kaldi_src:`gmm-post-to-gpost`
        Relevant Kaldi binary
    :kaldi_src:`gmm-est-fmllr-gpost`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.InitialFmllrArguments`
        Arguments for the function
    """

    def __init__(self, args: InitialFmllrArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.ali_model_path = args.ali_model_path
        self.model_path = args.model_path
        self.fmllr_options = args.fmllr_options

    def _run(self) -> None:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.fmllr", self.log_path, job_name=self.job_name
        ) as fmllr_logger:
            fmllr_logger.debug(f"Using acoustic model: {self.model_path}\n")
            job: typing.Optional[Job] = session.get(
                Job, self.job_name, options=[joinedload(Job.dictionaries), joinedload(Job.corpus)]
            )
            lda_mat_path = self.working_directory.joinpath("lda.mat")
            if not lda_mat_path.exists():
                lda_mat_path = None
            for dict_id in job.dictionary_ids:
                feat_path = job.construct_path(
                    job.corpus.current_subset_directory, "feats", "scp", dictionary_id=dict_id
                )
                utt2spk_path = job.construct_path(
                    job.corpus.current_subset_directory, "utt2spk", "scp", dictionary_id=dict_id
                )
                spk2utt_path = job.construct_path(
                    job.corpus.current_subset_directory, "spk2utt", "scp", dictionary_id=dict_id
                )
                utt2spk = KaldiMapping()
                utt2spk.load(utt2spk_path)
                spk2utt = KaldiMapping(list_mapping=True)
                spk2utt.load(spk2utt_path)
                feature_archive = FeatureArchive(
                    feat_path,
                    utt2spk=utt2spk,
                    lda_mat_file_name=lda_mat_path,
                    deltas=True,
                )
                silence_phones = [
                    x
                    for x, in session.query(Phone.mapping_id).filter(
                        Phone.phone_type.in_([PhoneType.silence, PhoneType.oov])
                    )
                ]
                computer = FmllrComputer(
                    self.ali_model_path,
                    self.model_path,
                    silence_phones,
                    spk2utt=spk2utt,
                    **self.fmllr_options,
                )
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                fmllr_logger.debug(f"Processing {lat_path} with features from {feat_path}")
                lattice_archive = LatticeArchive(lat_path, determinized=False)
                temp_trans_path = job.construct_path(
                    self.working_directory, "trans", "ark", dict_id
                )
                computer.export_transforms(
                    temp_trans_path,
                    feature_archive,
                    lattice_archive,
                    callback=self.callback,
                )
                feature_archive.close()
                lattice_archive.close()
                del feature_archive
                del lattice_archive
                del computer
                trans_archive = MatrixArchive(temp_trans_path)
                write_specifier = generate_write_specifier(
                    job.construct_path(
                        job.corpus.current_subset_directory,
                        "trans",
                        "ark",
                        dictionary_id=dict_id,
                    ),
                    write_scp=True,
                )
                writer = BaseFloatMatrixWriter(write_specifier)
                for speaker, trans in trans_archive:
                    writer.Write(str(speaker), trans)
                writer.Close()
                trans_archive.close()
                del trans_archive
                os.remove(temp_trans_path)

class FinalFmllrFunction(KaldiFunction):
    """
    Multiprocessing function for running final fMLLR estimation

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_fmllr`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.final_fmllr_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`lattice-determinize-pruned`
        Relevant Kaldi binary
    :kaldi_src:`lattice-to-post`
        Relevant Kaldi binary
    :kaldi_src:`weight-silence-post`
        Relevant Kaldi binary
    :kaldi_src:`gmm-est-fmllr`
        Relevant Kaldi binary
    :kaldi_src:`compose-transforms`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.FinalFmllrArguments`
        Arguments for the function
    """

    def __init__(self, args: FinalFmllrArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.ali_model_path = args.ali_model_path
        self.model_path = args.model_path
        self.fmllr_options = args.fmllr_options

    def _run(self) -> None:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.fmllr", self.log_path, job_name=self.job_name
        ) as fmllr_logger:
            fmllr_logger.debug(f"Using acoustic model: {self.model_path}\n")
            job: typing.Optional[Job] = session.get(
                Job, self.job_name, options=[joinedload(Job.dictionaries), joinedload(Job.corpus)]
            )
            lda_mat_path = self.working_directory.joinpath("lda.mat")
            if not lda_mat_path.exists():
                lda_mat_path = None
            for dict_id in job.dictionary_ids:
                feat_path = job.construct_path(
                    job.corpus.current_subset_directory, "feats", "scp", dictionary_id=dict_id
                )
                fmllr_trans_path = job.construct_path(
                    job.corpus.current_subset_directory, "trans", "scp", dictionary_id=dict_id
                )
                previous_transform_archive = None
                if not fmllr_trans_path.exists():
                    fmllr_logger.debug("Computing transforms from scratch")
                    fmllr_trans_path = None
                else:
                    fmllr_logger.debug(f"Updating previous transforms {fmllr_trans_path}")
                    previous_transform_archive = MatrixArchive(fmllr_trans_path)
                utt2spk_path = job.construct_path(
                    job.corpus.current_subset_directory, "utt2spk", "scp", dictionary_id=dict_id
                )
                spk2utt_path = job.construct_path(
                    job.corpus.current_subset_directory, "spk2utt", "scp", dictionary_id=dict_id
                )
                utt2spk = KaldiMapping()
                utt2spk.load(utt2spk_path)
                spk2utt = KaldiMapping(list_mapping=True)
                spk2utt.load(spk2utt_path)
                feature_archive = FeatureArchive(
                    feat_path,
                    utt2spk=utt2spk,
                    lda_mat_file_name=lda_mat_path,
                    transform_file_name=fmllr_trans_path,
                    deltas=True,
                )
                silence_phones = [
                    x
                    for x, in session.query(Phone.mapping_id).filter(
                        Phone.phone_type.in_([PhoneType.silence, PhoneType.oov])
                    )
                ]
                computer = FmllrComputer(
                    self.ali_model_path,
                    self.model_path,
                    silence_phones,
                    spk2utt=spk2utt,
                    **self.fmllr_options,
                )
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                fmllr_logger.debug(f"Processing {lat_path} with features from {feat_path}")
                lattice_archive = LatticeArchive(lat_path, determinized=False)
                temp_trans_path = job.construct_path(
                    self.working_directory, "trans", "ark", dict_id
                )
                computer.export_transforms(
                    temp_trans_path,
                    feature_archive,
                    lattice_archive,
                    previous_transform_archive=previous_transform_archive,
                    callback=self.callback,
                )
                feature_archive.close()
                del previous_transform_archive
                del feature_archive
                del lattice_archive
                del computer
                if fmllr_trans_path is not None:
                    os.remove(fmllr_trans_path)
                    os.remove(fmllr_trans_path.with_suffix(".ark"))
                trans_archive = MatrixArchive(temp_trans_path)
                write_specifier = generate_write_specifier(
                    job.construct_path(
                        job.corpus.current_subset_directory,
                        "trans",
                        "ark",
                        dictionary_id=dict_id,
                    ),
                    write_scp=True,
                )
                writer = BaseFloatMatrixWriter(write_specifier)
                for speaker, trans in trans_archive:
                    writer.Write(str(speaker), trans)
                writer.Close()
                del trans_archive
                os.remove(temp_trans_path)

class FmllrRescoreFunction(KaldiFunction):
    """
    Multiprocessing function to rescore lattices following fMLLR estimation

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_fmllr`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.fmllr_rescore_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-rescore-lattice`
        Relevant Kaldi binary
    :kaldi_src:`lattice-determinize-pruned`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.FmllrRescoreArguments`
        Arguments for the function
    """

    def __init__(self, args: FmllrRescoreArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.model_path = args.model_path
        self.rescore_options = args.rescore_options

    def _run(self) -> None:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.decode", self.log_path, job_name=self.job_name
        ) as decode_logger:
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            rescorer = GmmRescorer(self.model_path, **self.rescore_options)
            for d in job.dictionaries:
                decode_logger.debug(f"Aligning for dictionary {d.name} ({d.id})")
                decode_logger.debug(f"Aligning with model: {self.model_path}")
                dict_id = d.id
                fst_path = job.construct_path(self.working_directory, "fsts", "ark", dict_id)
                decode_logger.debug(f"Training graph archive: {fst_path}")
                fmllr_path = job.construct_path(
                    job.corpus.current_subset_directory, "trans", "scp", dict_id
                )
                if not fmllr_path.exists():
                    fmllr_path = None
                lda_mat_path = self.working_directory.joinpath("lda.mat")
                if not lda_mat_path.exists():
                    lda_mat_path = None
                feat_path = job.construct_path(
                    job.corpus.current_subset_directory, "feats", "scp", dictionary_id=dict_id
                )
                utt2spk_path = job.construct_path(
                    job.corpus.current_subset_directory, "utt2spk", "scp", dict_id
                )
                utt2spk = KaldiMapping()
                utt2spk.load(utt2spk_path)
                decode_logger.debug(f"Feature path: {feat_path}")
                decode_logger.debug(f"LDA transform path: {lda_mat_path}")
                decode_logger.debug(f"Speaker transform path: {fmllr_path}")
                decode_logger.debug(f"utt2spk path: {utt2spk_path}")
                feature_archive = FeatureArchive(
                    feat_path,
                    utt2spk=utt2spk,
                    lda_mat_file_name=lda_mat_path,
                    transform_file_name=fmllr_path,
                    deltas=True,
                )
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                tmp_lat_path = job.construct_path(
                    self.working_directory, "lat.tmp", "ark", dict_id
                )
                os.rename(lat_path, tmp_lat_path)
                lattice_archive = LatticeArchive(tmp_lat_path, determinized=True)
                rescorer.export_lattices(
                    lat_path, lattice_archive, feature_archive, callback=self.callback
                )
                lattice_archive.close()
                os.remove(tmp_lat_path)

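# ---------------------------------------------------------------------------
# Pipeline note (sketch, not part of the module): speaker-adapted transcription
# runs the three fMLLR functions above in order once first-pass lattices exist:
#
#     InitialFmllrFunction     # estimate per-speaker transforms from first-pass lattices
#     FinalFmllrFunction       # re-estimate transforms, composing with any previous ones
#     FmllrRescoreFunction     # rescore the lattices using the fMLLR-transformed features
#
# Transforms are written per job/dictionary as ``trans`` archives in the corpus
# subset directory; the rescore step reads them back through ``FeatureArchive``'s
# ``transform_file_name`` argument, as in the code above.
# ---------------------------------------------------------------------------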
@dataclass
class PerSpeakerDecodeArguments(MfaArguments):
    """Arguments for :class:`~montreal_forced_aligner.validation.corpus_validator.PerSpeakerDecodeFunction`"""

    working_directory: Path
    model_path: Path
    tree_path: Path
    decode_options: MetaDict
    order: int
    method: str


class PerSpeakerDecodeFunction(KaldiFunction):
    """
    Multiprocessing function to test utterance transcriptions with utterance and speaker ngram models

    See Also
    --------
    :kaldi_src:`compile-train-graphs-fsts`
        Relevant Kaldi binary
    :kaldi_src:`gmm-latgen-faster`
        Relevant Kaldi binary
    :kaldi_src:`lattice-oracle`
        Relevant Kaldi binary
    :openfst_src:`farcompilestrings`
        Relevant OpenFst binary
    :ngram_src:`ngramcount`
        Relevant OpenGrm-Ngram binary
    :ngram_src:`ngrammake`
        Relevant OpenGrm-Ngram binary
    :ngram_src:`ngramshrink`
        Relevant OpenGrm-Ngram binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.validation.corpus_validator.PerSpeakerDecodeArguments`
        Arguments for the function
    """

    def __init__(self, args: PerSpeakerDecodeArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.model_path = args.model_path
        self.decode_options = args.decode_options
        self.tree_path = args.tree_path
        self.order = args.order
        self.method = args.method
        self.word_symbols_paths = {}

    def _run(self) -> typing.Generator[typing.Tuple[int, str]]:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.decode", self.log_path, job_name=self.job_name
        ) as decode_logger:
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            silence_phones = [
                x
                for x, in session.query(Phone.mapping_id).filter(
                    Phone.phone_type.in_([PhoneType.silence])
                )
            ]
            for d in job.dictionaries:
                decode_logger.debug(f"Decoding for dictionary {d.name} ({d.id})")
                decode_logger.debug(f"Decoding with model: {self.model_path}")
                dict_id = d.id
                fmllr_path = job.construct_path(
                    job.corpus.current_subset_directory, "trans", "scp", dict_id
                )
                if not fmllr_path.exists():
                    fmllr_path = None
                lda_mat_path = self.working_directory.joinpath("lda.mat")
                if not lda_mat_path.exists():
                    lda_mat_path = None
                feat_path = job.construct_path(
                    job.corpus.current_subset_directory, "feats", "scp", dictionary_id=dict_id
                )
                utt2spk_path = job.construct_path(
                    job.corpus.current_subset_directory, "utt2spk", "scp", dict_id
                )
                utt2spk = KaldiMapping()
                utt2spk.load(utt2spk_path)
                decode_logger.debug(f"Feature path: {feat_path}")
                decode_logger.debug(f"LDA transform path: {lda_mat_path}")
                decode_logger.debug(f"Speaker transform path: {fmllr_path}")
                decode_logger.debug(f"utt2spk path: {utt2spk_path}")
                feature_archive = FeatureArchive(
                    feat_path,
                    utt2spk=utt2spk,
                    lda_mat_file_name=lda_mat_path,
                    transform_file_name=fmllr_path,
                    deltas=True,
                )
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                alignment_file_name = job.construct_path(
                    self.working_directory, "ali", "ark", dict_id
                )
                words_path = job.construct_path(self.working_directory, "words", "ark", dict_id)
                boost_silence = self.decode_options.pop("boost_silence", 1.0)
                current_speaker = None
                write_specifier = generate_write_specifier(lat_path, write_scp=False)
                alignment_writer = None
                if alignment_file_name:
                    alignment_write_specifier = generate_write_specifier(
                        alignment_file_name, write_scp=False
                    )
                    alignment_writer = Int32VectorWriter(alignment_write_specifier)
                word_writer = None
                if words_path:
                    word_write_specifier = generate_write_specifier(words_path, write_scp=False)
                    word_writer = Int32VectorWriter(word_write_specifier)
                writer = CompactLatticeWriter(write_specifier)
                for utt_id, speaker_id in (
                    session.query(Utterance.kaldi_id, Utterance.speaker_id)
                    .filter(Utterance.job_id == job.id)
                    .order_by(Utterance.kaldi_id)
                ):
                    if speaker_id != current_speaker:
                        # Load the per-speaker decoding graph when the speaker changes
                        lm_path = os.path.join(d.temp_directory, f"{speaker_id}.fst")
                        hclg_fst = ConstFst.Read(str(lm_path))
                        decoder = GmmDecoder(self.model_path, hclg_fst, **self.decode_options)
                        if boost_silence != 1.0:
                            decoder.boost_silence(boost_silence, silence_phones)
                for transcription in decoder.decode_utterances(feature_archive):
                    if transcription is None:
                        continue
                    utt_id = int(transcription.utterance_id.split("-")[-1])
                    self.callback((utt_id, transcription.likelihood))
                    writer.Write(str(transcription.utterance_id), transcription.lattice)
                    if alignment_writer is not None:
                        alignment_writer.Write(
                            str(transcription.utterance_id), transcription.alignment
                        )
                    if word_writer is not None:
                        word_writer.Write(str(transcription.utterance_id), transcription.words)
                writer.Close()
                if alignment_writer is not None:
                    alignment_writer.Close()
                if word_writer is not None:
                    word_writer.Close()


class DecodePhoneFunction(KaldiFunction):
    """
    Multiprocessing function for performing decoding

    See Also
    --------
    :meth:`.TranscriberMixin.transcribe_utterances`
        Main function that calls this function in parallel
    :meth:`.TranscriberMixin.decode_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-latgen-faster`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.transcription.multiprocessing.DecodePhoneArguments`
        Arguments for the function
    """

    def __init__(self, args: DecodePhoneArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.hclg_path = args.hclg_path
        self.decode_options = args.decode_options
        self.model_path = args.model_path

    def _run(self) -> None:
        """Run the function"""
        with self.session() as session, thread_logger(
            "kalpy.decode", self.log_path, job_name=self.job_name
        ) as decode_logger:
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            silence_phones = [
                x
                for x, in session.query(Phone.mapping_id).filter(
                    Phone.phone_type.in_([PhoneType.silence])
                )
            ]
            phones = session.query(Phone.mapping_id, Phone.phone)
            reversed_phone_mapping = {}
            for p_id, phone in phones:
                reversed_phone_mapping[p_id] = phone
            hclg_fst = ConstFst.Read(str(self.hclg_path))
            for d in job.dictionaries:
                decode_logger.debug(f"Decoding for dictionary {d.name} ({d.id})")
                decode_logger.debug(f"Decoding with model: {self.model_path}")
                dict_id = d.id
                feature_archive = job.construct_feature_archive(self.working_directory, dict_id)
                lat_path = job.construct_path(self.working_directory, "lat", "ark", dict_id)
                alignment_file_name = job.construct_path(
                    self.working_directory, "ali", "ark", dict_id
                )
                words_path = job.construct_path(self.working_directory, "words", "ark", dict_id)
                boost_silence = self.decode_options.pop("boost_silence", 1.0)
                decoder = GmmDecoder(self.model_path, hclg_fst, **self.decode_options)
                if boost_silence != 1.0:
                    decoder.boost_silence(boost_silence, silence_phones)
                decoder.export_lattices(
                    lat_path,
                    feature_archive,
                    word_file_name=words_path,
                    alignment_file_name=alignment_file_name,
                    callback=self.callback,
                )