# Source code for montreal_forced_aligner.corpus.acoustic_corpus

"""Class definitions for corpora"""
from __future__ import annotations

import logging
import os
import threading
import time
import typing
from abc import ABCMeta
from multiprocessing.pool import ThreadPool
from pathlib import Path
from queue import Empty, Queue
from typing import List, Optional

import sqlalchemy
from kalpy.data import KaldiMapping
from kalpy.feat.cmvn import CmvnComputer
from kalpy.feat.data import FeatureArchive
from kalpy.utils import kalpy_logger
from tqdm.rich import tqdm

from montreal_forced_aligner import config
from montreal_forced_aligner.abc import MfaWorker
from montreal_forced_aligner.corpus.base import CorpusMixin
from montreal_forced_aligner.corpus.classes import FileData
from montreal_forced_aligner.corpus.features import (
    CalcFmllrArguments,
    CalcFmllrFunction,
    ComputeVadFunction,
    FeatureConfigMixin,
    FinalFeatureArguments,
    FinalFeatureFunction,
    MfccArguments,
    MfccFunction,
    VadArguments,
)
from montreal_forced_aligner.corpus.helper import find_exts
from montreal_forced_aligner.corpus.multiprocessing import (
    AcousticDirectoryParser,
    CorpusProcessWorker,
)
from montreal_forced_aligner.data import DatabaseImportData, PhoneType, WorkflowType
from montreal_forced_aligner.db import (
    Corpus,
    CorpusWorkflow,
    File,
    Phone,
    PhoneInterval,
    SoundFile,
    Speaker,
    TextFile,
    Utterance,
    bulk_update,
)
from montreal_forced_aligner.dictionary.mixins import DictionaryMixin
from montreal_forced_aligner.dictionary.multispeaker import MultispeakerDictionaryMixin
from montreal_forced_aligner.exceptions import (
    CorpusError,
    FeatureGenerationError,
    SoundFileError,
    TextGridParseError,
    TextParseError,
)
from montreal_forced_aligner.helper import load_scp, mfa_open
from montreal_forced_aligner.textgrid import parse_aligned_textgrid
from montreal_forced_aligner.utils import Counter, run_kaldi_function

# Public API of this module.
__all__ = [
    "AcousticCorpusMixin",
    "AcousticCorpus",
    "AcousticCorpusWithPronunciations",
    "AcousticCorpusPronunciationMixin",
]

# Shared MFA logger; all corpus-loading and feature-generation messages use it.
logger = logging.getLogger("mfa")


class AcousticCorpusMixin(CorpusMixin, FeatureConfigMixin, metaclass=ABCMeta):
    """
    Mixin class for acoustic corpora

    Parameters
    ----------
    audio_directory: str
        Extra directory to look for audio files

    See Also
    --------
    :class:`~montreal_forced_aligner.corpus.base.CorpusMixin`
        For corpus parsing parameters
    :class:`~montreal_forced_aligner.corpus.features.FeatureConfigMixin`
        For feature generation parameters

    Attributes
    ----------
    sound_file_errors: list[str]
        List of sound files with errors in loading
    stopped: :class:`~threading.Event`
        Stop check for loading the corpus
    """

    def __init__(self, audio_directory: Optional[str] = None, **kwargs):
        super().__init__(**kwargs)
        self.audio_directory = audio_directory
        self.sound_file_errors = []
        self.stopped = threading.Event()
        # Progress flags; inspect_database() may later restore these from the DB.
        self.features_generated = False
        self.transcription_done = False
        self.alignment_evaluation_done = False
        self.transcriptions_required = True

    def has_alignments(self, workflow_id: typing.Optional[int] = None) -> bool:
        """Check whether alignments exist, optionally for a specific workflow (by id or type)."""
        with self.session() as session:
            if workflow_id is None:
                # Any phone interval at all counts as having alignments.
                check = session.query(PhoneInterval).limit(1).first() is not None
            else:
                if isinstance(workflow_id, int):
                    check = (
                        session.query(CorpusWorkflow.alignments_collected)
                        .filter(CorpusWorkflow.id == workflow_id)
                        .scalar()
                    )
                else:
                    # Non-int values are treated as a WorkflowType.
                    check = (
                        session.query(CorpusWorkflow.alignments_collected)
                        .filter(CorpusWorkflow.workflow_type == workflow_id)
                        .scalar()
                    )
        return check

    def has_ivectors(self) -> bool:
        """Check whether ivectors have been calculated for the corpus."""
        with self.session() as session:
            check = (
                session.query(Corpus)
                .filter(Corpus.ivectors_calculated == True)  # noqa
                .limit(1)
                .first()
                is not None
            )
        return check

    def has_xvectors(self) -> bool:
        """Check whether xvectors have been loaded for the corpus."""
        with self.session() as session:
            check = (
                session.query(Corpus)
                .filter(Corpus.xvectors_loaded == True)  # noqa
                .limit(1)
                .first()
                is not None
            )
        return check

    def has_any_ivectors(self) -> bool:
        """Check whether either ivectors or xvectors are available."""
        with self.session() as session:
            check = (
                session.query(Corpus)
                .filter(
                    sqlalchemy.or_(
                        Corpus.ivectors_calculated == True, Corpus.xvectors_loaded == True  # noqa
                    )
                )
                .limit(1)
                .first()
                is not None
            )
        return check

    @property
    def no_transcription_files(self) -> List[str]:
        """List of sound files without text files"""
        with self.session() as session:
            files = session.query(SoundFile.sound_file_path).filter(
                ~sqlalchemy.exists().where(SoundFile.file_id == TextFile.file_id)
            )
            # Materialize inside the session so the query can execute.
            return [x[0] for x in files]

    @property
    def transcriptions_without_wavs(self) -> List[str]:
        """List of text files without sound files"""
        with self.session() as session:
            files = session.query(TextFile.text_file_path).filter(
                ~sqlalchemy.exists().where(SoundFile.file_id == TextFile.file_id)
            )
            return [x[0] for x in files]
[docs] def inspect_database(self) -> None: """Check if a database file exists and create the necessary metadata""" self.initialize_database() with self.session() as session: corpus = session.query(Corpus).first() if corpus: self.imported = corpus.imported self.features_generated = corpus.features_generated self.text_normalized = corpus.text_normalized else: session.add( Corpus( name=self.data_source_identifier, path=self.corpus_directory, data_directory=self.corpus_output_directory, ) ) session.commit()
    def load_reference_alignments(self, reference_directory: Path) -> None:
        """
        Load reference alignments to use in alignment evaluation from a directory

        Parameters
        ----------
        reference_directory: :class:`~pathlib.Path`
            Directory containing reference alignments
        """
        self.create_new_current_workflow(WorkflowType.reference)
        workflow = self.current_workflow
        if workflow.alignments_collected:
            logger.info("Reference alignments already loaded!")
            return
        logger.info("Loading reference files...")
        indices = []
        jobs = []
        reference_intervals = []
        with tqdm(total=self.num_files, disable=config.QUIET) as pbar, self.session() as session:
            phone_mapping = {}
            max_id = 0
            # Continue interval ids after the current maximum so inserts do not collide.
            interval_id = session.query(sqlalchemy.func.max(PhoneInterval.id)).scalar()
            if not interval_id:
                interval_id = 0
            interval_id += 1
            for p, p_id in session.query(Phone.phone, Phone.id):
                phone_mapping[p] = p_id
                if p_id > max_id:
                    max_id = p_id
            new_phones = []
            for root, _, files in os.walk(reference_directory, followlinks=True):
                # Directory name doubles as a fallback speaker name for single-speaker TextGrids.
                root_speaker = os.path.basename(root)
                for f in files:
                    if f.endswith(".TextGrid"):
                        file_name = f.replace(".TextGrid", "")
                        file_id = session.query(File.id).filter_by(name=file_name).scalar()
                        if not file_id:
                            # Reference file has no counterpart in the corpus; skip it.
                            continue
                        if config.USE_MP:
                            # Defer parsing to the thread pool below.
                            indices.append(file_id)
                            jobs.append((os.path.join(root, f), root_speaker))
                        else:
                            intervals = parse_aligned_textgrid(os.path.join(root, f), root_speaker)
                            utterances = (
                                session.query(
                                    Utterance.id, Speaker.name, Utterance.begin, Utterance.end
                                )
                                .join(Utterance.speaker)
                                .filter(Utterance.file_id == file_id)
                                .order_by(Utterance.begin)
                            )
                            for u_id, speaker_name, begin, end in utterances:
                                if speaker_name not in intervals:
                                    continue
                                # Consume intervals whose midpoint falls inside this utterance;
                                # push back the first interval that belongs to a later utterance.
                                while intervals[speaker_name]:
                                    interval = intervals[speaker_name].pop(0)
                                    dur = interval.end - interval.begin
                                    mid_point = interval.begin + (dur / 2)
                                    if begin <= mid_point <= end:
                                        if interval.label not in phone_mapping:
                                            # Unseen reference label: allocate a new "extra" phone.
                                            max_id += 1
                                            phone_mapping[interval.label] = max_id
                                            new_phones.append(
                                                {
                                                    "id": max_id,
                                                    "mapping_id": max_id - 1,
                                                    "phone": interval.label,
                                                    "kaldi_label": interval.label,
                                                    "phone_type": PhoneType.extra,
                                                }
                                            )
                                        reference_intervals.append(
                                            {
                                                "id": interval_id,
                                                "begin": interval.begin,
                                                "end": interval.end,
                                                "phone_id": phone_mapping[interval.label],
                                                "utterance_id": u_id,
                                                "workflow_id": workflow.id,
                                            }
                                        )
                                        interval_id += 1
                                    if mid_point > end:
                                        intervals[speaker_name].insert(0, interval)
                                        break
                            pbar.update(1)
            if config.USE_MP:
                # Parse TextGrids in parallel, then apply the same interval assignment as above.
                with ThreadPool(config.NUM_JOBS) as pool:
                    gen = pool.starmap(parse_aligned_textgrid, jobs)
                    for i, intervals in enumerate(gen):
                        pbar.update(1)
                        file_id = indices[i]
                        utterances = (
                            session.query(
                                Utterance.id, Speaker.name, Utterance.begin, Utterance.end
                            )
                            .join(Utterance.speaker)
                            .filter(Utterance.file_id == file_id)
                            .order_by(Utterance.begin)
                        )
                        for u_id, speaker_name, begin, end in utterances:
                            if speaker_name not in intervals:
                                continue
                            while intervals[speaker_name]:
                                interval = intervals[speaker_name].pop(0)
                                dur = interval.end - interval.begin
                                mid_point = interval.begin + (dur / 2)
                                if begin <= mid_point <= end:
                                    if interval.label not in phone_mapping:
                                        max_id += 1
                                        phone_mapping[interval.label] = max_id
                                        new_phones.append(
                                            {
                                                "id": max_id,
                                                "mapping_id": max_id - 1,
                                                "phone": interval.label,
                                                "kaldi_label": interval.label,
                                                "phone_type": PhoneType.extra,
                                            }
                                        )
                                    reference_intervals.append(
                                        {
                                            "id": interval_id,
                                            "begin": interval.begin,
                                            "end": interval.end,
                                            "phone_id": phone_mapping[interval.label],
                                            "utterance_id": u_id,
                                            "workflow_id": workflow.id,
                                        }
                                    )
                                    interval_id += 1
                                if mid_point > end:
                                    intervals[speaker_name].insert(0, interval)
                                    break
            if new_phones:
                # Insert new phones first so interval FKs resolve.
                session.execute(sqlalchemy.insert(Phone.__table__), new_phones)
                session.commit()
            session.execute(sqlalchemy.insert(PhoneInterval.__table__), reference_intervals)
            session.query(CorpusWorkflow).filter(CorpusWorkflow.id == workflow.id).update(
                {CorpusWorkflow.done: True, CorpusWorkflow.alignments_collected: True}
            )
            session.commit()
[docs] def validate_corpus(self): """ Validate the loaded files """ if self.transcriptions_required: with self.session() as session: has_transcriptions = ( session.query(Utterance) .filter(Utterance.text != None, Utterance.text != "") # noqa .first() is not None ) if not has_transcriptions: raise CorpusError( "MFA could not find transcription files for the sound files, " "please see " "https://montreal-forced-aligner.readthedocs.io/en/latest/user_guide/corpus_structure.html " "for details on how to structure your corpus" )
    def load_corpus(self) -> None:
        """
        Load the corpus

        The steps below are order-dependent: the database must exist before parsing,
        parsing must precede validation, and jobs must be initialized before text
        normalization and feature generation.
        """
        self.initialize_database()
        self._load_corpus()
        self.validate_corpus()
        self._create_dummy_dictionary()
        self.initialize_jobs()
        self.normalize_text()
        self.generate_features()
    def reset_features(self):
        """Clear all derived feature state (DB columns, vector indexes, on-disk archives)."""
        with self.session() as session:
            logger.debug("Dropping indexes...")
            # Vector-similarity indexes must be dropped before nulling the columns they cover.
            session.execute(sqlalchemy.text("DROP INDEX IF EXISTS utterance_xvector_index;"))
            session.execute(sqlalchemy.text("DROP INDEX IF EXISTS speaker_xvector_index;"))
            session.execute(sqlalchemy.text("DROP INDEX IF EXISTS utterance_ivector_index;"))
            session.execute(sqlalchemy.text("DROP INDEX IF EXISTS speaker_ivector_index;"))
            session.execute(sqlalchemy.text("DROP INDEX IF EXISTS utterance_plda_vector_index;"))
            session.execute(sqlalchemy.text("DROP INDEX IF EXISTS speaker_plda_vector_index;"))
            session.commit()
            logger.debug("Resetting utterance features...")
            session.execute(
                sqlalchemy.update(Corpus).values(
                    ivectors_calculated=False,
                    plda_calculated=False,
                    xvectors_loaded=False,
                    features_generated=False,
                )
            )
            session.execute(
                sqlalchemy.update(Utterance).values(
                    ivector=None, features=None, xvector=None, plda_vector=None
                )
            )
            session.execute(
                sqlalchemy.update(Speaker).values(
                    cmvn=None, fmllr=None, ivector=None, xvector=None, plda_vector=None
                )
            )
            session.commit()
        logger.debug("Deleting local files...")
        # Corpus-level archives/scps.
        paths = [
            self.output_directory.joinpath("cmvn.ark"),
            self.output_directory.joinpath("cmvn.scp"),
            self.output_directory.joinpath("feats.scp"),
            self.output_directory.joinpath("ivectors.scp"),
        ]
        for path in paths:
            path.unlink(missing_ok=True)
        # Per-job (and per-dictionary) split files.
        for j in self.jobs:
            paths = [
                j.construct_path(self.split_directory, "cmvn", "scp"),
                j.construct_path(self.split_directory, "ivectors", "scp"),
                j.construct_path(self.split_directory, "ivectors", "ark"),
            ]
            for path in paths:
                path.unlink(missing_ok=True)
            for d_id in j.dictionary_ids:
                paths = [
                    j.construct_path(self.split_directory, "trans", "scp", d_id),
                    j.construct_path(self.split_directory, "trans", "ark", d_id),
                    j.construct_path(self.split_directory, "cmvn", "scp", d_id),
                    j.construct_path(self.split_directory, "feats", "scp", d_id),
                    j.construct_path(self.split_directory, "feats", "ark", d_id),
                    j.construct_path(self.split_directory, "final_features", "scp", d_id),
                    j.construct_path(self.split_directory, "final_features", "ark", d_id),
                ]
                for path in paths:
                    path.unlink(missing_ok=True)
[docs] def generate_final_features(self) -> None: """ Generate features for the corpus """ logger.info("Generating final features...") time_begin = time.time() log_directory = self.split_directory.joinpath("log") os.makedirs(log_directory, exist_ok=True) arguments = self.final_feature_arguments() for _ in run_kaldi_function( FinalFeatureFunction, arguments, total_count=self.num_utterances ): pass with self.session() as session: update_mapping = {} session.query(Utterance).update({"ignored": True}) session.commit() for j in self.jobs: with mfa_open(j.feats_scp_path, "r") as f: for line in f: line = line.strip() if line == "": continue f = line.split(maxsplit=1) utt_id = int(f[0].split("-")[-1]) feats = f[1] update_mapping[utt_id] = { "id": utt_id, "features": feats, "ignored": False, } bulk_update(session, Utterance, list(update_mapping.values())) session.commit() non_ignored_check = ( session.query(Utterance).filter(Utterance.ignored == False).first() # noqa ) if non_ignored_check is None: raise FeatureGenerationError( f"No utterances had features, please check the logs in {log_directory} for errors." ) ignored_utterances = ( session.query( SoundFile.sound_file_path, Speaker.name, Utterance.begin, Utterance.end, Utterance.text, ) .join(Utterance.speaker) .join(Utterance.file) .join(File.sound_file) .filter(Utterance.ignored == True) # noqa ) ignored_count = 0 for sound_file_path, speaker_name, begin, end, text in ignored_utterances: logger.debug(f" - Ignored File: {sound_file_path}") logger.debug(f" - Speaker: {speaker_name}") logger.debug(f" - Begin: {begin}") logger.debug(f" - End: {end}") logger.debug(f" - Text: {text}") ignored_count += 1 if ignored_count: logger.warning( f"There were {ignored_count} utterances ignored due to an issue in feature generation, see the log file for full " "details or run `mfa validate` on the corpus." ) logger.debug(f"Generating final features took {time.time() - time_begin:.3f} seconds")
    def generate_features(self) -> None:
        """
        Generate features for the corpus

        Skips work entirely if the corpus row already has ``features_generated`` set;
        otherwise runs MFCC extraction (unless per-utterance features already exist),
        CMVN, VAD, and final feature generation as configured, then flags the corpus
        and creates the per-job split.
        """
        with self.session() as session:
            final_features_check = session.query(Corpus).first().features_generated
            if final_features_check:
                self.features_generated = True
                logger.info("Features already generated.")
                return
            # True when some utterances already carry raw features from a prior partial run.
            feature_check = (
                session.query(Utterance).filter(Utterance.features != None).first()  # noqa
                is not None
            )
        if self.feature_type == "mfcc" and not feature_check:
            self.mfcc()
            self.combine_feats()
        if self.uses_cmvn:
            logger.info("Calculating CMVN...")
            self.calc_cmvn()
        if self.uses_voiced:
            self.compute_vad()
        self.generate_final_features()
        self._write_feats()
        self.features_generated = True
        with self.session() as session:
            session.query(Corpus).update({"features_generated": True})
            session.commit()
        self.create_corpus_split()
[docs] def create_corpus_split(self) -> None: """Create the split directory for the corpus""" with self.session() as session: c = session.query(Corpus).first() c.current_subset = 0 session.commit() logger.info("Creating corpus split...") super().create_corpus_split()
[docs] def compute_vad_arguments(self) -> List[VadArguments]: """ Generate Job arguments for :class:`~montreal_forced_aligner.corpus.features.ComputeVadFunction` Returns ------- list[:class:`~montreal_forced_aligner.corpus.features.VadArguments`] Arguments for processing """ return [ VadArguments( j.id, getattr(self, "session" if config.USE_THREADING else "db_string", ""), self.split_directory.joinpath("log", f"compute_vad.{j.id}.log"), self.vad_options, ) for j in self.jobs ]
[docs] def calc_fmllr_arguments(self, iteration: Optional[int] = None) -> List[CalcFmllrArguments]: """ Generate Job arguments for :class:`~montreal_forced_aligner.corpus.features.CalcFmllrFunction` Returns ------- list[:class:`~montreal_forced_aligner.corpus.features.CalcFmllrArguments`] Arguments for processing """ base_log = "calc_fmllr" if iteration is not None: base_log += f".{iteration}" arguments = [] for j in self.jobs: arguments.append( CalcFmllrArguments( j.id, getattr(self, "session" if config.USE_THREADING else "db_string", ""), self.working_log_directory.joinpath(f"{base_log}.{j.id}.log"), self.working_directory, self.alignment_model_path, self.model_path, self.fmllr_options, ) ) return arguments
[docs] def mfcc_arguments(self) -> List[MfccArguments]: """ Generate Job arguments for :class:`~montreal_forced_aligner.corpus.features.MfccFunction` Returns ------- list[:class:`~montreal_forced_aligner.corpus.features.MfccArguments`] Arguments for processing """ return [ MfccArguments( j.id, getattr(self, "session" if config.USE_THREADING else "db_string", ""), self.split_directory.joinpath("log", f"make_mfcc.{j.id}.log"), self.split_directory, self.mfcc_computer, self.pitch_computer, ) for j in self.jobs ]
    def final_feature_arguments(self) -> List[FinalFeatureArguments]:
        """
        Generate Job arguments for :class:`~montreal_forced_aligner.corpus.features.FinalFeatureFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.corpus.features.FinalFeatureArguments`]
            Arguments for processing
        """
        return [
            FinalFeatureArguments(
                j.id,
                getattr(self, "session" if config.USE_THREADING else "db_string", ""),
                self.split_directory.joinpath("log", f"generate_final_features.{j.id}.log"),
                self.split_directory,
                self.uses_cmvn,
                # These options only exist on some subclasses, hence the defaults.
                getattr(self, "sliding_cmvn", False),
                self.uses_voiced,
                getattr(self, "subsample", None),
            )
            for j in self.jobs
        ]
[docs] def mfcc(self) -> None: """ Multiprocessing function that converts sound files into MFCCs. See :kaldi_docs:`feat` for an overview on feature generation in Kaldi. See Also -------- :class:`~montreal_forced_aligner.corpus.features.MfccFunction` Multiprocessing helper function for each job :meth:`.AcousticCorpusMixin.mfcc_arguments` Job method for generating arguments for helper function :kaldi_steps:`make_mfcc` Reference Kaldi script """ logger.info("Generating MFCCs...") begin = time.time() log_directory = self.split_directory.joinpath("log") os.makedirs(log_directory, exist_ok=True) arguments = self.mfcc_arguments() for _ in run_kaldi_function(MfccFunction, arguments, total_count=self.num_utterances): pass logger.debug(f"Generating MFCCs took {time.time() - begin:.3f} seconds")
    def calc_cmvn(self) -> None:
        """
        Calculate CMVN statistics for speakers

        See Also
        --------
        :kaldi_src:`compute-cmvn-stats`
            Relevant Kaldi binary
        """
        self._write_spk2utt()
        spk2utt_path = self.corpus_output_directory.joinpath("spk2utt.scp")
        feats_scp_path = self.corpus_output_directory.joinpath("feats.scp")
        cmvn_ark_path = self.corpus_output_directory.joinpath("cmvn.ark")
        log_path = self.features_log_directory.joinpath("cmvn.log")
        with kalpy_logger("kalpy.cmvn", log_path) as cmvn_logger:
            cmvn_logger.info(f"Reading features from: {feats_scp_path}")
            cmvn_logger.info(f"Reading spk2utt from: {spk2utt_path}")
            spk2utt = KaldiMapping(list_mapping=True)
            spk2utt.load(spk2utt_path)
            mfcc_archive = FeatureArchive(feats_scp_path)
            computer = CmvnComputer()
            # Writes cmvn.ark plus a cmvn.scp index next to it.
            computer.export_cmvn(cmvn_ark_path, mfcc_archive, spk2utt, write_scp=True)
            mfcc_archive.close()
        update_mapping = []
        cmvn_scp = self.corpus_output_directory.joinpath("cmvn.scp")
        with self.session() as session:
            # Sync per-speaker CMVN references into the database.
            for s, cmvn in load_scp(cmvn_scp).items():
                if isinstance(cmvn, list):
                    cmvn = " ".join(cmvn)
                update_mapping.append({"id": int(s), "cmvn": cmvn})
            bulk_update(session, Speaker, update_mapping)
            session.commit()
            # Re-split the cmvn scp per job for downstream per-job processing.
            for j in self.jobs:
                query = (
                    session.query(Speaker.id, Speaker.cmvn)
                    .join(Speaker.utterances)
                    .filter(Speaker.cmvn != None, Utterance.job_id == j.id)  # noqa
                    .distinct()
                )
                with mfa_open(j.construct_path(self.split_directory, "cmvn", "scp"), "w") as f:
                    for s_id, cmvn in sorted(query, key=lambda x: str(x)):
                        f.write(f"{s_id} {cmvn}\n")
    def calc_fmllr(self, iteration: Optional[int] = None) -> None:
        """
        Multiprocessing function that computes speaker adaptation transforms via
        feature-space Maximum Likelihood Linear Regression (fMLLR).

        See Also
        --------
        :class:`~montreal_forced_aligner.corpus.features.CalcFmllrFunction`
            Multiprocessing helper function for each job
        :meth:`.AcousticCorpusMixin.calc_fmllr_arguments`
            Job method for generating arguments for the helper function
        :kaldi_steps:`align_fmllr`
            Reference Kaldi script
        :kaldi_steps:`train_sat`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Calculating fMLLR for speaker adaptation...")
        with self.session() as session:
            corpus = session.query(Corpus).first()
            # Progress total: use the active subset size when one is in effect.
            num_utterances = corpus.current_subset
        if not num_utterances:
            num_utterances = self.num_utterances
        arguments = self.calc_fmllr_arguments(iteration=iteration)
        for _ in run_kaldi_function(CalcFmllrFunction, arguments, total_count=num_utterances):
            pass
        self.uses_speaker_adaptation = True
        update_mapping = []
        if not config.SINGLE_SPEAKER:
            # Record each speaker's transform archive location in the database.
            for j in self.jobs:
                for d_id in j.dictionary_ids:
                    scp_p = j.construct_path(self.split_directory, "trans", "scp", d_id)
                    if not scp_p.exists():
                        continue
                    with mfa_open(scp_p) as f:
                        for line in f:
                            line = line.strip()
                            speaker, ark = line.split(maxsplit=1)
                            speaker = int(speaker)
                            update_mapping.append({"id": speaker, "fmllr": ark})
            with self.session() as session:
                bulk_update(session, Speaker, update_mapping)
                session.commit()
        logger.debug(f"Fmllr calculation took {time.time() - begin:.3f} seconds")
    def compute_vad(self) -> None:
        """
        Compute Voice Activity Detection features over the corpus

        See Also
        --------
        :class:`~montreal_forced_aligner.corpus.features.ComputeVadFunction`
            Multiprocessing helper function for each job
        :meth:`.AcousticCorpusMixin.compute_vad_arguments`
            Job method for generating arguments for helper function
        """
        with self.session() as session:
            c = session.query(Corpus).first()
            if c.vad_calculated:
                logger.info("VAD already computed, skipping!")
                return
        begin = time.time()
        logger.info("Computing VAD...")
        arguments = self.compute_vad_arguments()
        for _ in run_kaldi_function(
            ComputeVadFunction, arguments, total_count=self.num_utterances
        ):
            pass
        vad_lines = []
        utterance_mapping = []
        # Collect per-job VAD scp entries and map them back onto utterance rows.
        for j in self.jobs:
            vad_scp_path = j.construct_path(self.split_directory, "vad", "scp")
            with mfa_open(vad_scp_path) as inf:
                for line in inf:
                    vad_lines.append(line)
                    utt_id, ark = line.strip().split(maxsplit=1)
                    # Kaldi keys are "<speaker>-<utterance_id>"; keep the trailing id.
                    utt_id = int(utt_id.split("-")[-1])
                    utterance_mapping.append({"id": utt_id, "vad_ark": ark})
        with self.session() as session:
            bulk_update(session, Utterance, utterance_mapping)
            session.query(Corpus).update({Corpus.vad_calculated: True})
            session.commit()
        # Kaldi scp files must be sorted by key.
        with mfa_open(self.corpus_output_directory.joinpath("vad.scp"), "w") as outf:
            for line in sorted(vad_lines, key=lambda x: x.split(maxsplit=1)[0]):
                outf.write(line)
        logger.debug(f"VAD computation took {time.time() - begin:.3f} seconds")
[docs] def combine_feats(self) -> None: """ Combine feature generation results and store relevant information """ lines = [] for j in self.jobs: with mfa_open(j.feats_scp_path) as f: for line in f: lines.append(line) with open(self.corpus_output_directory.joinpath("feats.scp"), "w", encoding="utf8") as f: for line in sorted(lines, key=lambda x: x.split(maxsplit=1)[0]): f.write(line)
def _write_feats(self) -> None: """Write feats scp file for Kaldi""" with self.session() as session, open( self.corpus_output_directory.joinpath("feats.scp"), "w", encoding="utf8" ) as f: utterances = ( session.query(Utterance.kaldi_id, Utterance.features) .filter_by(ignored=False) .order_by(Utterance.kaldi_id) ) for u_id, features in utterances: f.write(f"{u_id} {features}\n")
[docs] def get_feat_dim(self) -> int: """ Calculate the feature dimension for the corpus Returns ------- int Dimension of feature vectors """ job = self.jobs[0] dict_id = None if job.dictionary_ids: dict_id = self.jobs[0].dictionary_ids[0] feature_archive = job.construct_feature_archive(self.working_directory, dict_id) feat_dim = None for _, feats in feature_archive: feat_dim = feats.NumCols() break return feat_dim
    def _load_corpus_from_source_mp(self) -> None:
        """
        Load a corpus using multiprocessing

        A directory-parser thread feeds file paths into ``job_queue``; a pool of
        :class:`~montreal_forced_aligner.corpus.multiprocessing.CorpusProcessWorker`
        workers parses them and posts results (or ``(error_type, error)`` tuples)
        onto ``return_queue``, which this method drains into a single DB import.
        """
        begin_time = time.process_time()
        job_queue = Queue()
        return_queue = Queue()
        finished_adding = threading.Event()
        stopped = threading.Event()
        file_counts = Counter()
        error_dict = {}
        procs = []
        parser = AcousticDirectoryParser(
            self.corpus_directory,
            job_queue,
            self.audio_directory,
            stopped,
            finished_adding,
            file_counts,
        )
        parser.start()
        for i in range(config.NUM_JOBS):
            p = CorpusProcessWorker(
                i,
                job_queue,
                return_queue,
                stopped,
                finished_adding,
                self.speaker_characters,
                self.sample_frequency,
            )
            procs.append(p)
            p.start()
        last_poll = time.time() - 30
        try:
            with self.session() as session, tqdm(total=100, disable=config.QUIET) as pbar:
                import_data = DatabaseImportData()
                while True:
                    try:
                        file = return_queue.get(timeout=1)
                        # Tuples signal errors rather than parsed files.
                        if isinstance(file, tuple):
                            error_type = file[0]
                            error = file[1]
                            if error_type == "error":
                                error_dict[error_type] = error
                            else:
                                if error_type not in error_dict:
                                    error_dict[error_type] = []
                                error_dict[error_type].append(error)
                            continue
                        if self.stopped.is_set():
                            continue
                    except Empty:
                        # Queue drained: exit only once every worker has finished.
                        for proc in procs:
                            if not proc.finished_processing.is_set():
                                break
                        else:
                            break
                        continue
                    if time.time() - last_poll > 5:
                        # Refresh the progress total as the parser discovers more files.
                        pbar.total = file_counts.value()
                        last_poll = time.time()
                    pbar.update(1)
                    import_data.add_objects(self.generate_import_objects(file))
                    return_queue.task_done()
                logger.debug(f"Processing queue: {time.process_time() - begin_time}")
                if "error" in error_dict:
                    session.rollback()
                    raise error_dict["error"]
                self._finalize_load(session, import_data)
            for k in ["sound_file_errors", "decode_error_files", "textgrid_read_errors"]:
                if hasattr(self, k):
                    if k in error_dict:
                        logger.info(
                            "There were some issues with files in the corpus. "
                            "Please look at the log file or run the validator for more information."
                        )
                        logger.debug(f"{k} showed {len(error_dict[k])} errors:")
                        if k in {"textgrid_read_errors", "sound_file_errors"}:
                            getattr(self, k).extend(error_dict[k])
                            for e in error_dict[k]:
                                logger.debug(f"{e.file_name}: {e.error}")
                        else:
                            logger.debug(", ".join(error_dict[k]))
                            setattr(self, k, error_dict[k])
        except Exception as e:
            if isinstance(e, KeyboardInterrupt):
                logger.info(
                    "Detected ctrl-c, please wait a moment while we clean everything up..."
                )
                self.stopped.set()
            # Unblock the parser, then drain both queues so workers can exit.
            finished_adding.set()
            while True:
                try:
                    _ = job_queue.get(timeout=1)
                    if self.stopped.is_set():
                        continue
                    job_queue.task_done()
                except Empty:
                    for proc in procs:
                        if not proc.finished_processing.is_set():
                            break
                    else:
                        break
                try:
                    _ = return_queue.get(timeout=1)
                    _ = job_queue.get(timeout=1)
                    if self.stopped.is_set():
                        continue
                    return_queue.task_done()
                    job_queue.task_done()
                except Empty:
                    for proc in procs:
                        if not proc.finished_processing.is_set():
                            break
                    else:
                        break
            raise
        finally:
            parser.join()
            for p in procs:
                p.join()
        if self.stopped.is_set():
            logger.info(f"Stopped parsing early ({time.process_time() - begin_time} seconds)")
        else:
            logger.debug(
                f"Parsed corpus directory with {config.NUM_JOBS} jobs in {time.process_time() - begin_time} seconds"
            )

    def _load_corpus_from_source(self) -> None:
        """
        Load a corpus without using multiprocessing
        """
        begin_time = time.time()
        all_sound_files = {}
        use_audio_directory = False
        if self.audio_directory and os.path.exists(self.audio_directory):
            # Pre-index every audio file in the extra audio directory.
            use_audio_directory = True
            for root, _, files in os.walk(self.audio_directory, followlinks=True):
                if self.stopped.is_set():
                    return
                exts = find_exts(files)
                exts.wav_files = {k: os.path.join(root, v) for k, v in exts.wav_files.items()}
                exts.other_audio_files = {
                    k: os.path.join(root, v) for k, v in exts.other_audio_files.items()
                }
                # wav files take precedence over other audio formats for the same key.
                all_sound_files.update(exts.other_audio_files)
                all_sound_files.update(exts.wav_files)
        logger.debug(f"Walking through {self.corpus_directory}...")
        with self.session() as session:
            import_data = DatabaseImportData()
            for root, _, files in os.walk(self.corpus_directory, followlinks=True):
                exts = find_exts(files)
                relative_path = (
                    root.replace(str(self.corpus_directory), "").lstrip("/").lstrip("\\")
                )
                if self.stopped.is_set():
                    return
                if not use_audio_directory:
                    # Without an extra audio directory, sound files live beside transcripts.
                    all_sound_files = {}
                    wav_files = {k: os.path.join(root, v) for k, v in exts.wav_files.items()}
                    other_audio_files = {
                        k: os.path.join(root, v) for k, v in exts.other_audio_files.items()
                    }
                    all_sound_files.update(other_audio_files)
                    all_sound_files.update(wav_files)
                for file_name in exts.identifiers:
                    wav_path = None
                    transcription_path = None
                    if file_name in all_sound_files:
                        wav_path = all_sound_files[file_name]
                    if file_name in exts.lab_files:
                        lab_name = exts.lab_files[file_name]
                        transcription_path = os.path.join(root, lab_name)
                    elif file_name in exts.textgrid_files:
                        tg_name = exts.textgrid_files[file_name]
                        transcription_path = os.path.join(root, tg_name)
                    if wav_path is None:  # Not a file for MFA
                        continue
                    try:
                        file = FileData.parse_file(
                            file_name,
                            wav_path,
                            transcription_path,
                            relative_path,
                            self.speaker_characters,
                            self.sample_frequency,
                        )
                        import_data.add_objects(self.generate_import_objects(file))
                    except TextParseError as e:
                        self.decode_error_files.append(e)
                    except TextGridParseError as e:
                        self.textgrid_read_errors.append(e)
                    except SoundFileError as e:
                        self.sound_file_errors.append(e)
            self._finalize_load(session, import_data)
        if self.decode_error_files or self.textgrid_read_errors:
            logger.info(
                "There were some issues with files in the corpus. "
                "Please look at the log file or run the validator for more information."
            )
            if self.decode_error_files:
                logger.debug(
                    f"There were {len(self.decode_error_files)} errors decoding text files:"
                )
                logger.debug(", ".join(self.decode_error_files))
            if self.textgrid_read_errors:
                logger.debug(
                    f"There were {len(self.textgrid_read_errors)} errors decoding reading TextGrid files:"
                )
                for e in self.textgrid_read_errors:
                    logger.debug(f"{e.file_name}: {e.error}")
        logger.debug(f"Parsed corpus directory in {time.time() - begin_time:.3f} seconds")
class AcousticCorpusPronunciationMixin(
    AcousticCorpusMixin, MultispeakerDictionaryMixin, metaclass=ABCMeta
):
    """
    Mixin for acoustic corpora with Pronunciation dictionaries

    See Also
    --------
    :class:`~montreal_forced_aligner.corpus.acoustic_corpus.AcousticCorpusMixin`
        For corpus parsing parameters
    :class:`~montreal_forced_aligner.dictionary.multispeaker.MultispeakerDictionaryMixin`
        For dictionary parsing parameters
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    def load_corpus(self) -> None:
        """
        Load the corpus

        Steps are order-dependent: dictionary setup precedes corpus parsing, jobs are
        initialized before text normalization, and lexicon construction (or reloading
        cached phone groups/compilers on resume) happens before feature generation.
        """
        all_begin = time.time()
        self.initialize_database()
        if self.dictionary_model is not None and not self.imported:
            self.dictionary_setup()
        logger.debug(f"Loaded dictionary in {time.time() - all_begin:.3f} seconds")
        begin = time.time()
        self._load_corpus()
        logger.debug(f"Loaded corpus in {time.time() - begin:.3f} seconds")
        begin = time.time()
        self.initialize_jobs()
        logger.debug(f"Initialized jobs in {time.time() - begin:.3f} seconds")
        # Snapshot before normalize_text() so we know if this is a fresh load.
        initialized_check = self.text_normalized
        self.normalize_text()
        if self.dictionary_model is not None and not initialized_check:
            self.apply_phonological_rules()
            self.calculate_disambiguation()
            self.calculate_phone_mapping()
            begin = time.time()
            self.write_lexicon_information()
            logger.debug(f"Wrote lexicon information in {time.time() - begin:.3f} seconds")
        else:
            # Resuming: reload previously computed lexicon state instead of rebuilding.
            self.load_phone_groups()
            self.load_lexicon_compilers()
        begin = time.time()
        self.generate_features()
        logger.debug(f"Generated features in {time.time() - begin:.3f} seconds")
        logger.debug(f"Setting up corpus took {time.time() - all_begin:.3f} seconds")
class AcousticCorpus(AcousticCorpusMixin, DictionaryMixin, MfaWorker):
    """
    Standalone class for working with acoustic corpora and pronunciation dictionaries

    Most functionality in MFA will use the :class:`~montreal_forced_aligner.corpus.acoustic_corpus.AcousticCorpusPronunciationMixin` class instead of this class.

    See Also
    --------
    :class:`~montreal_forced_aligner.corpus.acoustic_corpus.AcousticCorpusPronunciationMixin`
        For dictionary and corpus parsing parameters
    :class:`~montreal_forced_aligner.abc.MfaWorker`
        For MFA processing parameters
    :class:`~montreal_forced_aligner.abc.TemporaryDirectoryMixin`
        For temporary directory parameters
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def identifier(self) -> str:
        """Identifier for the corpus"""
        return self.data_source_identifier

    @property
    def output_directory(self) -> Path:
        """Root temporary directory to store corpus and dictionary files"""
        return config.TEMPORARY_DIRECTORY.joinpath(self.identifier)

    @property
    def working_directory(self) -> Path:
        """Working directory to save temporary corpus and dictionary files"""
        return self.corpus_output_directory
class AcousticCorpusWithPronunciations(AcousticCorpusPronunciationMixin, MfaWorker):
    """
    Standalone class for parsing an acoustic corpus with a pronunciation dictionary
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def identifier(self) -> str:
        """Identifier for the corpus"""
        return self.data_source_identifier

    @property
    def output_directory(self) -> Path:
        """Root temporary directory to store corpus and dictionary files"""
        return config.TEMPORARY_DIRECTORY.joinpath(self.identifier)

    @property
    def working_directory(self) -> Path:
        """Working directory to save temporary corpus and dictionary files"""
        return self.output_directory