Source code for montreal_forced_aligner.vad.models

"""Model classes for Voice Activity Detection"""
from __future__ import annotations

import logging
import os
import sys
import typing
import warnings
from pathlib import Path

import numpy as np
from kalpy.data import Segment
from kalpy.gmm.data import CtmInterval

from montreal_forced_aligner import config

if typing.TYPE_CHECKING:
    from montreal_forced_aligner.abc import MetaDict

try:
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        torch_logger = logging.getLogger("speechbrain.utils.torch_audio_backend")
        torch_logger.setLevel(logging.ERROR)
        torch_logger = logging.getLogger("speechbrain.utils.train_logger")
        torch_logger.setLevel(logging.ERROR)
        import torch
        import torchaudio

        try:
            from speechbrain.pretrained import VAD
        except ImportError:  # speechbrain 1.0
            from speechbrain.inference.VAD import VAD

    FOUND_SPEECHBRAIN = True
except (ImportError, OSError):
    FOUND_SPEECHBRAIN = False
    VAD = object


logger = logging.getLogger("mfa")


[docs] def get_initial_segmentation(frames: np.ndarray, frame_shift: float) -> typing.List[CtmInterval]: """ Compute initial segmentation over voice activity Parameters ---------- frames: list[Union[int, str]] List of frames with VAD output frame_shift: float Frame shift of features in seconds Returns ------- List[:class:`~kalpy.gmm.data.CtmInterval`] Initial segmentation """ segments = [] cur_segment = None silent_frames = 0 non_silent_frames = 0 for i in range(frames.shape[0]): f = frames[i] if int(f) > 0: non_silent_frames += 1 if cur_segment is None: cur_segment = CtmInterval(i * frame_shift, 0, "speech") else: silent_frames += 1 if cur_segment is not None: cur_segment.end = (i - 1) * frame_shift segments.append(cur_segment) cur_segment = None if cur_segment is not None: cur_segment.end = len(frames) * frame_shift segments.append(cur_segment) return segments
[docs] def merge_segments( segments: typing.List[CtmInterval], min_pause_duration: float, max_segment_length: float, min_segment_length: float, snap_boundaries: bool = True, ) -> typing.List[CtmInterval]: """ Merge segments together Parameters ---------- segments: SegmentationType Initial segments min_pause_duration: float Minimum amount of silence time to mark an utterance boundary max_segment_length: float Maximum length of segments before they're broken up min_segment_length: float Minimum length of segments returned Returns ------- List[:class:`~kalpy.gmm.data.CtmInterval`] Merged segments """ merged_segments = [] snap_boundary_threshold = 0 if snap_boundaries: snap_boundary_threshold = min_pause_duration / 2 for s in segments: if ( not merged_segments or s.begin > merged_segments[-1].end + min_pause_duration or s.end - merged_segments[-1].begin > max_segment_length ): if merged_segments and snap_boundary_threshold: boundary_gap = s.begin - merged_segments[-1].end if boundary_gap < snap_boundary_threshold: half_boundary = boundary_gap / 2 else: half_boundary = snap_boundary_threshold / 2 merged_segments[-1].end += half_boundary s.begin -= half_boundary merged_segments.append(s) else: merged_segments[-1].end = s.end return [x for x in merged_segments if x.end - x.begin > min_segment_length]
class MfaVAD(VAD): def energy_VAD( self, audio_file: typing.Union[str, Path, np.ndarray, torch.Tensor], segments, activation_threshold=0.5, deactivation_threshold=0.0, eps=1e-6, ): """Applies energy-based VAD within the detected speech segments.The neural network VAD often creates longer segments and tends to merge segments that are close with each other. The energy VAD post-processes can be useful for having a fine-grained voice activity detection. The energy VAD computes the energy within the small chunks. The energy is normalized within the segment to have mean 0.5 and +-0.5 of std. This helps to set the energy threshold. Arguments --------- audio_file: path Path of the audio file containing the recording. The file is read with torchaudio. segments: list[:class:`~kalpy.gmm.data.CtmInterval`] torch.Tensor containing the speech boundaries. It can be derived using the get_boundaries method. activation_threshold: float A new speech segment is started it the energy is above activation_th. deactivation_threshold: float The segment is considered ended when the energy is <= deactivation_th. eps: float Small constant for numerical stability. Returns ------- new_boundaries The new boundaries that are post-processed by the energy VAD. """ if isinstance(audio_file, (str, Path)): # Getting the total size of the input file sample_rate, audio_len = self._get_audio_info(audio_file) if sample_rate != self.sample_rate: raise ValueError( "The detected sample rate is different from that set in the hparam file" ) else: sample_rate = self.sample_rate # Computing the chunk length of the energy window chunk_len = int(self.time_resolution * sample_rate) new_segments = [] # Processing speech segments for segment in segments: begin_sample = int(segment.begin * sample_rate) end_sample = int(segment.end * sample_rate) seg_len = end_sample - begin_sample if seg_len < chunk_len: continue if not isinstance(audio_file, torch.Tensor): # Reading the speech segment audio, _ = torchaudio.load( audio_file, frame_offset=begin_sample, num_frames=seg_len ) else: audio = audio_file[:, begin_sample : begin_sample + seg_len] # Create chunks segment_chunks = self.create_chunks( audio, chunk_size=chunk_len, chunk_stride=chunk_len ) # Energy computation within each chunk energy_chunks = segment_chunks.abs().sum(-1) + eps energy_chunks = energy_chunks.log() # Energy normalization energy_chunks = ( (energy_chunks - energy_chunks.mean()) / (2 * energy_chunks.std()) ) + 0.5 energy_chunks = energy_chunks # Apply threshold based on the energy value new_segments.extend( self.generate_segments( energy_chunks, activation_threshold=activation_threshold, deactivation_threshold=deactivation_threshold, begin=segment.begin, end=segment.end, ) ) return new_segments def double_check_speech_segments(self, boundaries, audio_file, speech_th=0.5): """Takes in input the boundaries of the detected speech segments and double checks (using the neural VAD) that they actually contain speech. Arguments --------- boundaries: torch.Tensor torch.Tensor containing the boundaries of the speech segments. audio_file: path The original audio file used to compute vad_out. speech_th: float Threshold on the mean posterior probability over which speech is confirmed. Below that threshold, the segment is re-assigned to a non-speech region. Returns ------- new_boundaries The boundaries of the segments where speech activity is confirmed. """ if isinstance(audio_file, (str, Path)): # Getting the total size of the input file sample_rate, audio_len = self._get_audio_info(audio_file) if sample_rate != self.sample_rate: raise ValueError( "The detected sample rate is different from that set in the hparam file" ) else: sample_rate = self.sample_rate # Double check the segments new_boundaries = [] for i in range(boundaries.shape[0]): beg_sample = int(boundaries[i, 0] * sample_rate) end_sample = int(boundaries[i, 1] * sample_rate) len_seg = end_sample - beg_sample if not isinstance(audio_file, torch.Tensor): # Read the candidate speech segment segment, fs = torchaudio.load( str(audio_file), frame_offset=beg_sample, num_frames=len_seg ) else: segment = audio_file[:, beg_sample : beg_sample + len_seg] speech_prob = self.get_speech_prob_chunk(segment) if speech_prob.mean() > speech_th: # Accept this as a speech segment new_boundaries.append([boundaries[i, 0], boundaries[i, 1]]) # Convert boundaries from list to tensor new_boundaries = torch.FloatTensor(new_boundaries).to(boundaries.device) return new_boundaries def segment_utterance( self, segment: typing.Union[Segment, np.ndarray], apply_energy_vad: bool = False, min_pause_duration: float = 0.333, max_segment_length: float = 30, min_segment_length: float = 0.333, activation_threshold: float = 0.5, deactivation_threshold: float = 0.25, energy_activation_threshold: float = 0.5, energy_deactivation_threshold: float = 0.4, **kwargs, ) -> typing.List[typing.Union[Segment, CtmInterval]]: if isinstance(segment, Segment): y = torch.tensor(segment.wave[np.newaxis, :]) else: if len(segment.shape) == 1: y = torch.tensor(segment[np.newaxis, :]) elif not torch.is_tensor(segment): y = torch.tensor(segment) else: y = segment prob_chunks = self.get_speech_prob_chunk(y).float().cpu().numpy()[0, ...] # Compute the boundaries of the speech segments segments = self.generate_segments( prob_chunks, activation_threshold=activation_threshold, deactivation_threshold=deactivation_threshold, begin=segment.begin if isinstance(segment, Segment) and segment.begin is not None else None, end=segment.end if isinstance(segment, Segment) and segment.end is not None else None, ) # Apply energy-based VAD on the detected speech segments if apply_energy_vad: segments = self.energy_VAD( y, segments, activation_threshold=energy_activation_threshold, deactivation_threshold=energy_deactivation_threshold, ) # Merge short segments segments = merge_segments( segments, min_pause_duration=min_pause_duration, max_segment_length=max_segment_length, min_segment_length=min_segment_length, snap_boundaries=False, ) # Padding for i, s in enumerate(segments): begin, end = s.begin, s.end begin -= min_pause_duration / 2 end += min_pause_duration / 2 if i == 0: begin = max(begin, 0) if i == len(segments) - 1: end = min( end, segment.shape[0] / self.sample_rate if not isinstance(segment, Segment) else segment.end, ) s.begin = begin s.end = end if isinstance(segment, Segment): segments[i] = Segment(segment.file_path, s.begin, s.end, segment.channel) return segments def generate_segments( self, vad_prob, activation_threshold=0.5, deactivation_threshold=0.25, begin=None, end=None ): """Scans the frame-level speech probabilities and applies a threshold on them. Speech starts when a value larger than activation_th is detected, while it ends when observing a value lower than the deactivation_th. Arguments --------- vad_prob: numpy.ndarray Frame-level speech probabilities. activation_threshold: float Threshold for starting a speech segment. deactivation_threshold: float Threshold for ending a speech segment. Returns ------- segments: list[CtmInterval] Segments generated from VAD """ if begin is None: begin = 0 # Loop over batches and time steps is_active = vad_prob[0] > activation_threshold start = 0 boundaries = [] for time_step in range(1, vad_prob.shape[0] - 1): y = vad_prob[time_step] if is_active: if y < deactivation_threshold: e = self.time_resolution * (time_step - 1) boundaries.append(CtmInterval(start + begin, e + begin, "speech")) is_active = False elif y > activation_threshold: is_active = True start = self.time_resolution * time_step if is_active: if end is not None: e = end else: e = self.time_resolution * vad_prob.shape[0] e += begin boundaries.append(CtmInterval(start + begin, e, "speech")) return boundaries def get_speech_prob_chunk(self, wavs, wav_lens=None): """Outputs the frame-level posterior probability for the input audio chunks Outputs close to zero refers to time steps with a low probability of speech activity, while outputs closer to one likely contain speech. Arguments --------- wavs : torch.Tensor Batch of waveforms [batch, time, channels] or [batch, time] depending on the model. Make sure the sample rate is fs=16000 Hz. wav_lens : torch.Tensor Lengths of the waveforms relative to the longest one in the batch, tensor of shape [batch]. The longest one should have relative length 1.0 and others len(waveform) / max_length. Used for ignoring padding. Returns ------- torch.Tensor The encoded batch """ # Manage single waveforms in input if len(wavs.shape) == 1: wavs = wavs.unsqueeze(0) # Assign full length if wav_lens is not assigned if wav_lens is None: wav_lens = torch.ones(wavs.shape[0], device=self.device) # Storing waveform in the specified device wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device) wavs = wavs.float() # Computing features and embeddings feats = self.mods.compute_features(wavs) feats = self.mods.mean_var_norm(feats, wav_lens) outputs = self.mods.cnn(feats) outputs = outputs.reshape( outputs.shape[0], outputs.shape[1], outputs.shape[2] * outputs.shape[3], ) outputs, h = self.mods.rnn(outputs) outputs = self.mods.dnn(outputs) output_prob = torch.sigmoid(outputs) return output_prob def segment_for_whisper( self, segment: typing.Union[torch.Tensor, np.ndarray], apply_energy_vad: bool = True, max_segment_length: float = 30, min_segment_length: float = 0.333, min_pause_duration: float = 0.333, activation_threshold: float = 0.5, deactivation_threshold: float = 0.25, en_activation_threshold: float = 0.5, en_deactivation_threshold: float = 0.4, **kwargs, ) -> typing.List[typing.Dict[str, float]]: if isinstance(segment, Segment): y = torch.tensor(segment.wave[np.newaxis, :]) else: if len(segment.shape) == 1: y = torch.tensor(segment[np.newaxis, :]) elif not torch.is_tensor(segment): y = torch.tensor(segment) else: y = segment segments = self.segment_utterance( segment, apply_energy_vad=apply_energy_vad, max_segment_length=max_segment_length, min_segment_length=min_segment_length, min_pause_duration=min_pause_duration, activation_threshold=activation_threshold, deactivation_threshold=deactivation_threshold, en_activation_threshold=en_activation_threshold, en_deactivation_threshold=en_deactivation_threshold, **kwargs, ) # Padding segments_for_whisper = [] for i, s in enumerate(segments): begin, end = s.begin, s.end f1 = int(round(begin, 3) * self.sample_rate) f2 = int(round(end, 3) * self.sample_rate) segments_for_whisper.append( {"start": float(begin), "end": float(end), "inputs": y[0, f1:f2]} ) return segments_for_whisper class SegmenterMixin: def __init__( self, max_segment_length: float = 30, min_segment_length: float = 0.333, min_pause_duration: float = 0.333, activation_threshold: float = 0.5, deactivation_threshold: float = 0.25, energy_activation_threshold: float = 0.5, energy_deactivation_threshold: float = 0.4, **kwargs, ): self.max_segment_length = max_segment_length self.min_segment_length = min_segment_length self.min_pause_duration = min_pause_duration self.activation_threshold = activation_threshold self.deactivation_threshold = deactivation_threshold self.energy_activation_threshold = energy_activation_threshold self.energy_deactivation_threshold = energy_deactivation_threshold super().__init__(**kwargs) @property def segmentation_options(self) -> MetaDict: """Options for segmentation""" return { "max_segment_length": self.max_segment_length, "min_segment_length": self.min_segment_length, "activation_threshold": self.activation_threshold, "deactivation_threshold": self.deactivation_threshold, "energy_activation_threshold": self.energy_activation_threshold, "energy_deactivation_threshold": self.energy_deactivation_threshold, "min_pause_duration": self.min_pause_duration, } class SpeechbrainSegmenterMixin(SegmenterMixin): def __init__( self, apply_energy_vad: bool = True, double_check: bool = False, speech_threshold: float = 0.5, cuda: bool = False, **kwargs, ): if not FOUND_SPEECHBRAIN: logger.error( "Could not import speechbrain, please ensure it is installed via `pip install speechbrain`" ) sys.exit(1) super().__init__(**kwargs) self.apply_energy_vad = apply_energy_vad self.double_check = double_check self.speech_threshold = speech_threshold self.cuda = cuda self.speechbrain = True self.vad_model = None model_dir = os.path.join(config.TEMPORARY_DIRECTORY, "models", "VAD") os.makedirs(model_dir, exist_ok=True) run_opts = None if self.cuda: run_opts = {"device": "cuda"} self.vad_model = MfaVAD.from_hparams( source="speechbrain/vad-crdnn-libriparty", savedir=model_dir, run_opts=run_opts ) @property def segmentation_options(self) -> MetaDict: """Options for segmentation""" options = super().segmentation_options options.update( { "apply_energy_vad": self.apply_energy_vad, "double_check": self.double_check, "speech_threshold": self.speech_threshold, } ) return options