"""Class definitions for trainable aligners"""
from __future__ import annotations

import collections
import json
import logging
import os
import shutil
import time
import typing
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from _kalpy.hmm import AlignmentToPosterior
from _kalpy.matrix import DoubleVector
from kalpy.gmm.data import AlignmentArchive
from kalpy.gmm.utils import read_gmm_model
from sqlalchemy.orm import joinedload, subqueryload

from montreal_forced_aligner import config
from montreal_forced_aligner.abc import (
    KaldiFunction,
    MetaDict,
    ModelExporterMixin,
    TopLevelMfaWorker,
)
from montreal_forced_aligner.data import MfaArguments, WorkflowType
from montreal_forced_aligner.db import (
    CorpusWorkflow,
    Dictionary,
    Job,
    PhoneInterval,
    Speaker,
    Utterance,
    WordInterval,
    bulk_update,
)
from montreal_forced_aligner.exceptions import ConfigError, KaldiProcessingError
from montreal_forced_aligner.helper import load_configuration, mfa_open, parse_old_features
from montreal_forced_aligner.models import AcousticModel, DictionaryModel
from montreal_forced_aligner.transcription.transcriber import TranscriberMixin
from montreal_forced_aligner.utils import log_kaldi_errors, run_kaldi_function

if TYPE_CHECKING:
    from dataclasses import dataclass

    from montreal_forced_aligner.acoustic_modeling.base import AcousticModelTrainingMixin
    from montreal_forced_aligner.acoustic_modeling.pronunciation_probabilities import (
        PronunciationProbabilityTrainer,
    )
else:
    from dataclassy import dataclass

__all__ = ["TrainableAligner", "TransitionAccFunction", "TransitionAccArguments"]


logger = logging.getLogger("mfa")


@dataclass
class TransitionAccArguments(MfaArguments):
    """Arguments for :class:`~montreal_forced_aligner.acoustic_modeling.trainer.TransitionAccFunction`"""

    working_directory: Path
    model_path: Path

class TransitionAccFunction(KaldiFunction):
    """
    Multiprocessing function to accumulate transition stats

    See Also
    --------
    :kaldi_src:`ali-to-post`
        Relevant Kaldi binary
    :kaldi_src:`post-to-tacc`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.acoustic_modeling.trainer.TransitionAccArguments`
        Arguments for the function
    """

    def __init__(self, args: TransitionAccArguments):
        super().__init__(args)
        self.working_directory = args.working_directory
        self.model_path = args.model_path

    def _run(self) -> typing.Generator[typing.Tuple[int, str]]:
        """Run the function"""
        with self.session() as session:
            job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            transition_model, acoustic_model = read_gmm_model(self.model_path)
            for dict_id in job.dictionary_ids:
                ali_path = job.construct_path(self.working_directory, "ali", "ark", dict_id)
                transition_accs = DoubleVector(transition_model.NumTransitionIds() + 1)
                alignment_archive = AlignmentArchive(ali_path)
                for alignment in alignment_archive:
                    post = AlignmentToPosterior(alignment.alignment)
                    for i in range(len(post)):
                        for j in range(len(post[i])):
                            tid = post[i][j][0]
                            transition_accs[tid] += post[i][j][1]
                    self.callback(1)
                self.callback(transition_accs)

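# Note: ``AlignmentToPosterior`` converts a hard alignment into a per-frame posterior,
# so ``post[i]`` is a list of (transition-id, weight) pairs for frame ``i``, with a
# weight of 1.0 for each aligned transition-id.  Summing those weights into
# ``transition_accs`` therefore counts how many frames used each transition-id,
# mirroring the Kaldi pipeline ``ali-to-post | post-to-tacc``.
# Illustration with hypothetical values: post = [[(34, 1.0)], [(34, 1.0)], [(35, 1.0)]]
# adds 2.0 to transition_accs[34] and 1.0 to transition_accs[35].
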
class TrainableAligner(TranscriberMixin, TopLevelMfaWorker, ModelExporterMixin):
    """
    Train acoustic model

    Parameters
    ----------
    training_configuration : list[tuple[str, dict[str, Any]]]
        Training identifiers and parameters for training blocks
    phone_set_type: str
        Type of phone set to use for acoustic modeling

    See Also
    --------
    :class:`~montreal_forced_aligner.alignment.base.CorpusAligner`
        For dictionary and corpus parsing parameters and alignment parameters
    :class:`~montreal_forced_aligner.abc.TopLevelMfaWorker`
        For top-level parameters
    :class:`~montreal_forced_aligner.abc.ModelExporterMixin`
        For model export parameters

    Attributes
    ----------
    param_dict: dict[str, Any]
        Parameters to pass to training blocks
    final_identifier: str
        Identifier of the final training block
    current_subset: int
        Current training block's subset
    current_acoustic_model: :class:`~montreal_forced_aligner.models.AcousticModel`
        Acoustic model to use in aligning, based on previous training block
    training_configs: dict[str, :class:`~montreal_forced_aligner.acoustic_modeling.base.AcousticModelTrainingMixin`]
        Training blocks
    """

    def __init__(
        self,
        training_configuration: List[Tuple[str, Dict[str, Any]]] = None,
        phone_set_type: str = None,
        model_version: str = None,
        **kwargs,
    ):
        self.param_dict = {
            k: v
            for k, v in kwargs.items()
            if not k.endswith("_directory")
            and not k.endswith("_path")
            and k not in ["speaker_characters"]
        }
        self.final_identifier = None
        self.current_subset: int = 0
        self.current_aligner: Optional[AcousticModelTrainingMixin] = None
        self.current_trainer: Optional[AcousticModelTrainingMixin] = None
        self.current_acoustic_model: Optional[AcousticModel] = None
        super().__init__(**kwargs)
        if phone_set_type and phone_set_type != "UNKNOWN":
            self.dictionary_model = DictionaryModel(
                self.dictionary_model.path, phone_set_type=phone_set_type
            )
        self.phone_set_type = self.dictionary_model.phone_set_type
        os.makedirs(self.output_directory, exist_ok=True)
        self.training_configs: Dict[
            str, typing.Union[AcousticModelTrainingMixin, PronunciationProbabilityTrainer]
        ] = {}
        if training_configuration is None:
            training_configuration = TrainableAligner.default_training_configurations()
        for k, v in training_configuration:
            self.add_config(k, v)
        self.final_alignment = True
        self.model_version = model_version

    @classmethod
    def default_training_configurations(cls) -> List[Tuple[str, Dict[str, Any]]]:
        """Default MFA training configuration"""
        training_params = []
        training_params.append(("monophone", {"subset": 10000, "boost_silence": 1.25}))
        training_params.append(
            (
                "triphone",
                {
                    "subset": 20000,
                    "boost_silence": 1.25,
                    "num_leaves": 2000,
                    "max_gaussians": 10000,
                },
            )
        )
        training_params.append(
            ("lda", {"subset": 20000, "num_leaves": 2500, "max_gaussians": 15000})
        )
        training_params.append(
            ("sat", {"subset": 20000, "num_leaves": 2500, "max_gaussians": 15000})
        )
        training_params.append(
            ("sat", {"subset": 50000, "num_leaves": 4200, "max_gaussians": 40000})
        )
        training_params.append(("pronunciation_probabilities", {"subset": 50000}))
        training_params.append(
            ("sat", {"subset": 150000, "num_leaves": 5000, "max_gaussians": 100000})
        )
        training_params.append(
            (
                "pronunciation_probabilities",
                {"subset": 150000, "optional": True},
            )
        )
        training_params.append(
            (
                "sat",
                {
                    "subset": 0,
                    "num_leaves": 7000,
                    "optional": True,
                    "max_gaussians": 150000,
                    "num_iterations": 20,
                    "quick": True,
                },
            )
        )
        return training_params

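    # In this schedule each block trains on a progressively larger subset of the
    # corpus; a ``subset`` of 0 in the last ``sat`` block is assumed to mean "use the
    # full corpus", and blocks marked ``optional`` are skipped for corpora smaller
    # than the earlier subset sizes (see ``train`` below).
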
    @classmethod
    def parse_parameters(
        cls,
        config_path: Optional[Path] = None,
        args: Optional[Dict[str, Any]] = None,
        unknown_args: Optional[typing.Iterable[str]] = None,
    ) -> MetaDict:
        """
        Parse configuration parameters from a config file and command line arguments

        Parameters
        ----------
        config_path: :class:`~pathlib.Path`, optional
            Path to yaml configuration file
        args: dict[str, Any]
            Parsed arguments
        unknown_args: list[str]
            Optional list of arguments that were not parsed

        Returns
        -------
        dict[str, Any]
            Dictionary of specified configuration parameters
        """
        global_params = {}
        training_params = []
        use_default = True
        if config_path:
            data = load_configuration(config_path)
            training_params = []
            for k, v in data.items():
                if k == "training":
                    for t in v:
                        for k2, v2 in t.items():
                            if "features" in v2:
                                global_params.update(parse_old_features(v2["features"]))
                                del v2["features"]
                            training_params.append((k2, v2))
                elif k == "features":
                    global_params.update(parse_old_features(v))
                else:
                    if v is None and k in cls.nullable_fields:
                        v = []
                    global_params[k] = v
            if training_params:
                use_default = False
        if use_default:  # default training configuration
            training_params = TrainableAligner.default_training_configurations()
        if training_params:
            if training_params[0][0] != "monophone":
                raise ConfigError("The first round of training must be monophone.")
        global_params["training_configuration"] = training_params
        global_params.update(cls.parse_args(args, unknown_args))
        return global_params

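    # A sketch of the YAML layout that the loop above accepts (keys and values are
    # illustrative, not a complete configuration):
    #
    #     beam: 10
    #     retry_beam: 40
    #     training:
    #       - monophone:
    #           subset: 10000
    #           boost_silence: 1.25
    #       - triphone:
    #           subset: 20000
    #           num_leaves: 2000
    #           max_gaussians: 10000
    #
    # Top-level keys become global parameters, each entry under ``training`` becomes a
    # ``(train_type, params)`` tuple, and a legacy ``features`` block (top-level or
    # inside a training block) is folded into the global parameters via
    # ``parse_old_features``.
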
    def setup_trainers(self):
        self.write_training_information()
        with self.session() as session:
            workflows: typing.Dict[str, CorpusWorkflow] = {
                x.name: x for x in session.query(CorpusWorkflow)
            }
            for i, (identifier, c) in enumerate(self.training_configs.items()):
                if isinstance(c, str):
                    continue
                c.non_silence_phones = self.non_silence_phones
                ali_identifier = f"{identifier}_ali"
                if identifier not in workflows:
                    self.create_new_current_workflow(
                        WorkflowType.acoustic_training, name=identifier
                    )
                    self.create_new_current_workflow(WorkflowType.alignment, name=ali_identifier)
                else:
                    wf = workflows[identifier]
                    if wf.dirty and not wf.done:
                        shutil.rmtree(wf.working_directory, ignore_errors=True)
                    ali_wf = workflows[ali_identifier]
                    if ali_wf.dirty and not ali_wf.done:
                        shutil.rmtree(ali_wf.working_directory, ignore_errors=True)
                    if i == 0:
                        wf.current = True
            session.commit()

    def filter_training_utterances(self):
        logger.info("Filtering utterances for training...")
        with self.session() as session:
            dictionaries = session.query(Dictionary)
            for d in dictionaries:
                update_mapping = []
                word_mapping = d.word_mapping
                utterances = (
                    session.query(Utterance.id, Utterance.normalized_text)
                    .join(Utterance.speaker)
                    .filter(Utterance.ignored == False)  # noqa
                    .filter(Speaker.dictionary_id == d.id)
                )
                for u_id, text in utterances:
                    if not text:
                        update_mapping.append({"id": u_id, "ignored": True})
                        continue
                    words = text.split()
                    if any(x in word_mapping for x in words):
                        continue
                    update_mapping.append({"id": u_id, "ignored": True})
                if update_mapping:
                    bulk_update(session, Utterance, update_mapping)
                    session.commit()

    def setup(self) -> None:
        """Setup for acoustic model training"""
        super().setup()
        self.initialized = self.features_generated
        if self.initialized:
            logger.info("Using previous initialization.")
            return
        try:
            self.load_corpus()
            self.setup_trainers()
            self.filter_training_utterances()
        except Exception as e:
            if isinstance(e, KaldiProcessingError):
                log_kaldi_errors(e.error_logs)
                e.update_log_file()
            raise
        self.initialized = True

    @property
    def configuration(self) -> MetaDict:
        """Configuration for the worker"""
        config = super().configuration
        config.update(
            {
                "dictionary_path": str(self.dictionary_model.path),
                "corpus_directory": str(self.corpus_directory),
            }
        )
        return config

    @property
    def meta(self) -> MetaDict:
        """Metadata about the final round of training"""
        meta = self.training_configs[self.final_identifier].meta
        if self.model_version is not None:
            meta["version"] = self.model_version
        return meta

    def add_config(self, train_type: str, params: MetaDict) -> None:
        """
        Add a trainer to the pipeline

        Parameters
        ----------
        train_type: str
            Type of trainer to add, one of ``monophone``, ``triphone``, ``lda``, ``sat``,
            or ``pronunciation_probabilities``
        params: dict[str, Any]
            Parameters to initialize trainer

        Raises
        ------
        :class:`~montreal_forced_aligner.exceptions.ConfigError`
            If an invalid train_type is specified
        """
        from montreal_forced_aligner.acoustic_modeling.lda import LdaTrainer
        from montreal_forced_aligner.acoustic_modeling.monophone import MonophoneTrainer
        from montreal_forced_aligner.acoustic_modeling.pronunciation_probabilities import (  # noqa
            PronunciationProbabilityTrainer,
        )
        from montreal_forced_aligner.acoustic_modeling.sat import SatTrainer
        from montreal_forced_aligner.acoustic_modeling.triphone import TriphoneTrainer

        p = {}
        p.update(self.param_dict)
        p.update(params)
        identifier = train_type
        index = 2
        while identifier in self.training_configs:
            identifier = f"{train_type}_{index}"
            index += 1
        self.final_identifier = identifier
        if train_type == "monophone":
            p = {
                k: v for k, v in p.items() if k in MonophoneTrainer.get_configuration_parameters()
            }
            config = MonophoneTrainer(identifier=identifier, worker=self, **p)
        elif train_type == "triphone":
            p = {k: v for k, v in p.items() if k in TriphoneTrainer.get_configuration_parameters()}
            config = TriphoneTrainer(identifier=identifier, worker=self, **p)
        elif train_type == "lda":
            p = {k: v for k, v in p.items() if k in LdaTrainer.get_configuration_parameters()}
            config = LdaTrainer(identifier=identifier, worker=self, **p)
        elif train_type == "sat":
            p = {k: v for k, v in p.items() if k in SatTrainer.get_configuration_parameters()}
            config = SatTrainer(identifier=identifier, worker=self, **p)
        elif train_type == "pronunciation_probabilities":
            p = {
                k: v
                for k, v in p.items()
                if k in PronunciationProbabilityTrainer.get_configuration_parameters()
            }
            previous_trainer = self.training_configs[list(self.training_configs.keys())[-1]]
            config = PronunciationProbabilityTrainer(
                identifier=identifier, previous_trainer=previous_trainer, worker=self, **p
            )
        else:
            raise ConfigError(f"Invalid training type '{train_type}' in config file")
        self.training_configs[identifier] = config

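    # Because identifiers are de-duplicated with a numeric suffix, adding the default
    # configuration yields blocks named ``monophone``, ``triphone``, ``lda``, ``sat``,
    # ``sat_2``, ``pronunciation_probabilities``, ``sat_3``,
    # ``pronunciation_probabilities_2`` and ``sat_4``; the most recently added block
    # is always ``self.final_identifier``.  A sketch of adding a custom block:
    #
    #     trainer.add_config("sat", {"subset": 0, "num_leaves": 7000, "max_gaussians": 150000})
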
    def export_model(self, output_model_path: Path) -> None:
        """
        Export an acoustic model to the specified path

        Parameters
        ----------
        output_model_path : :class:`~pathlib.Path`
            Path to save acoustic model
        """
        if "pronunciation_probabilities" in self.training_configs:
            export_directory = os.path.dirname(output_model_path)
            if export_directory:
                os.makedirs(export_directory, exist_ok=True)
            self.export_trained_rules(
                self.training_configs[self.final_identifier].working_directory
            )
            with self.session() as session:
                for d in session.query(Dictionary):
                    base_name = self.dictionary_base_names[d.id]
                    if d.use_g2p:
                        shutil.copyfile(
                            self.phone_symbol_table_path,
                            os.path.join(
                                self.training_configs[self.final_identifier].working_directory,
                                "phones.txt",
                            ),
                        )
                        shutil.copyfile(
                            self.grapheme_symbol_table_path,
                            os.path.join(
                                self.training_configs[self.final_identifier].working_directory,
                                "graphemes.txt",
                            ),
                        )
                        shutil.copyfile(
                            d.lexicon_fst_path,
                            os.path.join(
                                self.training_configs[self.final_identifier].working_directory,
                                self.dictionary_base_names[d.id] + ".fst",
                            ),
                        )
                        shutil.copyfile(
                            d.align_lexicon_path,
                            os.path.join(
                                self.training_configs[self.final_identifier].working_directory,
                                self.dictionary_base_names[d.id] + "_align.fst",
                            ),
                        )
                    else:
                        output_dictionary_path = os.path.join(
                            export_directory, base_name + ".dict"
                        )
                        self.export_lexicon(
                            d.id,
                            output_dictionary_path,
                            probability=True,
                        )
        self.training_configs[self.final_identifier].export_model(output_model_path)
        logger.info(f"Saved model to {output_model_path}")

    def train(self) -> None:
        """
        Run through the training configurations to produce a final acoustic model
        """
        self.setup()
        previous = None
        begin = time.time()
        for trainer in self.training_configs.values():
            if self.current_subset is None and trainer.optional:
                logger.info(
                    "Exiting training early to save time as the corpus is below the subset size for later training stages"
                )
                break
            if trainer.subset < self.num_utterances:
                self.current_subset = trainer.subset
            else:
                self.current_subset = None
                trainer.subset = 0
            self.subset_directory(self.current_subset)
            if previous is not None:
                self.set_current_workflow(f"{previous.identifier}_ali")
                self.current_aligner = previous
                os.makedirs(self.working_log_directory, exist_ok=True)
                self.current_acoustic_model = AcousticModel(
                    previous.exported_model_path, self.working_directory
                )
                self.align()
                if config.DEBUG:
                    with self.session() as session:
                        session.query(WordInterval).delete()
                        session.query(PhoneInterval).delete()
                        session.commit()
                    self.collect_alignments()
            self.set_current_workflow(trainer.identifier)
            if trainer.identifier.startswith("pronunciation_probabilities"):
                if config.DEBUG:
                    with self.session() as session:
                        session.query(WordInterval).delete()
                        session.query(PhoneInterval).delete()
                        session.commit()
                trainer.train_pronunciation_probabilities()
            else:
                trainer.train()
            previous = trainer
            self.final_identifier = trainer.identifier
        self.current_subset = None
        self.current_aligner = previous
        self.set_current_workflow(f"{previous.identifier}_ali")
        os.makedirs(self.working_log_directory, exist_ok=True)
        self.current_acoustic_model = AcousticModel(
            previous.exported_model_path, self.working_directory
        )
        self.acoustic_model = AcousticModel(previous.exported_model_path, self.working_directory)
        self.align()
        self.finalize_training()
        counts_path = self.working_directory.joinpath("phone_pdf.counts")
        new_counts_path = os.path.join(previous.working_directory, "phone_pdf.counts")
        if not os.path.exists(new_counts_path):
            shutil.copyfile(counts_path, new_counts_path)
        phone_lm_path = os.path.join(self.phones_dir, "phone_lm.fst")
        new_phone_lm_path = os.path.join(previous.working_directory, "phone_lm.fst")
        if not os.path.exists(new_phone_lm_path) and os.path.exists(phone_lm_path):
            shutil.copyfile(phone_lm_path, new_phone_lm_path)
        logger.info(f"Completed training in {time.time() - begin} seconds!")

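    # Subset handling in the loop above: when a trainer's requested subset is at
    # least as large as the corpus, ``current_subset`` becomes ``None`` and the full
    # corpus is used (``trainer.subset`` is reset to 0); any later trainer marked
    # ``optional`` is then skipped, since it would only re-train on the same full
    # corpus.
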
    def transition_acc_arguments(self) -> List[TransitionAccArguments]:
        """
        Generate Job arguments for :class:`~montreal_forced_aligner.acoustic_modeling.trainer.TransitionAccFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.acoustic_modeling.trainer.TransitionAccArguments`]
            Arguments for processing
        """
        return [
            TransitionAccArguments(
                j.id,
                getattr(self, "session" if config.USE_THREADING else "db_string", ""),
                self.working_log_directory.joinpath(f"test_utterances.{j.id}.log"),
                self.working_directory,
                self.model_path,
            )
            for j in self.jobs
        ]

    def compute_phone_pdf_counts(self) -> None:
        """
        Calculate the counts of pdfs corresponding to phones
        """
        phone_pdf_counts_path = self.working_directory.joinpath("phone_pdf.counts")
        if phone_pdf_counts_path.exists():
            return
        logger.info("Accumulating transition stats...")
        begin = time.time()
        log_directory = self.working_log_directory
        os.makedirs(log_directory, exist_ok=True)
        arguments = self.transition_acc_arguments()
        transition_model, acoustic_model = read_gmm_model(self.model_path)
        transition_accs = DoubleVector(transition_model.NumTransitionIds() + 1)
        for result in run_kaldi_function(
            TransitionAccFunction, arguments, total_count=self.num_utterances
        ):
            if not isinstance(result, int):
                transition_accs.AddVec(1.0, result)
        smoothing = 1
        phone_pdf_mapping = collections.defaultdict(collections.Counter)
        for tid in range(1, transition_model.NumTransitionIds() + 1):
            pdf_id = transition_model.TransitionIdToPdf(tid)
            phone_id = transition_model.TransitionIdToPhone(tid)
            phone = self.reversed_phone_mapping[phone_id]
            t_count = smoothing + float(transition_accs[tid])
            phone_pdf_mapping[phone][pdf_id] += t_count
        with mfa_open(phone_pdf_counts_path, "w") as f:
            json.dump(phone_pdf_mapping, f, ensure_ascii=False)
        logger.debug(f"Accumulating transition stats took {time.time() - begin:.3f} seconds")
        logger.info("Finished accumulating transition stats!")

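    # The resulting ``phone_pdf.counts`` file maps each phone label to the pdf-ids its
    # transition-ids point to, with add-one smoothed frame counts.  Hypothetical
    # excerpt of the JSON written above:
    #
    #     {"AA1": {"152": 2034.0, "153": 87.0}, "sil": {"0": 54012.0, "1": 812.0}}
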
    def finalize_training(self):
        self.compute_phone_pdf_counts()
        self.collect_alignments()
        self.analyze_alignments()
        self.train_phone_lm()

    def export_files(
        self,
        output_directory: Path,
        output_format: Optional[str] = None,
        include_original_text: bool = False,
    ) -> None:
        """
        Export a TextGrid file for every sound file in the dataset

        Parameters
        ----------
        output_directory: :class:`~pathlib.Path`
            Directory to save to
        output_format: str, optional
            Format to save alignments, one of 'long_textgrids' (the default), 'short_textgrids', or 'json', passed to praatio
        include_original_text: bool
            Flag for including the original text of the corpus files as a tier
        """
        self.align()
        super(TrainableAligner, self).export_files(
            output_directory, output_format, include_original_text
        )

    @property
    def num_current_utterances(self) -> int:
        """Number of utterances in the current subset"""
        if self.current_subset and self.current_subset < self.num_utterances:
            return self.current_subset
        return self.num_utterances

    @property
    def align_options(self) -> MetaDict:
        """Alignment options"""
        if self.current_aligner is not None:
            return self.current_aligner.align_options
        return super().align_options

    def align(self) -> None:
        """
        Multiprocessing function that aligns based on the current model.

        See Also
        --------
        :class:`~montreal_forced_aligner.alignment.multiprocessing.AlignFunction`
            Multiprocessing helper function for each job
        :meth:`.AlignMixin.align_arguments`
            Job method for generating arguments for the helper function
        :kaldi_steps:`align_si`
            Reference Kaldi script
        :kaldi_steps:`align_fmllr`
            Reference Kaldi script
        """
        wf = self.current_workflow
        if wf.done and wf.working_directory.exists():
            logger.debug(f"Skipping {self.current_aligner.identifier} alignments")
            return
        try:
            self.current_acoustic_model.export_model(self.working_directory)
            self.uses_speaker_adaptation = False
            if self.current_acoustic_model.meta["features"]["uses_speaker_adaptation"]:
                self.uses_speaker_adaptation = False
                for j in self.jobs:
                    for path in j.construct_path_dictionary(
                        j.corpus.current_subset_directory, "trans", "scp"
                    ).values():
                        path.unlink(missing_ok=True)
            self.compile_train_graphs()
            self.align_utterances()
            if self.current_acoustic_model.meta["features"]["uses_speaker_adaptation"]:
                assert self.alignment_model_path.suffix == ".alimdl"
                self.calc_fmllr()
                self.uses_speaker_adaptation = True
                assert self.alignment_model_path.suffix == ".mdl"
                self.align_utterances()
            with self.session() as session:
                session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update(
                    {"done": True}
                )
                session.commit()
        except Exception as e:
            with self.session() as session:
                session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update(
                    {"dirty": True}
                )
                session.commit()
            if isinstance(e, KaldiProcessingError):
                log_kaldi_errors(e.error_logs)
                e.update_log_file()
            raise

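    # Alignment is a two-pass process for speaker-adapted models: the first pass
    # aligns with the speaker-independent model (``final.alimdl``), fMLLR transforms
    # are then estimated per speaker, and the second pass re-aligns with the
    # speaker-adapted ``final.mdl`` (see ``alignment_model_path`` below).  The asserts
    # above make that model switch explicit.
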
    @property
    def alignment_model_path(self) -> Path:
        """Current alignment model path"""
        path = self.working_directory.joinpath("final.alimdl")
        if os.path.exists(path) and not self.uses_speaker_adaptation:
            return path
        return self.model_path

    @property
    def model_path(self) -> Path:
        """Current model path"""
        if self.current_trainer is not None:
            return self.current_trainer.model_path
        return self.working_directory.joinpath("final.mdl")

    @property
    def data_directory(self) -> str:
        """Current data directory based on the trainer's subset"""
        return self.subset_directory(self.current_subset)

    @property
    def working_directory(self) -> Optional[Path]:
        """Working directory"""
        if self.current_trainer is not None and not self.current_trainer.training_complete:
            return self.current_trainer.working_directory
        if self.current_aligner is None:
            return None
        return self.output_directory.joinpath(f"{self.current_aligner.identifier}_ali")

    @property
    def working_log_directory(self) -> Optional[str]:
        """Current log directory"""
        return self.working_directory.joinpath("log")
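

# A minimal end-to-end sketch of using this class (paths are hypothetical); in
# practice the ``mfa train`` command drives the same sequence of calls:
#
#     am_trainer = TrainableAligner(
#         corpus_directory=Path("~/corpus").expanduser(),
#         dictionary_path=Path("~/lexicon.dict").expanduser(),
#     )
#     am_trainer.train()
#     am_trainer.export_model(Path("~/new_acoustic_model.zip").expanduser())
#     am_trainer.export_files(Path("~/aligned_textgrids").expanduser())
#     am_trainer.cleanup()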