Source code for montreal_forced_aligner.corpus.text_corpus

"""Class definitions for corpora"""
from __future__ import annotations

import logging
import os
import threading
import time
from pathlib import Path
from queue import Empty, Queue

from tqdm.rich import tqdm

from montreal_forced_aligner import config
from montreal_forced_aligner.abc import MfaWorker, TemporaryDirectoryMixin
from montreal_forced_aligner.corpus.base import CorpusMixin
from montreal_forced_aligner.corpus.classes import FileData
from montreal_forced_aligner.corpus.helper import find_exts
from montreal_forced_aligner.corpus.multiprocessing import CorpusProcessWorker
from montreal_forced_aligner.data import DatabaseImportData
from montreal_forced_aligner.dictionary.multispeaker import MultispeakerDictionaryMixin
from montreal_forced_aligner.exceptions import TextGridParseError, TextParseError

logger = logging.getLogger("mfa")


[docs] class TextCorpusMixin(CorpusMixin): """ Abstract mixin class for processing text corpora See Also -------- :class:`~montreal_forced_aligner.corpus.base.CorpusMixin` For corpus parsing parameters """ def __init__(self, **kwargs): super().__init__(**kwargs) def _load_corpus_from_source_mp(self) -> None: """ Load a corpus using multiprocessing """ if self.stopped is None: self.stopped = threading.Event() begin_time = time.time() job_queue = Queue() return_queue = Queue() error_dict = {} finished_adding = threading.Event() procs = [] for i in range(config.NUM_JOBS): p = CorpusProcessWorker( i, job_queue, return_queue, self.stopped, finished_adding, self.speaker_characters, sample_rate=0, ) procs.append(p) p.start() import_data = DatabaseImportData() try: file_count = 0 with tqdm(total=1, disable=config.QUIET) as pbar, self.session() as session: for root, _, files in os.walk(self.corpus_directory, followlinks=True): exts = find_exts(files) relative_path = ( root.replace(str(self.corpus_directory), "").lstrip("/").lstrip("\\") ) if self.stopped.is_set(): break for file_name in exts.identifiers: if self.stopped.is_set(): break wav_path = None if file_name in exts.lab_files: lab_name = exts.lab_files[file_name] transcription_path = os.path.join(root, lab_name) elif file_name in exts.textgrid_files: tg_name = exts.textgrid_files[file_name] transcription_path = os.path.join(root, tg_name) else: continue job_queue.put((file_name, wav_path, transcription_path, relative_path)) file_count += 1 pbar.total = file_count finished_adding.set() while True: try: file = return_queue.get(timeout=1) if isinstance(file, tuple): error_type = file[0] error = file[1] if error_type == "error": error_dict[error_type] = error else: if error_type not in error_dict: error_dict[error_type] = [] error_dict[error_type].append(error) continue if self.stopped.is_set(): continue except Empty: for proc in procs: if not proc.finished_processing.is_set(): break else: break continue pbar.update(1) import_data.add_objects(self.generate_import_objects(file)) return_queue.task_done() logger.debug("Waiting for workers to finish...") for p in procs: p.join() if "error" in error_dict: session.rollback() raise error_dict["error"][1] self._finalize_load(session, import_data) for k in ["decode_error_files", "textgrid_read_errors"]: if hasattr(self, k): if k in error_dict: logger.info( "There were some issues with files in the corpus. " "Please look at the log file or run the validator for more information." ) logger.debug(f"{k} showed {len(error_dict[k])} errors:") if k == "textgrid_read_errors": getattr(self, k).extend(error_dict[k]) for e in error_dict[k]: logger.debug(f"{e.file_name}: {e.error}") else: logger.debug(", ".join(error_dict[k])) setattr(self, k, error_dict[k]) except KeyboardInterrupt: logger.info("Detected ctrl-c, please wait a moment while we clean everything up...") self.stopped.set() finished_adding.set() while True: try: _ = return_queue.get(timeout=1) if self.stopped.is_set(): continue return_queue.task_done() except Empty: for proc in procs: if not proc.finished_processing.is_set(): break else: break finally: finished_adding.set() for p in procs: p.join() if self.stopped.is_set(): logger.info(f"Stopped parsing early ({time.time() - begin_time:.3f} seconds)") else: logger.debug( f"Parsed corpus directory with {config.NUM_JOBS} jobs in {time.time() - begin_time:.3f} seconds" ) def _load_corpus_from_source(self) -> None: """ Load a corpus without using multiprocessing """ begin_time = time.time() self.stopped = False import_data = DatabaseImportData() sanitize_function = getattr(self, "sanitize_function", None) with self.session() as session: for root, _, files in os.walk(self.corpus_directory, followlinks=True): exts = find_exts(files) relative_path = ( root.replace(str(self.corpus_directory), "").lstrip("/").lstrip("\\") ) if self.stopped: return for file_name in exts.identifiers: wav_path = None if file_name in exts.lab_files: lab_name = exts.lab_files[file_name] transcription_path = os.path.join(root, lab_name) elif file_name in exts.textgrid_files: tg_name = exts.textgrid_files[file_name] transcription_path = os.path.join(root, tg_name) else: continue try: file = FileData.parse_file( file_name, wav_path, transcription_path, relative_path, self.speaker_characters, sanitize_function, ) import_data.add_objects(self.generate_import_objects(file)) except TextParseError as e: self.decode_error_files.append(e) except TextGridParseError as e: self.textgrid_read_errors.append(e) self._finalize_load(session, import_data) if self.decode_error_files or self.textgrid_read_errors: logger.info( "There were some issues with files in the corpus. " "Please look at the log file or run the validator for more information." ) if self.decode_error_files: logger.debug( f"There were {len(self.decode_error_files)} errors decoding text files:" ) logger.debug(", ".join(self.decode_error_files)) if self.textgrid_read_errors: logger.debug( f"There were {len(self.textgrid_read_errors)} errors decoding reading TextGrid files:" ) for e in self.textgrid_read_errors: logger.debug(f"{e.file_name}: {e.error}") logger.debug(f"Parsed corpus directory in {time.time()-begin_time} seconds")
[docs] class DictionaryTextCorpusMixin(TextCorpusMixin, MultispeakerDictionaryMixin): """ Abstract mixin class for processing text corpora with pronunciation dictionaries. This is primarily useful for training language models, as you can treat words in the language model as OOV if they aren't in your pronunciation dictionary See Also -------- :class:`~montreal_forced_aligner.corpus.text_corpus.TextCorpusMixin` For corpus parsing parameters :class:`~montreal_forced_aligner.dictionary.multispeaker.MultispeakerDictionaryMixin` For dictionary parsing parameters """ def __init__(self, **kwargs): super().__init__(**kwargs)
[docs] def load_corpus(self) -> None: """ Load the corpus """ self.initialize_database() if not self.imported: self.dictionary_setup() self._load_corpus() self.initialize_jobs() initialized_check = self.text_normalized self.normalize_text() if not initialized_check: self.write_lexicon_information() self.create_corpus_split()
[docs] class TextCorpus(TextCorpusMixin, MfaWorker, TemporaryDirectoryMixin): """ Standalone class for working with text corpora without a pronunciation dictionary Most MFA functionality will use the :class:`~montreal_forced_aligner.corpus.text_corpus.TextCorpusMixin` class rather than this class. See Also -------- :class:`~montreal_forced_aligner.corpus.text_corpus.DictionaryTextCorpusMixin` For dictionary and corpus parsing parameters :class:`~montreal_forced_aligner.abc.MfaWorker` For MFA processing parameters :class:`~montreal_forced_aligner.abc.TemporaryDirectoryMixin` For temporary directory parameters """ def __init__(self, **kwargs): super().__init__(**kwargs)
[docs] def load_corpus(self) -> None: """ Load the corpus """ self.initialize_database() self._load_corpus() self.initialize_jobs() self.create_corpus_split()
@property def identifier(self) -> str: """Identifier for the corpus""" return self.data_source_identifier @property def output_directory(self) -> Path: """Root temporary directory to store all corpus and dictionary files""" return config.TEMPORARY_DIRECTORY.joinpath(self.identifier) @property def working_directory(self) -> Path: """Working directory""" return self.corpus_output_directory
class DictionaryTextCorpus(DictionaryTextCorpusMixin, MfaWorker, TemporaryDirectoryMixin): """ Standalone class for working with text corpora and pronunciation dictionaries Most MFA functionality will use the :class:`~montreal_forced_aligner.corpus.text_corpus.DictionaryTextCorpusMixin` class rather than this class. See Also -------- :class:`~montreal_forced_aligner.corpus.text_corpus.DictionaryTextCorpusMixin` For dictionary and corpus parsing parameters :class:`~montreal_forced_aligner.abc.MfaWorker` For MFA processing parameters :class:`~montreal_forced_aligner.abc.TemporaryDirectoryMixin` For temporary directory parameters """ def __init__(self, **kwargs): super().__init__(**kwargs) @property def identifier(self) -> str: """Identifier for the corpus""" return self.data_source_identifier @property def output_directory(self) -> Path: """Root temporary directory to store all corpus and dictionary files""" return config.TEMPORARY_DIRECTORY.joinpath(self.identifier) @property def working_directory(self) -> Path: """Working directory""" return self.corpus_output_directory