"""Multiprocessing functionality for VAD"""
from __future__ import annotations

import typing
from pathlib import Path
from typing import TYPE_CHECKING, Union

import pynini
import pywrapfst
from _kalpy.decoder import LatticeFasterDecoder, LatticeFasterDecoderConfig
from _kalpy.fstext import GetLinearSymbolSequence
from _kalpy.gmm import DecodableAmDiagGmmScaled
from _kalpy.matrix import DoubleMatrix, FloatMatrix
from _kalpy.util import SequentialBaseFloatVectorReader
from kalpy.data import Segment
from kalpy.decoder.training_graphs import TrainingGraphCompiler
from kalpy.feat.cmvn import CmvnComputer
from kalpy.feat.mfcc import MfccComputer
from kalpy.feat.vad import VadComputer
from kalpy.fstext.lexicon import LexiconCompiler
from kalpy.utils import generate_read_specifier, read_kaldi_object
from kalpy.utterance import Utterance as KalpyUtterance
from sqlalchemy.orm import joinedload, subqueryload

from montreal_forced_aligner.abc import KaldiFunction
from montreal_forced_aligner.data import MfaArguments
from montreal_forced_aligner.db import File, Job, Speaker, Utterance
from montreal_forced_aligner.exceptions import SegmenterError
from montreal_forced_aligner.models import AcousticModel, G2PModel
from montreal_forced_aligner.vad.models import MfaVAD, get_initial_segmentation, merge_segments

if TYPE_CHECKING:
    SpeakerCharacterType = Union[str, int]
    from dataclasses import dataclass

    from montreal_forced_aligner.abc import MetaDict
else:
    from dataclassy import dataclass

__all__ = [
    "SegmentTranscriptArguments",
    "SegmentVadArguments",
    "SegmentTranscriptFunction",
    "SegmentVadFunction",
    "segment_utterance_transcript",
    "segment_utterance_vad",
]


@dataclass
class SegmentVadArguments(MfaArguments):
    """Arguments for :class:`~montreal_forced_aligner.segmenter.SegmentVadFunction`"""

    vad_path: Path
    segmentation_options: MetaDict


@dataclass
class SegmentTranscriptArguments(MfaArguments):
    """Arguments for :class:`~montreal_forced_aligner.segmenter.SegmentTranscriptFunction`"""

    acoustic_model: AcousticModel
    vad_model: typing.Optional[MfaVAD]
    lexicon_compilers: typing.Dict[int, LexiconCompiler]
    mfcc_options: MetaDict
    vad_options: MetaDict
    segmentation_options: MetaDict
    decode_options: MetaDict


def segment_utterance(
    segment: Segment,
    vad_model: typing.Optional[MfaVAD],
    segmentation_options: MetaDict,
    mfcc_options: MetaDict = None,
    vad_options: MetaDict = None,
    allow_empty: bool = True,
) -> typing.List[Segment]:
    """
    Split an utterance into multiple segments based on VAD output

    Parameters
    ----------
    segment: :class:`~kalpy.data.Segment`
        Segment to split
    vad_model: :class:`~montreal_forced_aligner.vad.models.MfaVAD` or None
        VAD model to use; if None, Kaldi's energy-based VAD is used
    segmentation_options: dict[str, Any]
        Segmentation options
    mfcc_options: dict[str, Any], optional
        MFCC options for energy-based VAD
    vad_options: dict[str, Any], optional
        Options for energy-based VAD
    allow_empty: bool
        Flag for whether an empty segmentation is allowed; if no segments are
        found, the original segment is returned

    Returns
    -------
    list[:class:`~kalpy.data.Segment`]
        Split segments
    """
    if vad_model is None:
        segments = segment_utterance_vad(segment, mfcc_options, vad_options, segmentation_options)
    else:
        segments = vad_model.segment_utterance(
            segment, **segmentation_options, allow_empty=allow_empty
        )
    if not segments:
        return [segment]
    return segments
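

# A minimal usage sketch for ``segment_utterance`` with the energy-based VAD
# path (``vad_model=None``). The file path and option values are hypothetical
# placeholders; only the option keys shown are ones this module actually reads,
# and the empty MFCC/VAD dicts assume kalpy provides usable defaults for the
# remaining settings.
def _example_segment_utterance() -> typing.List[Segment]:
    segment = Segment("/path/to/audio.wav", 0.0, 30.0, 0)
    segmentation_options: MetaDict = {
        "min_pause_duration": 0.05,
        "max_segment_length": 30.0,
        "min_segment_length": 0.1,
    }
    return segment_utterance(
        segment,
        None,  # fall back to Kaldi's energy-based VAD
        segmentation_options,
        mfcc_options={},
        vad_options={},
    )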


def segment_utterance_transcript(
    acoustic_model: AcousticModel,
    utterance: KalpyUtterance,
    lexicon_compiler: LexiconCompiler,
    vad_model: MfaVAD,
    segmentation_options: MetaDict,
    cmvn: DoubleMatrix = None,
    fmllr_trans: FloatMatrix = None,
    mfcc_options: MetaDict = None,
    vad_options: MetaDict = None,
    g2p_model: G2PModel = None,
    interjection_words: typing.List[str] = None,
    acoustic_scale: float = 0.1,
    beam: float = 16.0,
    lattice_beam: float = 10.0,
    max_active: int = 7000,
    min_active: int = 200,
    prune_interval: int = 25,
    beam_delta: float = 0.5,
    hash_ratio: float = 2.0,
    prune_scale: float = 0.1,
    boost_silence: float = 1.0,
):
    """
    Split an utterance and its transcript into multiple transcribed utterances

    Parameters
    ----------
    acoustic_model: :class:`~montreal_forced_aligner.models.AcousticModel`
        Acoustic model to use in splitting transcriptions
    utterance: :class:`~kalpy.utterance.Utterance`
        Utterance to split
    lexicon_compiler: :class:`~kalpy.fstext.lexicon.LexiconCompiler`
        Lexicon compiler
    vad_model: :class:`~montreal_forced_aligner.vad.models.MfaVAD` or None
        VAD model to use; if None, Kaldi's energy-based VAD is used
    segmentation_options: dict[str, Any]
        Segmentation options
    cmvn: :class:`~_kalpy.matrix.DoubleMatrix`, optional
        CMVN stats to apply
    fmllr_trans: :class:`~_kalpy.matrix.FloatMatrix`, optional
        fMLLR transformation matrix for speaker adaptation
    mfcc_options: dict[str, Any], optional
        MFCC options for energy-based VAD
    vad_options: dict[str, Any], optional
        Options for energy-based VAD
    g2p_model: :class:`~montreal_forced_aligner.models.G2PModel`, optional
        G2P model (currently unused)
    interjection_words: list[str], optional
        Interjection words that may optionally be inserted between transcript words
    acoustic_scale: float, optional
        Defaults to 0.1
    beam: float, optional
        Defaults to 16.0
    lattice_beam: float, optional
        Defaults to 10.0
    max_active: int, optional
        Defaults to 7000
    min_active: int, optional
        Defaults to 200
    prune_interval: int, optional
        Defaults to 25
    beam_delta: float, optional
        Defaults to 0.5
    hash_ratio: float, optional
        Defaults to 2.0
    prune_scale: float, optional
        Defaults to 0.1
    boost_silence: float, optional
        Defaults to 1.0

    Returns
    -------
    list[:class:`~kalpy.utterance.Utterance`]
        Split utterances
    """
    graph_compiler = TrainingGraphCompiler(
        acoustic_model.alignment_model_path,
        acoustic_model.tree_path,
        lexicon_compiler,
    )
    if utterance.cmvn_string:
        cmvn = read_kaldi_object(DoubleMatrix, utterance.cmvn_string)
    if utterance.fmllr_string:
        fmllr_trans = read_kaldi_object(FloatMatrix, utterance.fmllr_string)
    if cmvn is None and acoustic_model.uses_cmvn:
        utterance.generate_mfccs(acoustic_model.mfcc_computer)
        cmvn_computer = CmvnComputer()
        cmvn = cmvn_computer.compute_cmvn_from_features([utterance.mfccs])
    current_transcript = utterance.transcript
    segments = segment_utterance(
        utterance.segment, vad_model, segmentation_options, mfcc_options, vad_options
    )
    if not segments:
        return [utterance]
    config = LatticeFasterDecoderConfig()
    config.beam = beam
    config.lattice_beam = lattice_beam
    config.max_active = max_active
    config.min_active = min_active
    config.prune_interval = prune_interval
    config.beam_delta = beam_delta
    config.hash_ratio = hash_ratio
    config.prune_scale = prune_scale
    new_utts = []
    am, transition_model = acoustic_model.acoustic_model, acoustic_model.transition_model
    if boost_silence != 1.0:
        am.boost_silence(transition_model, lexicon_compiler.silence_symbols, boost_silence)
    for seg in segments:
        new_utt = KalpyUtterance(seg, current_transcript)
        new_utt.generate_mfccs(acoustic_model.mfcc_computer)
        if acoustic_model.uses_cmvn:
            new_utt.apply_cmvn(cmvn)
        feats = new_utt.generate_features(
            acoustic_model.mfcc_computer,
            acoustic_model.pitch_computer,
            lda_mat=acoustic_model.lda_mat,
            fmllr_trans=fmllr_trans,
        )
        unknown_words = []
        unknown_word_index = 0
        for w in new_utt.transcript.split():
            if not lexicon_compiler.word_table.member(w):
                unknown_words.append(w)
        fst = graph_compiler.compile_fst(new_utt.transcript, interjection_words)
        decodable = DecodableAmDiagGmmScaled(am, transition_model, feats, acoustic_scale)
        d = LatticeFasterDecoder(fst, config)
        ans = d.Decode(decodable)
        if not ans:
            raise SegmenterError(f"Did not successfully decode: {current_transcript}")
        ans, decoded = d.GetBestPath()
        if decoded.NumStates() == 0:
            raise SegmenterError("Error getting best path from decoder for utterance")
        alignment, words, weight = GetLinearSymbolSequence(decoded)
        words = words[:-1]
        new_transcript = []
        for w in words:
            w = lexicon_compiler.word_table.find(w)
            if w == lexicon_compiler.oov_word:
                w = unknown_words[unknown_word_index]
                unknown_word_index += 1
            new_transcript.append(w)
        transcript = " ".join(new_transcript)
        if interjection_words:
            current_transcript = align_interjection_words(
                transcript, current_transcript, interjection_words, lexicon_compiler
            )
        else:
            current_transcript = " ".join(current_transcript.split()[len(words) :])
        new_utt.transcript = transcript
        new_utt.mfccs = None
        new_utt.cmvn_string = utterance.cmvn_string
        new_utt.fmllr_string = utterance.fmllr_string
        new_utts.append(new_utt)
    if current_transcript:
        new_utts[-1].transcript += " " + current_transcript
    return new_utts
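

# A hedged sketch of calling ``segment_utterance_transcript`` directly, outside
# the multiprocessing pipeline. The caller is assumed to already hold a loaded
# acoustic model, lexicon compiler, and utterance; within MFA this call is
# orchestrated by ``SegmentTranscriptFunction`` below, and the interjection
# words and option values here are illustrative only.
def _example_segment_utterance_transcript(
    acoustic_model: AcousticModel,
    lexicon_compiler: LexiconCompiler,
    utterance: KalpyUtterance,
) -> typing.List[KalpyUtterance]:
    segmentation_options: MetaDict = {
        "min_pause_duration": 0.05,
        "max_segment_length": 30.0,
        "min_segment_length": 0.1,
    }
    return segment_utterance_transcript(
        acoustic_model,
        utterance,
        lexicon_compiler,
        None,  # no neural VAD model; energy-based VAD is used instead
        segmentation_options,
        mfcc_options={},
        vad_options={},
        interjection_words=["um", "uh"],
        boost_silence=1.25,
    )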


def align_interjection_words(
    transcript,
    original_transcript,
    interjection_words: typing.List[str],
    lexicon_compiler: LexiconCompiler,
):
    """
    Align a decoded transcript against the original transcript, treating
    interjection words as optional insertions, and return the portion of the
    original transcript that was not consumed by the decoded segment.
    """
    # Build a linear FST over the original transcript in which each word may
    # optionally be preceded by a penalized interjection word (output epsilon).
    g = pynini.Fst()
    start_state = g.add_state()
    g.set_start(start_state)
    for w in original_transcript.split():
        word_symbol = lexicon_compiler.to_int(w)
        word_initial_state = g.add_state()
        # States are added in pairs per word, so ``word_initial_state - 1`` is
        # always the state reached after the previous word (or the start state).
        for iw in interjection_words:
            if not lexicon_compiler.word_table.member(iw):
                continue
            iw_symbol = lexicon_compiler.to_int(iw)
            g.add_arc(
                word_initial_state - 1,
                pywrapfst.Arc(
                    iw_symbol,
                    lexicon_compiler.word_table.find("<eps>"),
                    pywrapfst.Weight(g.weight_type(), 4.0),
                    word_initial_state,
                ),
            )
        word_final_state = g.add_state()
        g.add_arc(
            word_initial_state,
            pywrapfst.Arc(
                word_symbol, word_symbol, pywrapfst.Weight.one(g.weight_type()), word_final_state
            ),
        )
        g.add_arc(
            word_initial_state - 1,
            pywrapfst.Arc(
                word_symbol, word_symbol, pywrapfst.Weight.one(g.weight_type()), word_final_state
            ),
        )
        g.set_final(word_initial_state, pywrapfst.Weight.one(g.weight_type()))
        g.set_final(word_final_state, pywrapfst.Weight.one(g.weight_type()))
    a = pynini.accep(
        " ".join(
            [
                x if lexicon_compiler.word_table.member(x) else lexicon_compiler.oov_word
                for x in transcript.split()
            ]
        ),
        token_type=lexicon_compiler.word_table,
    )
    interjections_removed = (
        pynini.compose(a, g).project("output").string(lexicon_compiler.word_table)
    )
    return " ".join(original_transcript.split()[len(interjections_removed.split()) :])
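

# A hedged walk-through of ``align_interjection_words``, assuming every word
# below is present in the lexicon's word table. If the decoder produced
# "um hello there" for a segment whose original transcript begins with
# "hello there", and "um" is an allowed interjection, the composition consumes
# only "hello there", so the remainder is carried over to the next segment.
def _example_align_interjection_words(lexicon_compiler: LexiconCompiler) -> str:
    remainder = align_interjection_words(
        "um hello there",
        "hello there how are you",
        ["um"],
        lexicon_compiler,
    )
    return remainder  # expected: "how are you"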


def segment_utterance_vad(
    segment: Segment,
    mfcc_options: MetaDict,
    vad_options: MetaDict,
    segmentation_options: MetaDict,
    adaptive: bool = True,
    allow_empty: bool = True,
) -> typing.List[Segment]:
    """
    Split a segment into subsegments using Kaldi's energy-based VAD

    Note that ``mfcc_options`` and ``vad_options`` are modified in place.

    Parameters
    ----------
    segment: :class:`~kalpy.data.Segment`
        Segment to split
    mfcc_options: dict[str, Any]
        Options for computing MFCC features; energy-related settings are
        overridden for VAD computation
    vad_options: dict[str, Any]
        Options for Kaldi's energy-based VAD
    segmentation_options: dict[str, Any]
        Options for merging the initial frame-level segmentation
    adaptive: bool
        Flag for using the mean energy of the segment as the VAD threshold
    allow_empty: bool
        Flag for whether an empty segmentation is allowed; if False, a small
        minimum segment length (0.02) is used in place of the configured one

    Returns
    -------
    list[:class:`~kalpy.data.Segment`]
        Split segments, with timestamps relative to the original file
    """
    mfcc_options["use_energy"] = True
    mfcc_options["raw_energy"] = False
    mfcc_options["dither"] = 0.0
    mfcc_options["energy_floor"] = 0.0
    mfcc_computer = MfccComputer(**mfcc_options)
    feats = mfcc_computer.compute_mfccs_for_export(segment, compress=False)
    if adaptive:
        # Use the mean C0 (energy) of this segment as the VAD threshold
        vad_options["energy_mean_scale"] = 0.0
        mfccs = feats.numpy()
        vad_options["energy_threshold"] = mfccs[:, 0].mean()
    vad_computer = VadComputer(**vad_options)
    vad = vad_computer.compute_vad(feats).numpy()
    segments = get_initial_segmentation(vad, mfcc_computer.frame_shift)
    segments = merge_segments(
        segments,
        segmentation_options["min_pause_duration"],
        segmentation_options["max_segment_length"],
        segmentation_options["min_segment_length"] if allow_empty else 0.02,
    )
    new_segments = []
    for s in segments:
        seg = Segment(
            segment.file_path,
            s.begin + segment.begin,
            s.end + segment.begin,
            segment.channel,
        )
        new_segments.append(seg)
    return new_segments
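

# Direct use of the energy-based path. Two behaviors are worth noting from the
# code above: the MFCC/VAD option dicts are modified in place (pass copies if
# the originals must stay untouched), and the returned segments are offset by
# ``segment.begin``, i.e. expressed in file coordinates rather than relative
# to the input segment. Option values here are illustrative only.
def _example_segment_utterance_vad(segment: Segment) -> typing.List[Segment]:
    mfcc_options: MetaDict = {}
    vad_options: MetaDict = {}
    segmentation_options: MetaDict = {
        "min_pause_duration": 0.05,
        "max_segment_length": 30.0,
        "min_segment_length": 0.1,
    }
    return segment_utterance_vad(
        segment,
        dict(mfcc_options),  # copy, since the function adds keys in place
        dict(vad_options),
        segmentation_options,
        adaptive=True,
    )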


class SegmentVadFunction(KaldiFunction):
    """
    Multiprocessing function to generate segments from VAD output.

    See Also
    --------
    :meth:`montreal_forced_aligner.segmenter.Segmenter.segment_vad`
        Main function that calls this function in parallel
    :meth:`montreal_forced_aligner.segmenter.VadSegmenter.segment_vad_arguments`
        Job method for generating arguments for this function
    :kaldi_utils:`segmentation.pl`
        Kaldi utility

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.segmenter.SegmentVadArguments`
        Arguments for the function
    """

    def __init__(self, args: SegmentVadArguments):
        super().__init__(args)
        self.vad_path = args.vad_path
        self.segmentation_options = args.segmentation_options

    def _run(self):
        """Run the function"""
        reader = SequentialBaseFloatVectorReader(generate_read_specifier(self.vad_path))
        while not reader.Done():
            utt_id = reader.Key()
            frames = reader.Value()
            initial_segments = get_initial_segmentation(
                frames.numpy(), self.segmentation_options["frame_shift"]
            )
            merged = merge_segments(
                initial_segments,
                self.segmentation_options["min_pause_duration"],
                self.segmentation_options["max_segment_length"],
                self.segmentation_options["min_segment_length"],
            )
            self.callback((int(utt_id.split("-")[-1]), merged))
            reader.Next()
        reader.Close()


class SegmentTranscriptFunction(KaldiFunction):
    """
    Multiprocessing function to segment utterances with transcripts from VAD output.

    See Also
    --------
    :meth:`montreal_forced_aligner.segmenter.Segmenter.segment_vad`
        Main function that calls this function in parallel
    :meth:`montreal_forced_aligner.segmenter.TranscriptionSegmenter.segment_transcript_arguments`
        Job method for generating arguments for this function
    :kaldi_utils:`segmentation.pl`
        Kaldi utility

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.segmenter.SegmentTranscriptArguments`
        Arguments for the function
    """

    def __init__(self, args: SegmentTranscriptArguments):
        super().__init__(args)
        self.acoustic_model = args.acoustic_model
        self.vad_model = args.vad_model
        self.lexicon_compilers = args.lexicon_compilers
        self.segmentation_options = args.segmentation_options
        self.mfcc_options = args.mfcc_options
        self.vad_options = args.vad_options
        self.decode_options = args.decode_options
        self.speechbrain = self.vad_model is not None

    def _run(self):
        """Run the function"""
        with self.session() as session:
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries))
                .filter(Job.id == self.job_name)
                .first()
            )
            for d in job.dictionaries:
                utterances = (
                    session.query(Utterance)
                    .join(Utterance.speaker)
                    .options(
                        joinedload(Utterance.file).joinedload(File.sound_file),
                        joinedload(Utterance.speaker),
                    )
                    .filter(
                        Utterance.job_id == self.job_name,
                        Utterance.duration >= 0.1,
                        Speaker.dictionary_id == d.id,
                    )
                    .order_by(Utterance.kaldi_id)
                )
                for u in utterances:
                    new_utterances = segment_utterance_transcript(
                        self.acoustic_model,
                        u.to_kalpy(),
                        self.lexicon_compilers[d.id],
                        self.vad_model if self.speechbrain else None,
                        self.segmentation_options,
                        mfcc_options=self.mfcc_options if not self.speechbrain else None,
                        vad_options=self.vad_options if not self.speechbrain else None,
                        **self.decode_options,
                    )
                    self.callback((u.id, new_utterances))