"""
Corpus loading worker
---------------------
"""
from __future__ import annotations

import os
import threading
import typing
from pathlib import Path
from queue import Empty, Queue

import sqlalchemy
from sqlalchemy.orm import joinedload, subqueryload

from montreal_forced_aligner.abc import KaldiFunction
from montreal_forced_aligner.corpus.classes import FileData
from montreal_forced_aligner.corpus.helper import find_exts
from montreal_forced_aligner.data import Language, MfaArguments
from montreal_forced_aligner.db import Dictionary, Job, Speaker, Utterance
from montreal_forced_aligner.exceptions import SoundFileError, TextGridParseError, TextParseError
from montreal_forced_aligner.helper import mfa_open
from montreal_forced_aligner.utils import Counter

if typing.TYPE_CHECKING:
    from dataclasses import dataclass

    from montreal_forced_aligner.models import G2PModel
    from montreal_forced_aligner.tokenization.simple import SimpleTokenizer

    try:
        from spacy.language import Language as SpacyLanguage
    except ImportError:
        SpacyLanguage = None
else:
    from dataclassy import dataclass

__all__ = [
    "AcousticDirectoryParser",
    "CorpusProcessWorker",
    "ExportKaldiFilesFunction",
    "ExportKaldiFilesArguments",
    "NormalizeTextFunction",
    "NormalizeTextArguments",
    "dictionary_ids_for_job",
]


def dictionary_ids_for_job(session, job_id):
    """
    Get the ids of all dictionaries used by speakers with subset utterances in a given job.

    Parameters
    ----------
    session: :class:`sqlalchemy.orm.Session`
        Database session
    job_id: int
        Id of the job to query

    Returns
    -------
    list[int]
        Distinct dictionary ids
    """
    dictionary_ids = [
        x[0]
        for x in session.query(Dictionary.id)
        .join(Utterance.speaker)
        .join(Speaker.dictionary)
        .filter(Utterance.in_subset == True)  # noqa
        .filter(Utterance.job_id == job_id)
        .distinct()
    ]
    return dictionary_ids
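
# Example usage (a minimal sketch; `session_factory` and the job id are
# hypothetical and would normally come from the surrounding corpus object):
#
#     with session_factory() as session:
#         dict_ids = dictionary_ids_for_job(session, job_id=1)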


class AcousticDirectoryParser(threading.Thread):
    """
    Worker for processing directories for acoustic sound files

    Parameters
    ----------
    corpus_directory: str
        Directory to parse
    job_queue: Queue
        Queue to add file names to
    audio_directory: str
        Directory with additional audio files
    stopped: :class:`~threading.Event`
        Check for whether to exit early
    finished_adding: :class:`~threading.Event`
        Check to set when the parser is done adding files to the queue
    file_counts: :class:`~montreal_forced_aligner.utils.Counter`
        Counter for the number of total files that the parser has found
    """

    def __init__(
        self,
        corpus_directory: str,
        job_queue: Queue,
        audio_directory: str,
        stopped: threading.Event,
        finished_adding: threading.Event,
        file_counts: Counter,
    ):
        super().__init__()
        self.corpus_directory = corpus_directory
        self.job_queue = job_queue
        self.audio_directory = audio_directory
        self.stopped = stopped
        self.finished_adding = finished_adding
        self.file_counts = file_counts

    def run(self) -> None:
        """
        Run the corpus loading job
        """

        use_audio_directory = False
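        # If a separate audio directory was supplied, every sound file in it is
        # indexed once up front; otherwise the sound file map is rebuilt for each
        # corpus subdirectory in the walk below, so transcripts are only paired
        # with sound files found alongside them.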
        all_sound_files = {}
        if self.audio_directory and os.path.exists(self.audio_directory):
            use_audio_directory = True
            for root, _, files in os.walk(self.audio_directory, followlinks=True):
                exts = find_exts(files)
                wav_files = {k: os.path.join(root, v) for k, v in exts.wav_files.items()}
                other_audio_files = {
                    k: os.path.join(root, v) for k, v in exts.other_audio_files.items()
                }
                all_sound_files.update(other_audio_files)
                all_sound_files.update(wav_files)
        for root, _, files in os.walk(self.corpus_directory, followlinks=True):
            exts = find_exts(files)
            relative_path = root.replace(str(self.corpus_directory), "").lstrip("/").lstrip("\\")

            if self.stopped.is_set():
                break
            if not use_audio_directory:
                all_sound_files = {}
                exts.wav_files = {k: os.path.join(root, v) for k, v in exts.wav_files.items()}
                exts.other_audio_files = {
                    k: os.path.join(root, v) for k, v in exts.other_audio_files.items()
                }
                all_sound_files.update(exts.other_audio_files)
                all_sound_files.update(exts.wav_files)
            for file_name in exts.identifiers:
                if self.stopped.is_set():
                    break
                wav_path = None
                transcription_path = None
                if file_name in all_sound_files:
                    wav_path = all_sound_files[file_name]
                if file_name in exts.lab_files:
                    lab_name = exts.lab_files[file_name]
                    transcription_path = os.path.join(root, lab_name)

                elif file_name in exts.textgrid_files:
                    tg_name = exts.textgrid_files[file_name]
                    transcription_path = os.path.join(root, tg_name)
                if wav_path is None:
                    continue
                self.job_queue.put((file_name, wav_path, transcription_path, relative_path))
                self.file_counts.increment()

        self.finished_adding.set()
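
# Example wiring for the parser thread (a minimal sketch; the corpus path is
# hypothetical and no separate audio directory is used):
#
#     job_queue = Queue()
#     stopped = threading.Event()
#     finished_adding = threading.Event()
#     parser = AcousticDirectoryParser(
#         "/data/my_corpus", job_queue, "", stopped, finished_adding, Counter()
#     )
#     parser.start()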


class CorpusProcessWorker(threading.Thread):
    """
    Corpus loading worker thread

    Attributes
    ----------
    job_q: :class:`~queue.Queue`
        Job queue for files to process
    return_q: :class:`~queue.Queue`
        Return queue for processed files and caught errors
    stopped: :class:`~threading.Event`
        Stop check for whether corpus loading should exit
    finished_adding: :class:`~threading.Event`
        Signal that the main thread has stopped adding new files to be processed
    """

    def __init__(
        self,
        name: int,
        job_q: Queue,
        return_q: Queue,
        stopped: threading.Event,
        finished_adding: threading.Event,
        speaker_characters: typing.Union[int, str],
        sample_rate: typing.Optional[int],
    ):
        super().__init__()
        self.name = str(name)
        self.job_q = job_q
        self.return_q = return_q
        self.stopped = stopped
        self.finished_adding = finished_adding
        self.finished_processing = threading.Event()
        self.speaker_characters = speaker_characters
        self.sample_rate = sample_rate

    def run(self) -> None:
        """
        Run the corpus loading job
        """
        while True:
            try:
                file_name, wav_path, text_path, relative_path = self.job_q.get(timeout=1)
            except Empty:
                if self.finished_adding.is_set():
                    break
                continue
            if self.stopped.is_set():
                continue
            try:
                file = FileData.parse_file(
                    file_name,
                    wav_path,
                    text_path,
                    relative_path,
                    self.speaker_characters,
                    self.sample_rate,
                )
                self.return_q.put(file)
            except TextParseError as e:
                self.return_q.put(("decode_error_files", e))
            except TextGridParseError as e:
                self.return_q.put(("textgrid_read_errors", e))
            except SoundFileError as e:
                self.return_q.put(("sound_file_errors", e))
            except Exception as e:
                self.stopped.set()
                self.return_q.put(("error", e))
        self.finished_processing.set()
        return
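
# Example of pairing the parser with worker threads (a minimal sketch reusing
# the hypothetical queue and events from the parser example above):
#
#     return_q = Queue()
#     workers = [
#         CorpusProcessWorker(
#             i, job_queue, return_q, stopped, finished_adding,
#             speaker_characters=0, sample_rate=None,
#         )
#         for i in range(3)
#     ]
#     for w in workers:
#         w.start()
#     # ... then drain return_q until every worker's finished_processing flag is
#     # set, handling (error_key, exception) tuples alongside FileData payloads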


@dataclass
class NormalizeTextArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.multiprocessing.NormalizeTextFunction`

    Parameters
    ----------
    tokenizers: dict[int, :class:`~montreal_forced_aligner.tokenization.simple.SimpleTokenizer`] or :class:`~montreal_forced_aligner.data.Language`
        Tokenizers per dictionary id, or a language to construct a tokenizer from
    g2p_model: :class:`~montreal_forced_aligner.models.G2PModel`, optional
        G2P model to use, if any
    ignore_case: bool
        Flag for lowercasing text during normalization
    """

    tokenizers: typing.Union[typing.Dict[int, SimpleTokenizer], Language]
    g2p_model: typing.Optional[G2PModel]
    ignore_case: bool


@dataclass
class ExportKaldiFilesArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.multiprocessing.ExportKaldiFilesFunction`

    Parameters
    ----------
    split_directory: :class:`~pathlib.Path`
        Directory to export Kaldi files to
    """

    split_directory: Path


class NormalizeTextFunction(KaldiFunction):
    """
    Multiprocessing function for normalizing text.

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.multiprocessing.NormalizeTextArguments`
        Arguments for the function
    """

    def __init__(self, args: NormalizeTextArguments):
        super().__init__(args)
        self.tokenizers = args.tokenizers
        self.g2p_model = args.g2p_model
        self.ignore_case = args.ignore_case

    def _run(self):
        """Run the function"""
        from montreal_forced_aligner.tokenization.simple import SimpleTokenizer

        with self.session() as session:
            dict_count = session.query(Dictionary).join(Dictionary.words).limit(1).count()
            if dict_count > 0 or isinstance(self.tokenizers, dict):
                dictionaries = session.query(Dictionary)
                for d in dictionaries:
                    if isinstance(self.tokenizers, dict):
                        tokenizer = self.tokenizers[d.id]
                    else:
                        tokenizer = self.tokenizers
                    simple_tokenization = isinstance(tokenizer, SimpleTokenizer)
                    if isinstance(tokenizer, Language):
                        from montreal_forced_aligner.tokenization.spacy import (
                            generate_language_tokenizer,
                        )

                        tokenizer = generate_language_tokenizer(
                            tokenizer, ignore_case=self.ignore_case
                        )
                    utterances = (
                        session.query(Utterance.id, Utterance.text)
                        .join(Utterance.speaker)
                        .filter(Utterance.text != "")
                        .filter(Utterance.job_id == self.job_name)
                        .filter(Speaker.dictionary_id == d.id)
                    )
                    for u_id, u_text in utterances:
                        if simple_tokenization:
                            normalized_text, normalized_character_text, oovs = tokenizer(u_text)
                            self.callback(
                                (
                                    {
                                        "id": u_id,
                                        "oovs": " ".join(sorted(oovs)),
                                        "normalized_text": normalized_text,
                                        "normalized_character_text": normalized_character_text,
                                    },
                                    d.id,
                                )
                            )
                        else:
                            tokenized = tokenizer(u_text)
                            if isinstance(tokenized, tuple):
                                normalized_text, pronunciation_form = tokenized
                            else:
                                if not isinstance(tokenized, str):
                                    tokenized = " ".join([x.text for x in tokenized])
                                if self.ignore_case:
                                    tokenized = tokenized.lower()
                                normalized_text, pronunciation_form = tokenized, tokenized.lower()
                            oovs = set()
                            self.callback(
                                (
                                    {
                                        "id": u_id,
                                        "oovs": " ".join(sorted(oovs)),
                                        "normalized_text": normalized_text,
                                        "normalized_character_text": pronunciation_form,
                                    },
                                    d.id,
                                )
                            )
            else:
                tokenizer = self.tokenizers
                utterances = (
                    session.query(Utterance.id, Utterance.text)
                    .filter(Utterance.text != "")
                    .filter(Utterance.job_id == self.job_name)
                )
                for u_id, u_text in utterances:
                    if tokenizer is None:
                        normalized_text, normalized_character_text = u_text, u_text
                        oovs = []
                    else:
                        normalized_text, normalized_character_text, oovs = tokenizer(u_text)
                    self.callback(
                        (
                            {
                                "id": u_id,
                                "oovs": " ".join(sorted(oovs)),
                                "normalized_text": normalized_text,
                                "normalized_character_text": normalized_character_text,
                            },
                            None,
                        )
                    )
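
# Example construction (a minimal sketch; the base MfaArguments fields are
# elided with `...` and the per-dictionary tokenizer mapping is hypothetical):
#
#     args = NormalizeTextArguments(
#         ...,  # base MfaArguments fields (job name, database connection, log path)
#         tokenizers={1: simple_tokenizer},
#         g2p_model=None,
#         ignore_case=True,
#     )
#     func = NormalizeTextFunction(args)
#
# Each tuple the function passes to its callback pairs a dict of normalized-text
# column values for one utterance with the dictionary id used (or None).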


class ExportKaldiFilesFunction(KaldiFunction):
    """
    Multiprocessing function for exporting Kaldi-format files.

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.multiprocessing.ExportKaldiFilesArguments`
        Arguments for the function
    """

    def __init__(self, args: ExportKaldiFilesArguments):
        super().__init__(args)
        self.split_directory = args.split_directory

    def output_to_directory(self, session) -> None:
        """
        Output job information to a directory

        Parameters
        ----------
        session: :class:`sqlalchemy.orm.Session`
            Database session
        """
        job = (
            session.query(Job)
            .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
            .filter(Job.id == self.job_name)
            .first()
        )

        base_utterance_query = (
            session.query(sqlalchemy.func.count(Utterance.id))
            .filter(Utterance.job_id == job.id)
            .filter(Utterance.ignored == False)  # noqa
        )
        if job.corpus.current_subset:
            base_utterance_query = base_utterance_query.filter(Utterance.in_subset == True)  # noqa

        if not base_utterance_query.scalar():
            return
        # No pronunciation dictionaries attached: write one set of scp files for the job
        if not job.has_dictionaries:
            utterances = (
                session.query(
                    Utterance.id,
                    Utterance.speaker_id,
                    Utterance.features,
                    Utterance.vad_ark,
                    Utterance.ivector_ark,
                    Speaker.cmvn,
                )
                .join(Utterance.speaker)
                .filter(Utterance.job_id == job.id)
                .filter(Utterance.ignored == False)  # noqa
                .order_by(Utterance.kaldi_id)
            )
            if job.corpus.current_subset:
                utterances = utterances.filter(Utterance.in_subset == True)  # noqa
            utt2spk_path = job.construct_path(self.split_directory, "utt2spk", "scp")
            feats_path = job.construct_path(self.split_directory, "feats", "scp")
            cmvns_path = job.construct_path(self.split_directory, "cmvn", "scp")
            spk2utt_path = job.construct_path(self.split_directory, "spk2utt", "scp")
            vad_path = job.construct_path(self.split_directory, "vad", "scp")
            ivectors_path = job.construct_path(self.split_directory, "ivectors", "scp")

            spk2utt = {}
            vad = {}
            ivectors = {}
            feats = {}
            cmvns = {}
            utt2spk = {}
            for (
                u_id,
                s_id,
                features,
                vad_ark,
                ivector_ark,
                cmvn,
            ) in utterances:
                utterance = str(u_id)
                speaker = str(s_id)
                utterance = f"{speaker}-{utterance}"
                if speaker not in spk2utt:
                    spk2utt[speaker] = []
                spk2utt[speaker].append(utterance)
                utt2spk[utterance] = speaker
                feats[utterance] = features
                if vad_ark:
                    vad[utterance] = vad_ark
                if ivector_ark:
                    ivectors[utterance] = ivector_ark
                cmvns[speaker] = cmvn
                self.callback(1)
            with mfa_open(spk2utt_path, "w") as f:
                for speaker, utts in sorted(spk2utt.items()):
                    utts = " ".join(sorted(utts))
                    f.write(f"{speaker} {utts}\n")
            with mfa_open(cmvns_path, "w") as f:
                for speaker, cmvn in sorted(cmvns.items()):
                    f.write(f"{speaker} {cmvn}\n")
            with mfa_open(utt2spk_path, "w") as f:
                for utt, spk in sorted(utt2spk.items()):
                    f.write(f"{utt} {spk}\n")
            with mfa_open(feats_path, "w") as f:
                for utt, feat in sorted(feats.items()):
                    f.write(f"{utt} {feat}\n")
            if vad:
                with mfa_open(vad_path, "w") as f:
                    for utt, ark in sorted(vad.items()):
                        f.write(f"{utt} {ark}\n")
            if ivectors:
                with mfa_open(ivectors_path, "w") as f:
                    for utt, ark in sorted(ivectors.items()):
                        f.write(f"{utt} {ark}\n")
        # Otherwise, write a set of scp files per dictionary
        else:
            base_utterance_query = (
                session.query(
                    Utterance.id,
                    Utterance.speaker_id,
                    Utterance.features,
                    Speaker.cmvn,
                )
                .join(Utterance.speaker)
                .filter(Utterance.job_id == job.id)
                .filter(Utterance.ignored == False)  # noqa
                .order_by(Utterance.kaldi_id)
            )
            if job.corpus.current_subset:
                base_utterance_query = base_utterance_query.filter(
                    Utterance.in_subset == True  # noqa
                )
            utt2spk_paths = job.per_dictionary_utt2spk_scp_paths
            feats_paths = job.per_dictionary_feats_scp_paths
            cmvns_paths = job.per_dictionary_cmvn_scp_paths
            spk2utt_paths = job.per_dictionary_spk2utt_scp_paths
            for d in job.dictionaries:
                spk2utt = {}
                feats = {}
                cmvns = {}
                utt2spk = {}
                utterances = base_utterance_query.filter(Speaker.dictionary_id == d.id)
                for (
                    u_id,
                    s_id,
                    features,
                    cmvn,
                ) in utterances:
                    utterance = str(u_id)
                    speaker = str(s_id)
                    utterance = f"{speaker}-{utterance}"
                    if speaker not in spk2utt:
                        spk2utt[speaker] = []
                    spk2utt[speaker].append(utterance)
                    utt2spk[utterance] = speaker
                    feats[utterance] = features
                    cmvns[speaker] = cmvn
                    self.callback(1)
                with mfa_open(spk2utt_paths[d.id], "w") as f:
                    for speaker, utts in sorted(spk2utt.items()):
                        utts = " ".join(sorted(utts))
                        f.write(f"{speaker} {utts}\n")
                with mfa_open(cmvns_paths[d.id], "w") as f:
                    for speaker, cmvn in sorted(cmvns.items()):
                        f.write(f"{speaker} {cmvn}\n")
                with mfa_open(utt2spk_paths[d.id], "w") as f:
                    for utt, spk in sorted(utt2spk.items()):
                        f.write(f"{utt} {spk}\n")
                with mfa_open(feats_paths[d.id], "w") as f:
                    for utt, feat in sorted(feats.items()):
                        f.write(f"{utt} {feat}\n")

    def _run(self):
        """Run the function"""
        with self.session() as session:
            self.output_to_directory(session)
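
# Example construction (a minimal sketch; the split directory is hypothetical
# and the base MfaArguments fields are elided with `...`):
#
#     args = ExportKaldiFilesArguments(..., split_directory=Path("corpus_split"))
#     func = ExportKaldiFilesFunction(args)
#
# Running the function writes the job's utt2spk, spk2utt, feats, and cmvn scp
# files into split_directory (plus vad/ivectors when present, or per-dictionary
# variants when the job has pronunciation dictionaries).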