Source code for montreal_forced_aligner.alignment.adapting

"""Class definitions for adapting acoustic models"""
from __future__ import annotations

import logging
import os
import shutil
import time
from pathlib import Path
from typing import List

from _kalpy.gmm import AccumAmDiagGmm, IsmoothStatsAmDiagGmmFromModel
from _kalpy.matrix import DoubleVector
from kalpy.gmm.utils import read_gmm_model, write_gmm_model
from kalpy.utils import kalpy_logger

from montreal_forced_aligner import config
from montreal_forced_aligner.abc import AdapterMixin, MetaDict
from montreal_forced_aligner.alignment.multiprocessing import AccStatsArguments, AccStatsFunction
from montreal_forced_aligner.alignment.pretrained import PretrainedAligner
from montreal_forced_aligner.data import WorkflowType
from montreal_forced_aligner.db import CorpusWorkflow
from montreal_forced_aligner.exceptions import KaldiProcessingError
from montreal_forced_aligner.models import AcousticModel
from montreal_forced_aligner.utils import log_kaldi_errors, run_kaldi_function

__all__ = ["AdaptingAligner"]

logger = logging.getLogger("mfa")


[docs] class AdaptingAligner(PretrainedAligner, AdapterMixin): """ Adapt an acoustic model to a new dataset Parameters ---------- mapping_tau: int Tau to use in mapping stats between new domain data and pretrained model See Also -------- :class:`~montreal_forced_aligner.alignment.pretrained.PretrainedAligner` For dictionary, corpus, and alignment parameters :class:`~montreal_forced_aligner.abc.AdapterMixin` For adapting parameters Attributes ---------- initialized: bool Flag for whether initialization is complete adaptation_done: bool Flag for whether adaptation is complete """ def __init__(self, mapping_tau: int = 20, **kwargs): self.initialized = False self.adaptation_done = False super().__init__(**kwargs) self.mapping_tau = mapping_tau
[docs] def map_acc_stats_arguments(self, alignment=False) -> List[AccStatsArguments]: """ Generate Job arguments for :func:`~montreal_forced_aligner.alignment.multiprocessing.AccStatsFunction` Returns ------- list[:class:`~montreal_forced_aligner.alignment.multiprocessing.AccStatsArguments`] Arguments for processing """ if alignment: model_path = self.alignment_model_path else: model_path = self.model_path arguments = [] for j in self.jobs: arguments.append( AccStatsArguments( j.id, getattr(self, "session" if config.USE_THREADING else "db_string", ""), self.working_log_directory.joinpath(f"map_acc_stats.{j.id}.log"), self.working_directory, model_path, ) ) return arguments
[docs] def acc_stats(self, alignment: bool = False) -> None: """ Accumulate stats for the mapped model Parameters ---------- alignment: bool Flag for whether to accumulate stats for the mapped alignment model """ arguments = self.map_acc_stats_arguments(alignment) if alignment: initial_mdl_path = self.working_directory.joinpath("unadapted.alimdl") final_mdl_path = self.working_directory.joinpath("final.alimdl") else: initial_mdl_path = self.working_directory.joinpath("unadapted.mdl") final_mdl_path = self.working_directory.joinpath("final.mdl") logger.info("Accumulating statistics...") transition_model, acoustic_model = read_gmm_model(initial_mdl_path) transition_accs = DoubleVector() gmm_accs = AccumAmDiagGmm() transition_model.InitStats(transition_accs) gmm_accs.init(acoustic_model) for result in run_kaldi_function( AccStatsFunction, arguments, total_count=self.num_current_utterances ): if isinstance(result, tuple): job_transition_accs, job_gmm_accs = result transition_accs.AddVec(1.0, job_transition_accs) gmm_accs.Add(1.0, job_gmm_accs) log_path = self.working_log_directory.joinpath("map_model_est.log") with kalpy_logger("kalpy.train", log_path): IsmoothStatsAmDiagGmmFromModel(acoustic_model, self.mapping_tau, gmm_accs) objf_impr, count = transition_model.mle_update(transition_accs) logger.debug( f"Transition model update: Overall {objf_impr/count} " f"log-like improvement per frame over {count} frames." ) objf_impr, count = acoustic_model.mle_update( gmm_accs, update_flags_str="m", remove_low_count_gaussians=False ) logger.debug( f"GMM update: Overall {objf_impr/count} " f"objective function improvement per frame over {count} frames." ) tot_like = gmm_accs.TotLogLike() tot_t = gmm_accs.TotCount() logger.debug( f"Average Likelihood per frame = {tot_like/tot_t} " f"over {tot_t} frames." ) write_gmm_model(str(final_mdl_path), transition_model, acoustic_model)
@property def align_directory(self) -> Path: """Align directory""" return self.output_directory.joinpath("adapted_align") @property def working_log_directory(self) -> Path: """Current log directory""" return self.working_directory.joinpath("log") @property def model_path(self) -> Path: """Current acoustic model path""" if self.current_workflow.workflow_type == WorkflowType.acoustic_model_adaptation: return self.working_directory.joinpath("unadapted.mdl") return self.working_directory.joinpath("final.mdl") @property def alignment_model_path(self) -> Path: """Current acoustic model path""" if self.current_workflow.workflow_type == WorkflowType.acoustic_model_adaptation: path = self.working_directory.joinpath("unadapted.alimdl") if os.path.exists(path) and not getattr(self, "uses_speaker_adaptation", False): return path return self.model_path return super().alignment_model_path @property def next_model_path(self) -> Path: """Mapped acoustic model path""" return self.working_directory.joinpath("final.mdl")
[docs] def train_map(self) -> None: """ Trains an adapted acoustic model through mapping model states and update those with enough data. See Also -------- :class:`~montreal_forced_aligner.alignment.multiprocessing.AccStatsFunction` Multiprocessing helper function for each job :meth:`.AdaptingAligner.map_acc_stats_arguments` Job method for generating arguments for the helper function :kaldi_src:`gmm-sum-accs` Relevant Kaldi binary :kaldi_src:`gmm-ismooth-stats` Relevant Kaldi binary :kaldi_src:`gmm-est` Relevant Kaldi binary :kaldi_steps:`train_map` Reference Kaldi script """ begin = time.time() log_directory = self.working_log_directory os.makedirs(log_directory, exist_ok=True) self.acc_stats(alignment=False) if self.uses_speaker_adaptation: self.acc_stats(alignment=True) logger.debug(f"Mapping models took {time.time() - begin:.3f} seconds")
[docs] def adapt(self) -> None: """Run the adaptation""" logger.info("Generating initial alignments...") self.align() alignment_workflow = self.current_workflow self.create_new_current_workflow(WorkflowType.acoustic_model_adaptation) for f in ["final.mdl", "final.alimdl"]: shutil.copyfile( os.path.join(alignment_workflow.working_directory, f), self.working_directory.joinpath(f).with_stem("unadapted"), ) shutil.copyfile( os.path.join(alignment_workflow.working_directory, "tree"), self.working_directory.joinpath("tree"), ) shutil.copyfile( os.path.join(alignment_workflow.working_directory, "lda.mat"), self.working_directory.joinpath("lda.mat"), ) for j in self.jobs: old_paths = j.construct_path_dictionary( alignment_workflow.working_directory, "ali", "ark" ) new_paths = j.construct_path_dictionary(self.working_directory, "ali", "ark") for k, v in old_paths.items(): shutil.copyfile(v, new_paths[k]) os.makedirs(self.align_directory, exist_ok=True) try: logger.info("Adapting pretrained model...") self.train_map() self.export_model(self.working_log_directory.joinpath("acoustic_model.zip")) shutil.copyfile( self.working_directory.joinpath("final.mdl"), os.path.join(self.align_directory, "final.mdl"), ) shutil.copyfile( self.working_directory.joinpath("tree"), os.path.join(self.align_directory, "tree"), ) if os.path.exists(self.working_directory.joinpath("final.alimdl")): shutil.copyfile( self.working_directory.joinpath("final.alimdl"), os.path.join(self.align_directory, "final.alimdl"), ) if os.path.exists(self.working_directory.joinpath("lda.mat")): shutil.copyfile( self.working_directory.joinpath("lda.mat"), os.path.join(self.align_directory, "lda.mat"), ) wf = self.current_workflow with self.session() as session: session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update( {"done": True} ) session.commit() except Exception as e: wf = self.current_workflow with self.session() as session: session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update( {"dirty": True} ) session.commit() if isinstance(e, KaldiProcessingError): log_kaldi_errors(e.error_logs) e.update_log_file() raise
@property def meta(self) -> MetaDict: """Acoustic model metadata""" from datetime import datetime from ..utils import get_mfa_version data = { "phones": sorted(self.non_silence_phones), "version": get_mfa_version(), "architecture": self.acoustic_model.meta["architecture"], "train_date": str(datetime.now()), "features": self.feature_options, "phone_set_type": str(self.phone_set_type), "dictionaries": { "names": sorted(self.dictionary_base_names.values()), "default": self.dictionary_base_names[self._default_dictionary_id], "silence_word": self.silence_word, "use_g2p": self.use_g2p, "oov_word": self.oov_word, "bracketed_word": self.bracketed_word, "laughter_word": self.laughter_word, "clitic_marker": self.clitic_marker, "position_dependent_phones": self.position_dependent_phones, }, "oov_phone": self.oov_phone, "optional_silence_phone": self.optional_silence_phone, "silence_probability": self.silence_probability, "initial_silence_probability": self.initial_silence_probability, "final_silence_correction": self.final_silence_correction, "final_non_silence_correction": self.final_non_silence_correction, } return data
[docs] def export_model(self, output_model_path: Path) -> None: """ Output an acoustic model to the specified path Parameters ---------- output_model_path : str Path to save adapted acoustic model """ directory = output_model_path.parent acoustic_model = AcousticModel.empty( output_model_path.stem, root_directory=self.working_log_directory ) acoustic_model.add_meta_file(self) acoustic_model.add_model(self.working_directory) acoustic_model.add_model(self.phones_dir) if directory: os.makedirs(directory, exist_ok=True) basename, _ = os.path.splitext(output_model_path) acoustic_model.dump(output_model_path)