Source code for montreal_forced_aligner.vad.multiprocessing
"""Multiprocessing functionality for VAD"""
from __future__ import annotations
import logging
import os
import re
import subprocess
import typing
from typing import TYPE_CHECKING, List, Union
from montreal_forced_aligner.abc import KaldiFunction
from montreal_forced_aligner.data import CtmInterval, MfaArguments
from montreal_forced_aligner.helper import mfa_open
from montreal_forced_aligner.utils import read_feats, thirdparty_binary
try:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
torch_logger = logging.getLogger("speechbrain.utils.torch_audio_backend")
torch_logger.setLevel(logging.ERROR)
torch_logger = logging.getLogger("speechbrain.utils.train_logger")
torch_logger.setLevel(logging.ERROR)
from speechbrain.pretrained import VAD
FOUND_SPEECHBRAIN = True
except (ImportError, OSError):
FOUND_SPEECHBRAIN = False
VAD = None
if TYPE_CHECKING:
SpeakerCharacterType = Union[str, int]
from montreal_forced_aligner.abc import MetaDict
[docs]
class SegmentVadArguments(MfaArguments):
    """Arguments for :class:`~montreal_forced_aligner.segmenter.SegmentVadFunction`"""

    # Path handed to Kaldi's ``copy-vector`` as ``scp:{vad_path}`` by
    # SegmentVadFunction._run.
    vad_path: str
    # Segmentation settings; SegmentVadFunction._run reads the keys
    # "frame_shift", "close_th", "large_chunk_size" and "len_th".
    segmentation_options: MetaDict
[docs]
def get_initial_segmentation(
    frames: List[Union[int, str]], frame_shift: float
) -> List[CtmInterval]:
    """
    Compute initial segmentation over voice activity

    Every maximal run of consecutive voiced frames (``int(frame) > 0``) becomes
    one ``"speech"`` interval.  Frame ``i`` is taken to cover the time span
    ``[i * frame_shift, (i + 1) * frame_shift)``, so a run of voiced frames
    ``b .. e`` yields an interval from ``b * frame_shift`` to
    ``(e + 1) * frame_shift``.

    Parameters
    ----------
    frames: list[Union[int, str]]
        List of frames with VAD output; each entry must be convertible with ``int``
    frame_shift: float
        Frame shift of features in seconds

    Returns
    -------
    List[CtmInterval]
        Initial segmentation, in temporal order
    """
    segs = []
    cur_seg = None
    for i, f in enumerate(frames):
        if int(f) > 0:
            if cur_seg is None:
                # First voiced frame of a new run opens a segment; the end is
                # filled in once the run terminates.
                cur_seg = CtmInterval(begin=i * frame_shift, end=0, label="speech")
        elif cur_seg is not None:
            # Frame i is the first silent frame after the run, so the last
            # voiced frame (i - 1) ends exactly where frame i begins.  Using
            # i * frame_shift keeps this consistent with the end-of-stream
            # case below and avoids end < begin for single-frame runs.
            cur_seg.end = i * frame_shift
            segs.append(cur_seg)
            cur_seg = None
    if cur_seg is not None:
        # Speech ran until the last frame: close the segment at the end of the
        # frame sequence.
        cur_seg.end = len(frames) * frame_shift
        segs.append(cur_seg)
    return segs
[docs]
def merge_segments(
    segments: List[CtmInterval],
    min_pause_duration: float,
    max_segment_length: float,
    min_segment_length: float,
) -> List[CtmInterval]:
    """
    Merge segments together

    Segments are visited in order.  A segment is absorbed into the previously
    kept one unless the pause before it exceeds ``min_pause_duration`` or the
    combined span would exceed ``max_segment_length``.  When a new segment is
    started, the facing boundaries of the two neighbours are each moved toward
    the other by a small amount derived from ``min_pause_duration``.

    The interval objects passed in are modified in place and reused in the
    returned list.

    Parameters
    ----------
    segments: List[CtmInterval]
        Initial segments
    min_pause_duration: float
        Minimum amount of silence time to mark an utterance boundary; a segment
        that would start a new utterance is also dropped unless it is longer
        than this value
    max_segment_length: float
        Maximum length of segments before they're broken up
    min_segment_length: float
        Merged segments must be strictly longer than this to be returned

    Returns
    -------
    List[CtmInterval]
        Merged segments
    """
    snap_threshold = min_pause_duration / 2
    kept = []
    for seg in segments:
        if kept:
            last = kept[-1]
            long_pause = seg.begin > last.end + min_pause_duration
            too_long = seg.end - last.begin > max_segment_length
            if not (long_pause or too_long):
                # Close enough to the previous segment: extend it.
                last.end = seg.end
                continue

        # From here on the segment would start a new utterance.
        if not (seg.end - seg.begin > min_pause_duration):
            # Too short to stand on its own; discard it.
            continue

        if kept and snap_threshold:
            last = kept[-1]
            gap = seg.begin - last.end
            # Move both boundaries inward by half of the gap, capped at half
            # of the snap threshold.
            shift = (gap if gap < snap_threshold else snap_threshold) / 2
            last.end += shift
            seg.begin -= shift
        kept.append(seg)

    return [seg for seg in kept if seg.end - seg.begin > min_segment_length]
[docs]
class SegmentVadFunction(KaldiFunction):
    """
    Multiprocessing function to generate segments from VAD output.

    See Also
    --------
    :meth:`montreal_forced_aligner.segmenter.Segmenter.segment_vad`
        Main function that calls this function in parallel
    :meth:`montreal_forced_aligner.segmenter.Segmenter.segment_vad_arguments`
        Job method for generating arguments for this function
    :kaldi_utils:`segmentation.pl`
        Kaldi utility

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.segmenter.SegmentVadArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(
        r"^LOG.*processed (?P<done>\d+) utterances.*(?P<no_feats>\d+) had.*(?P<unvoiced>\d+) were.*"
    )

    def __init__(self, args: SegmentVadArguments):
        super().__init__(args)
        self.vad_path = args.vad_path
        self.segmentation_options = args.segmentation_options

    def _run(
        self,
    ) -> typing.Generator[typing.Tuple[typing.Any, List[CtmInterval]], None, None]:
        """
        Run the function

        Streams the VAD vectors listed in ``vad_path`` through Kaldi's
        ``copy-vector`` in text mode and segments each utterance.

        Yields
        ------
        tuple
            Utterance identifier as produced by ``read_feats`` and the list of
            merged speech segments for that utterance
        """
        # Popen is used as a context manager so that its stdout pipe is closed
        # and the child is waited on even when the consumer stops iterating
        # early.  It is listed after the log file so the process has exited
        # before the log file it writes stderr to is closed.
        with mfa_open(self.log_path, "w") as log_file, subprocess.Popen(
            [
                thirdparty_binary("copy-vector"),
                "--binary=false",
                f"scp:{self.vad_path}",
                "ark,t:-",
            ],
            stdout=subprocess.PIPE,
            stderr=log_file,
            env=os.environ,
        ) as copy_proc:
            for utt_id, frames in read_feats(copy_proc):
                initial_segments = get_initial_segmentation(
                    frames, self.segmentation_options["frame_shift"]
                )
                merged = merge_segments(
                    initial_segments,
                    self.segmentation_options["close_th"],
                    self.segmentation_options["large_chunk_size"],
                    self.segmentation_options["len_th"],
                )
                yield utt_id, merged