Source code for montreal_forced_aligner.alignment.mixins

"""Class definitions for alignment mixins"""
from __future__ import annotations

import datetime
import logging
import os
import time
import typing
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, List

from montreal_forced_aligner import config
from montreal_forced_aligner.alignment.multiprocessing import (
    AlignArguments,
    AlignFunction,
    CompileTrainGraphsArguments,
    CompileTrainGraphsFunction,
    PhoneConfidenceArguments,
    PhoneConfidenceFunction,
)
from montreal_forced_aligner.db import CorpusWorkflow, Job, PhoneInterval, Utterance, bulk_update
from montreal_forced_aligner.dictionary.mixins import DictionaryMixin
from montreal_forced_aligner.exceptions import NoAlignmentsError
from montreal_forced_aligner.utils import run_kaldi_function

if TYPE_CHECKING:
    from montreal_forced_aligner.abc import MetaDict


logger = logging.getLogger("mfa")


[docs] class AlignMixin(DictionaryMixin): """ Configuration object for alignment Parameters ---------- transition_scale : float Transition scale, defaults to 1.0 acoustic_scale : float Acoustic scale, defaults to 0.1 self_loop_scale : float Self-loop scale, defaults to 0.1 boost_silence : float Factor to boost silence probabilities, 1.0 is no boost or reduction beam : int Size of the beam to use in decoding, defaults to 10 retry_beam : int Size of the beam to use in decoding if it fails with the initial beam width, defaults to 40 See Also -------- :class:`~montreal_forced_aligner.dictionary.mixins.DictionaryMixin` For dictionary parsing parameters Attributes ---------- jobs: list[:class:`~montreal_forced_aligner.corpus.multiprocessing.Job`] Jobs to process """ logger: logging.Logger jobs: List[Job] def __init__( self, transition_scale: float = 1.0, acoustic_scale: float = 0.1, self_loop_scale: float = 0.1, boost_silence: float = 1.0, beam: int = 10, retry_beam: int = 40, fine_tune: bool = False, phone_confidence: bool = False, use_phone_model: bool = False, **kwargs, ): super().__init__(**kwargs) self.transition_scale = transition_scale self.acoustic_scale = acoustic_scale self.self_loop_scale = self_loop_scale self.boost_silence = boost_silence self.beam = beam self.retry_beam = retry_beam self.fine_tune = fine_tune self.phone_confidence = phone_confidence self.use_phone_model = use_phone_model if self.retry_beam <= self.beam: self.retry_beam = self.beam * 4 self.unaligned_files = set() self.final_alignment = False @property def tree_path(self) -> Path: """Path to tree file""" return self.working_directory.joinpath("tree") @property @abstractmethod def data_directory(self) -> str: """Corpus data directory""" ...
[docs] def compile_train_graphs_arguments(self) -> typing.List[CompileTrainGraphsArguments]: """ Generate Job arguments for :class:`~montreal_forced_aligner.alignment.multiprocessing.CompileTrainGraphsFunction` Returns ------- list[:class:`~montreal_forced_aligner.alignment.multiprocessing.CompileTrainGraphsArguments`] Arguments for processing """ args = [] lexicon_compilers = {} if getattr(self, "use_g2p", False): lexicon_compilers = getattr(self, "lexicon_compilers", {}) for j in self.jobs: args.append( CompileTrainGraphsArguments( j.id, getattr(self, "session" if config.USE_THREADING else "db_string", ""), self.working_log_directory.joinpath(f"compile_train_graphs.{j.id}.log"), self.working_directory, lexicon_compilers, self.working_directory.joinpath("tree"), self.alignment_model_path, getattr(self, "use_g2p", False), ) ) return args
[docs] def align_arguments(self) -> List[AlignArguments]: """ Generate Job arguments for :class:`~montreal_forced_aligner.alignment.multiprocessing.AlignFunction` Returns ------- list[:class:`~montreal_forced_aligner.alignment.multiprocessing.AlignArguments`] Arguments for processing """ args = [] iteration = getattr(self, "iteration", None) for j in self.jobs: if iteration is not None: log_path = self.working_log_directory.joinpath(f"align.{iteration}.{j.id}.log") else: log_path = self.working_log_directory.joinpath(f"align.{j.id}.log") if getattr(self, "uses_speaker_adaptation", False): log_path = log_path.with_suffix(".fmllr.log") args.append( AlignArguments( j.id, getattr(self, "session" if config.USE_THREADING else "db_string", ""), log_path, self.working_directory, self.alignment_model_path, self.align_options, self.phone_confidence, getattr(self, "final_alignment", False), ) ) return args
[docs] def phone_confidence_arguments(self) -> List[PhoneConfidenceArguments]: """ Generate Job arguments for :class:`~montreal_forced_aligner.alignment.multiprocessing.PhoneConfidenceFunction` Returns ------- list[:class:`~montreal_forced_aligner.alignment.multiprocessing.PhoneConfidenceArguments`] Arguments for processing """ args = [] for j in self.jobs: log_path = self.working_log_directory.joinpath(f"phone_confidence.{j.id}.log") args.append( PhoneConfidenceArguments( j.id, getattr(self, "session" if config.USE_THREADING else "db_string", ""), log_path, self.working_directory, self.model_path, self.phone_pdf_counts_path, ) ) return args
@property def align_options(self) -> MetaDict: """Options for use in aligning""" return { "transition_scale": self.transition_scale, "acoustic_scale": self.acoustic_scale, "self_loop_scale": self.self_loop_scale, "beam": self.beam, "retry_beam": self.retry_beam, "boost_silence": self.boost_silence, }
[docs] def alignment_configuration(self) -> MetaDict: """Configuration parameters""" return { "transition_scale": self.transition_scale, "acoustic_scale": self.acoustic_scale, "self_loop_scale": self.self_loop_scale, "boost_silence": self.boost_silence, "beam": self.beam, "retry_beam": self.retry_beam, }
@property def num_current_utterances(self) -> int: """Number of current utterances""" return getattr(self, "num_utterances", 0)
[docs] def compile_train_graphs(self) -> None: """ Multiprocessing function that compiles training graphs for utterances. See Also -------- :class:`~montreal_forced_aligner.alignment.multiprocessing.CompileTrainGraphsFunction` Multiprocessing helper function for each job :meth:`.AlignMixin.compile_train_graphs_arguments` Job method for generating arguments for the helper function :kaldi_steps:`align_si` Reference Kaldi script :kaldi_steps:`align_fmllr` Reference Kaldi script """ begin = time.time() log_directory = self.working_log_directory os.makedirs(log_directory, exist_ok=True) logger.info("Compiling training graphs...") arguments = self.compile_train_graphs_arguments() for _ in run_kaldi_function(CompileTrainGraphsFunction, arguments): pass logger.debug(f"Compiling training graphs took {time.time() - begin:.3f} seconds")
def get_phone_confidences(self): if not os.path.exists(self.phone_pdf_counts_path): logger.warning("Cannot calculate phone confidences with the current model.") return logger.info("Calculating phone confidences...") begin = time.time() with self.session() as session: arguments = self.phone_confidence_arguments() interval_update_mappings = [] for result in run_kaldi_function( PhoneConfidenceFunction, arguments, total_count=self.num_current_utterances ): interval_update_mappings.extend(result) bulk_update(session, PhoneInterval, interval_update_mappings) session.commit() logger.debug(f"Calculating phone confidences took {time.time() - begin:.3f} seconds")
[docs] def align_utterances(self, training=False) -> None: """ Multiprocessing function that aligns based on the current model. See Also -------- :class:`~montreal_forced_aligner.alignment.multiprocessing.AlignFunction` Multiprocessing helper function for each job :meth:`.AlignMixin.align_arguments` Job method for generating arguments for the helper function :kaldi_steps:`align_si` Reference Kaldi script :kaldi_steps:`align_fmllr` Reference Kaldi script """ begin = time.time() logger.info("Generating alignments...") self.working_log_directory.mkdir(parents=True, exist_ok=True) log_like_sum = 0 log_like_count = 0 update_mappings = [] num_errors = 0 num_successful = 0 for utterance, log_likelihood in run_kaldi_function( AlignFunction, self.align_arguments(), total_count=self.num_current_utterances ): if log_likelihood: num_successful += 1 if log_likelihood: log_like_sum += log_likelihood log_like_count += 1 else: num_errors += 1 if not training: update_mappings.append( { "id": int(utterance.split("-")[-1]), "alignment_log_likelihood": log_likelihood, } ) if not training: if len(update_mappings) == 0 or num_successful == 0: raise NoAlignmentsError(self.num_current_utterances, self.beam, self.retry_beam) with self.session() as session: bulk_update(session, Utterance, update_mappings) session.commit() session.query(Utterance).filter( Utterance.alignment_log_likelihood != None # noqa ).update( { Utterance.alignment_log_likelihood: Utterance.alignment_log_likelihood / Utterance.num_frames }, synchronize_session="fetch", ) workflow = ( session.query(CorpusWorkflow) .filter(CorpusWorkflow.current == True) # noqa .first() ) workflow.time_stamp = datetime.datetime.now() workflow.score = log_like_sum / log_like_count session.commit() logger.debug( f"Aligned {num_successful}, errors on {num_errors}, total {num_successful + num_errors}" ) logger.debug(f"Alignment round took {time.time() - begin:.3f} seconds")
@property @abstractmethod def working_directory(self) -> Path: """Working directory""" ... @property @abstractmethod def working_log_directory(self) -> Path: """Working log directory""" ... @property def model_path(self) -> Path: """Acoustic model file path""" return self.working_directory.joinpath("final.mdl") @property def phone_pdf_counts_path(self) -> Path: """Acoustic model file path""" return self.working_directory.joinpath("phone_pdf.counts") @property def alignment_model_path(self) -> Path: """Acoustic model file path for speaker-independent alignment""" path = self.working_directory.joinpath("final.alimdl") if os.path.exists(path) and not getattr(self, "uses_speaker_adaptation", False): return path return self.model_path