"""Model classes for Voice Activity Detection"""
from __future__ import annotations
import logging
import os
import sys
import typing
import warnings
from pathlib import Path
import numpy as np
from kalpy.data import Segment
from montreal_forced_aligner import config
from montreal_forced_aligner.data import CtmInterval
if typing.TYPE_CHECKING:
from montreal_forced_aligner.abc import MetaDict
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
torch_logger = logging.getLogger("speechbrain.utils.torch_audio_backend")
torch_logger.setLevel(logging.ERROR)
torch_logger = logging.getLogger("speechbrain.utils.train_logger")
torch_logger.setLevel(logging.ERROR)
import torch
import torchaudio
try:
from speechbrain.pretrained import VAD
except ImportError: # speechbrain 1.0
from speechbrain.inference.VAD import VAD
FOUND_SPEECHBRAIN = True
except (ImportError, OSError):
FOUND_SPEECHBRAIN = False
VAD = object
logger = logging.getLogger("mfa")
[docs]
def get_initial_segmentation(frames: np.ndarray, frame_shift: float) -> typing.List[CtmInterval]:
    """
    Turn frame-level VAD decisions into a list of speech intervals

    Parameters
    ----------
    frames: :class:`numpy.ndarray`
        Per-frame VAD output; a frame counts as speech when ``int(value) > 0``
    frame_shift: float
        Frame shift of features in seconds

    Returns
    -------
    List[CtmInterval]
        One ``"speech"`` interval per run of consecutive speech frames
    """
    intervals = []
    open_interval = None
    for index, frame in enumerate(frames):
        is_speech = int(frame) > 0
        if is_speech and open_interval is None:
            # First speech frame of a new run: open an interval, end filled in later
            open_interval = CtmInterval(begin=index * frame_shift, end=0, label="speech")
        elif not is_speech and open_interval is not None:
            # First silent frame after a run: the interval is closed at the
            # start time of the last speech frame (index - 1)
            open_interval.end = (index - 1) * frame_shift
            intervals.append(open_interval)
            open_interval = None
    if open_interval is not None:
        # Speech ran until the final frame: close at the end of the frame sequence
        open_interval.end = len(frames) * frame_shift
        intervals.append(open_interval)
    return intervals
[docs]
def merge_segments(
    segments: typing.List[CtmInterval],
    min_pause_duration: float,
    max_segment_length: float,
    min_segment_length: float,
    snap_boundaries: bool = True,
) -> typing.List[CtmInterval]:
    """
    Join neighbouring segments that are separated by short pauses

    The input intervals are modified in place (their ``begin``/``end`` are
    updated) and reused in the returned list.

    Parameters
    ----------
    segments: List[CtmInterval]
        Initial segments, in temporal order
    min_pause_duration: float
        Minimum amount of silence time to mark an utterance boundary
    max_segment_length: float
        Maximum length of segments before they're broken up
    min_segment_length: float
        Segments must be strictly longer than this to be returned
    snap_boundaries: bool
        When true, adjacent segments that are kept separate are grown towards
        each other by at most a quarter of ``min_pause_duration`` each

    Returns
    -------
    List[CtmInterval]
        Merged segments
    """
    snap_threshold = min_pause_duration / 2 if snap_boundaries else 0
    merged = []
    for current in segments:
        previous = merged[-1] if merged else None
        starts_new_segment = (
            previous is None
            or current.begin > previous.end + min_pause_duration
            or current.end - previous.begin > max_segment_length
        )
        if not starts_new_segment:
            # Pause is short and the result stays within the length limit:
            # absorb the current segment into the previous one
            previous.end = current.end
            continue
        if previous is not None and snap_threshold:
            # Split the gap (capped at the snap threshold) evenly between the
            # two neighbouring segments
            gap = current.begin - previous.end
            shift = (gap if gap < snap_threshold else snap_threshold) / 2
            previous.end += shift
            current.begin -= shift
        merged.append(current)
    return [seg for seg in merged if seg.end - seg.begin > min_segment_length]
class MfaVAD(VAD):
    """
    Voice activity detector built on the SpeechBrain ``VAD`` interface.

    The methods below work with lists of :class:`CtmInterval` (or kalpy
    :class:`Segment`) objects. ``self.sample_rate``, ``self.time_resolution``,
    ``self.device``, ``self.mods``, ``self.create_chunks`` and
    ``self._get_audio_info`` are not defined here and are expected to come
    from the parent ``VAD`` class.
    """

    @staticmethod
    def _to_waveform_tensor(segment):
        """Return the audio of ``segment`` as a tensor with a leading axis.

        Arguments
        ---------
        segment: Segment or numpy.ndarray or torch.Tensor
            A kalpy segment (its ``wave`` is used) or raw samples. 1-D input
            gets a new leading axis; 2-D tensors are returned unchanged.

        Returns
        -------
        torch.Tensor
            The waveform with at least two dimensions.
        """
        if isinstance(segment, Segment):
            return torch.tensor(segment.wave[np.newaxis, :])
        if len(segment.shape) == 1:
            return torch.tensor(segment[np.newaxis, :])
        if not torch.is_tensor(segment):
            return torch.tensor(segment)
        return segment

    def _resolve_audio_source(self, audio_file):
        """Validate/normalize an audio source shared by the post-processing steps.

        Paths are checked against the model sample rate and returned as ``str``
        for ``torchaudio.load``. NumPy arrays are converted to tensors and 1-D
        tensors get a leading channel axis so they can be sliced as
        ``audio[:, start:stop]``.

        Returns
        -------
        tuple
            ``(audio_source, sample_rate)``

        Raises
        ------
        ValueError
            If the file's sample rate differs from ``self.sample_rate``.
        """
        if isinstance(audio_file, (str, Path)):
            sample_rate, _ = self._get_audio_info(audio_file)
            if sample_rate != self.sample_rate:
                raise ValueError(
                    "The detected sample rate is different from that set in the hparam file"
                )
            return str(audio_file), sample_rate
        if isinstance(audio_file, np.ndarray):
            audio_file = torch.as_tensor(audio_file)
        if torch.is_tensor(audio_file) and audio_file.dim() == 1:
            audio_file = audio_file.unsqueeze(0)
        return audio_file, self.sample_rate

    def energy_VAD(
        self,
        audio_file: typing.Union[str, Path, np.ndarray, torch.Tensor],
        segments,
        activation_threshold=0.5,
        deactivation_threshold=0.0,
        eps=1e-6,
    ):
        """Applies energy-based VAD within the detected speech segments.

        The neural network VAD often creates longer segments and tends to merge
        segments that are close to each other. The energy VAD post-processing
        can be useful for a more fine-grained voice activity detection.

        The energy is computed within small chunks and normalized within each
        segment (mean shifted to 0.5, scaled by twice the standard deviation),
        which makes the thresholds comparable across segments.

        Arguments
        ---------
        audio_file: str, Path, numpy.ndarray or torch.Tensor
            Path of the recording (read with torchaudio), or the already
            loaded samples.
        segments: list[CtmInterval]
            Speech intervals (``begin``/``end`` in seconds) to refine.
        activation_threshold: float
            A new speech segment is started if the energy is above this value.
        deactivation_threshold: float
            The segment is considered ended when the energy drops below this value.
        eps: float
            Small constant for numerical stability.

        Returns
        -------
        list[CtmInterval]
            The new intervals that are post-processed by the energy VAD.

        Raises
        ------
        ValueError
            If ``audio_file`` is a path whose sample rate differs from the model's.
        """
        audio_source, sample_rate = self._resolve_audio_source(audio_file)
        # Length (in samples) of the energy window
        chunk_len = int(self.time_resolution * sample_rate)
        new_segments = []
        for segment in segments:
            begin_sample = int(segment.begin * sample_rate)
            end_sample = int(segment.end * sample_rate)
            seg_len = end_sample - begin_sample
            if seg_len < chunk_len:
                # Too short to form even one energy chunk: dropped
                continue
            if not isinstance(audio_source, torch.Tensor):
                # Read only the samples of this speech segment
                audio, _ = torchaudio.load(
                    audio_source, frame_offset=begin_sample, num_frames=seg_len
                )
            else:
                # NOTE(review): interval times are used directly as sample
                # offsets into the tensor, so they are assumed to be relative
                # to the start of ``audio_file`` - confirm for callers passing
                # intervals that carry an absolute offset.
                audio = audio_source[:, begin_sample : begin_sample + seg_len]
            # Non-overlapping chunks of chunk_len samples
            segment_chunks = self.create_chunks(
                audio, chunk_size=chunk_len, chunk_stride=chunk_len
            )
            # Log-energy within each chunk
            energy_chunks = (segment_chunks.abs().sum(-1) + eps).log()
            # Per-segment normalization
            energy_chunks = (
                (energy_chunks - energy_chunks.mean()) / (2 * energy_chunks.std())
            ) + 0.5
            # Threshold the normalized energy, keeping times anchored to the segment
            new_segments.extend(
                self.generate_segments(
                    energy_chunks,
                    activation_threshold=activation_threshold,
                    deactivation_threshold=deactivation_threshold,
                    begin=segment.begin,
                    end=segment.end,
                )
            )
        return new_segments

    def double_check_speech_segments(self, boundaries, audio_file, speech_th=0.5):
        """Double checks (using the neural VAD) that the detected speech
        segments actually contain speech.

        Arguments
        ---------
        boundaries: torch.Tensor
            Tensor whose rows are ``(begin, end)`` boundaries of the speech segments.
        audio_file: str, Path, numpy.ndarray or torch.Tensor
            The original audio used to compute the boundaries.
        speech_th: float
            Threshold on the mean posterior probability over which speech is
            confirmed. Segments at or below the threshold are discarded.

        Returns
        -------
        torch.Tensor
            The boundaries of the segments where speech activity is confirmed,
            on the same device as ``boundaries``.

        Raises
        ------
        ValueError
            If ``audio_file`` is a path whose sample rate differs from the model's.
        """
        audio_source, sample_rate = self._resolve_audio_source(audio_file)
        new_boundaries = []
        for i in range(boundaries.shape[0]):
            beg_sample = int(boundaries[i, 0] * sample_rate)
            end_sample = int(boundaries[i, 1] * sample_rate)
            len_seg = end_sample - beg_sample
            if not isinstance(audio_source, torch.Tensor):
                # Read the candidate speech segment
                segment, _ = torchaudio.load(
                    str(audio_source), frame_offset=beg_sample, num_frames=len_seg
                )
            else:
                segment = audio_source[:, beg_sample : beg_sample + len_seg]
            speech_prob = self.get_speech_prob_chunk(segment)
            if speech_prob.mean() > speech_th:
                # Accept this as a speech segment
                new_boundaries.append([boundaries[i, 0], boundaries[i, 1]])
        # Convert boundaries from list to tensor
        return torch.FloatTensor(new_boundaries).to(boundaries.device)

    def segment_utterance(
        self,
        segment: typing.Union[Segment, np.ndarray],
        apply_energy_vad: bool = False,
        min_pause_duration: float = 0.333,
        max_segment_length: float = 30,
        min_segment_length: float = 0.333,
        activation_threshold: float = 0.5,
        deactivation_threshold: float = 0.25,
        energy_activation_threshold: float = 0.5,
        energy_deactivation_threshold: float = 0.4,
        **kwargs,
    ) -> typing.List[Segment]:
        """Split an utterance into speech segments.

        Runs the neural VAD, optionally refines with the energy VAD, merges
        segments separated by short pauses and pads each result by half of
        ``min_pause_duration`` on both sides.

        Arguments
        ---------
        segment: Segment or numpy.ndarray or torch.Tensor
            Audio to segment.
        apply_energy_vad: bool
            Whether to post-process the neural VAD output with :meth:`energy_VAD`.
        min_pause_duration: float
            Minimum silence (seconds) that separates two output segments.
        max_segment_length: float
            Maximum length (seconds) of a merged segment.
        min_segment_length: float
            Merged segments not longer than this are dropped.
        activation_threshold: float
            Neural VAD threshold for starting a speech segment.
        deactivation_threshold: float
            Neural VAD threshold for ending a speech segment.
        energy_activation_threshold: float
            Energy VAD threshold for starting a speech segment.
        energy_deactivation_threshold: float
            Energy VAD threshold for ending a speech segment.
        **kwargs
            Accepted and ignored, so callers can pass a shared options dict.

        Returns
        -------
        list
            :class:`Segment` objects when ``segment`` is a ``Segment``,
            otherwise the :class:`CtmInterval` objects themselves.
        """
        is_segment = isinstance(segment, Segment)
        y = self._to_waveform_tensor(segment)
        prob_chunks = self.get_speech_prob_chunk(y).float().cpu().numpy()[0, ...]
        # Compute the boundaries of the speech segments
        segments = self.generate_segments(
            prob_chunks,
            activation_threshold=activation_threshold,
            deactivation_threshold=deactivation_threshold,
            begin=segment.begin if is_segment else None,
            end=segment.end if is_segment else None,
        )
        # Apply energy-based VAD on the detected speech segments
        if apply_energy_vad:
            segments = self.energy_VAD(
                y,
                segments,
                activation_threshold=energy_activation_threshold,
                deactivation_threshold=energy_deactivation_threshold,
            )
        # Merge short segments
        segments = merge_segments(
            segments,
            min_pause_duration=min_pause_duration,
            max_segment_length=max_segment_length,
            min_segment_length=min_segment_length,
            snap_boundaries=False,
        )
        # Latest time the final segment may be padded to.
        if is_segment and segment.end is not None:
            audio_end = segment.end
        else:
            # Duration from the number of samples (last axis). Using the first
            # axis would yield the channel count for 2-D input.
            audio_end = y.shape[-1] / self.sample_rate
            if is_segment and segment.begin is not None:
                # Same convention as generate_segments when no end is given
                audio_end += segment.begin
        # Padding
        half_pause = min_pause_duration / 2
        last_index = len(segments) - 1
        for i, s in enumerate(segments):
            begin = s.begin - half_pause
            end = s.end + half_pause
            if i == 0:
                begin = max(begin, 0)
            if i == last_index:
                end = min(end, audio_end)
            s.begin = begin
            s.end = end
            if is_segment:
                segments[i] = Segment(segment.file_path, s.begin, s.end, segment.channel)
        return segments

    def generate_segments(
        self, vad_prob, activation_threshold=0.5, deactivation_threshold=0.25, begin=None, end=None
    ):
        """Scans the frame-level speech probabilities and applies thresholds
        on them. Speech starts when a value larger than ``activation_threshold``
        is detected, and ends when a value lower than ``deactivation_threshold``
        is observed.

        Arguments
        ---------
        vad_prob: numpy.ndarray or torch.Tensor
            Frame-level speech probabilities, indexed by frame on the first axis.
        activation_threshold: float
            Threshold for starting a speech segment.
        deactivation_threshold: float
            Threshold for ending a speech segment.
        begin: float, optional
            Offset in seconds added to every frame-derived time (default 0).
        end: float, optional
            Absolute end time used to close a segment that is still active
            after the last examined frame; derived from the frame count and
            ``begin`` when omitted.

        Returns
        -------
        list[CtmInterval]
            Intervals labelled ``"speech"``, in temporal order.
        """
        if begin is None:
            begin = 0
        num_frames = vad_prob.shape[0]
        if num_frames == 0:
            # Nothing to scan; avoids indexing the first frame of empty input
            return []
        is_active = vad_prob[0] > activation_threshold
        start = 0
        boundaries = []
        # NOTE(review): the scan stops one frame before the end, so the final
        # frame never opens or closes a segment - confirm this is intended.
        for time_step in range(1, num_frames - 1):
            y = vad_prob[time_step]
            if is_active:
                if y < deactivation_threshold:
                    e = self.time_resolution * (time_step - 1)
                    boundaries.append(
                        CtmInterval(begin=start + begin, end=e + begin, label="speech")
                    )
                    is_active = False
            elif y > activation_threshold:
                is_active = True
                start = self.time_resolution * time_step
        if is_active:
            # Still in speech at the end: close at the given absolute end, or
            # at the end of the frame sequence shifted by the offset
            if end is not None:
                e = end
            else:
                e = self.time_resolution * num_frames + begin
            boundaries.append(CtmInterval(begin=start + begin, end=e, label="speech"))
        return boundaries

    def get_speech_prob_chunk(self, wavs, wav_lens=None):
        """Outputs the frame-level posterior probability for the input audio chunks.

        Outputs close to zero refer to time steps with a low probability of
        speech activity, while outputs closer to one likely contain speech.

        Arguments
        ---------
        wavs : torch.Tensor
            Batch of waveforms [batch, time, channels] or [batch, time]
            depending on the model. Make sure the sample rate is fs=16000 Hz.
        wav_lens : torch.Tensor
            Lengths of the waveforms relative to the longest one in the
            batch, tensor of shape [batch]. The longest one should have
            relative length 1.0 and others len(waveform) / max_length.
            Used for ignoring padding.

        Returns
        -------
        torch.Tensor
            Sigmoid of the model output for every frame.
        """
        # Manage single waveforms in input
        if len(wavs.shape) == 1:
            wavs = wavs.unsqueeze(0)
        # Assign full length if wav_lens is not assigned
        if wav_lens is None:
            wav_lens = torch.ones(wavs.shape[0], device=self.device)
        # Storing waveform in the specified device
        wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device)
        wavs = wavs.float()
        # Computing features and embeddings
        feats = self.mods.compute_features(wavs)
        feats = self.mods.mean_var_norm(feats, wav_lens)
        outputs = self.mods.cnn(feats)
        # Flatten the last two CNN axes into one feature axis for the RNN
        outputs = outputs.reshape(
            outputs.shape[0],
            outputs.shape[1],
            outputs.shape[2] * outputs.shape[3],
        )
        outputs, _ = self.mods.rnn(outputs)
        outputs = self.mods.dnn(outputs)
        return torch.sigmoid(outputs)

    def segment_for_whisper(
        self,
        segment: typing.Union[torch.Tensor, np.ndarray],
        apply_energy_vad: bool = True,
        max_segment_length: float = 30,
        min_segment_length: float = 0.333,
        min_pause_duration: float = 0.333,
        activation_threshold: float = 0.5,
        deactivation_threshold: float = 0.25,
        en_activation_threshold: float = 0.5,
        en_deactivation_threshold: float = 0.4,
        **kwargs,
    ) -> typing.List[typing.Dict[str, float]]:
        """Segment audio and return each piece with its samples.

        Arguments
        ---------
        segment: torch.Tensor or numpy.ndarray
            Audio to segment.
        apply_energy_vad: bool
            Whether to refine the neural VAD output with the energy VAD.
        max_segment_length: float
            Maximum length (seconds) of a merged segment.
        min_segment_length: float
            Merged segments not longer than this are dropped.
        min_pause_duration: float
            Minimum silence (seconds) that separates two output segments.
        activation_threshold: float
            Neural VAD threshold for starting a speech segment.
        deactivation_threshold: float
            Neural VAD threshold for ending a speech segment.
        en_activation_threshold: float
            Energy VAD threshold for starting a speech segment.
        en_deactivation_threshold: float
            Energy VAD threshold for ending a speech segment.
        **kwargs
            Forwarded to :meth:`segment_utterance`. Explicit
            ``energy_activation_threshold`` / ``energy_deactivation_threshold``
            entries take precedence over the ``en_*`` parameters.

        Returns
        -------
        list[dict]
            One dict per segment with ``"start"`` and ``"end"`` (seconds) and
            ``"inputs"`` (the samples of the first row of the waveform
            between those times).
        """
        y = self._to_waveform_tensor(segment)
        # segment_utterance names these parameters ``energy_*``; passing them
        # under the ``en_*`` names would drop them into its ignored **kwargs.
        kwargs.setdefault("energy_activation_threshold", en_activation_threshold)
        kwargs.setdefault("energy_deactivation_threshold", en_deactivation_threshold)
        segments = self.segment_utterance(
            segment,
            apply_energy_vad=apply_energy_vad,
            max_segment_length=max_segment_length,
            min_segment_length=min_segment_length,
            min_pause_duration=min_pause_duration,
            activation_threshold=activation_threshold,
            deactivation_threshold=deactivation_threshold,
            **kwargs,
        )
        segments_for_whisper = []
        for s in segments:
            begin, end = s.begin, s.end
            # Times are rounded to milliseconds before conversion to sample indices
            f1 = int(round(begin, 3) * self.sample_rate)
            f2 = int(round(end, 3) * self.sample_rate)
            segments_for_whisper.append(
                {"start": float(begin), "end": float(end), "inputs": y[0, f1:f2]}
            )
        return segments_for_whisper
class SegmenterMixin:
    """
    Mixin that stores the segmentation hyperparameters.

    Parameters
    ----------
    max_segment_length: float
        Maximum length of segments
    min_segment_length: float
        Minimum length of segments
    min_pause_duration: float
        Minimum pause duration between segments
    activation_threshold: float
        Threshold for starting a speech segment
    deactivation_threshold: float
        Threshold for ending a speech segment
    energy_activation_threshold: float
        Energy-based threshold for starting a speech segment
    energy_deactivation_threshold: float
        Energy-based threshold for ending a speech segment
    **kwargs
        Passed on to the next class in the MRO
    """

    def __init__(
        self,
        max_segment_length: float = 30,
        min_segment_length: float = 0.333,
        min_pause_duration: float = 0.333,
        activation_threshold: float = 0.5,
        deactivation_threshold: float = 0.25,
        energy_activation_threshold: float = 0.5,
        energy_deactivation_threshold: float = 0.4,
        **kwargs,
    ):
        # Duration limits
        self.max_segment_length = max_segment_length
        self.min_segment_length = min_segment_length
        self.min_pause_duration = min_pause_duration
        # Probability thresholds
        self.activation_threshold = activation_threshold
        self.deactivation_threshold = deactivation_threshold
        # Energy thresholds
        self.energy_activation_threshold = energy_activation_threshold
        self.energy_deactivation_threshold = energy_deactivation_threshold
        # Remaining keyword arguments belong to the cooperating base classes
        super().__init__(**kwargs)

    @property
    def segmentation_options(self) -> MetaDict:
        """Options for segmentation"""
        option_names = (
            "max_segment_length",
            "min_segment_length",
            "activation_threshold",
            "deactivation_threshold",
            "energy_activation_threshold",
            "energy_deactivation_threshold",
            "min_pause_duration",
        )
        return {name: getattr(self, name) for name in option_names}
class SpeechbrainSegmenterMixin(SegmenterMixin):
    """
    Segmenter mixin that loads the SpeechBrain VAD model on construction.

    Parameters
    ----------
    apply_energy_vad: bool
        Stored as ``apply_energy_vad`` and reported in the segmentation options
    double_check: bool
        Stored as ``double_check`` and reported in the segmentation options
    speech_threshold: float
        Stored as ``speech_threshold`` and reported in the segmentation options
    cuda: bool
        When true, the model is loaded with ``{"device": "cuda"}`` run options
    **kwargs
        Passed on to :class:`SegmenterMixin`
    """

    def __init__(
        self,
        apply_energy_vad: bool = True,
        double_check: bool = False,
        speech_threshold: float = 0.5,
        cuda: bool = False,
        **kwargs,
    ):
        # Without speechbrain there is nothing to load: log and terminate
        if not FOUND_SPEECHBRAIN:
            logger.error(
                "Could not import speechbrain, please ensure it is installed via `pip install speechbrain`"
            )
            sys.exit(1)
        super().__init__(**kwargs)
        self.speechbrain = True
        self.cuda = cuda
        self.apply_energy_vad = apply_energy_vad
        self.double_check = double_check
        self.speech_threshold = speech_threshold
        self.vad_model = None
        # The pretrained model is stored under the temporary directory
        model_dir = os.path.join(config.TEMPORARY_DIRECTORY, "models", "VAD")
        os.makedirs(model_dir, exist_ok=True)
        self.vad_model = MfaVAD.from_hparams(
            source="speechbrain/vad-crdnn-libriparty",
            savedir=model_dir,
            run_opts={"device": "cuda"} if self.cuda else None,
        )

    @property
    def segmentation_options(self) -> MetaDict:
        """Options for segmentation"""
        options = super().segmentation_options
        options["apply_energy_vad"] = self.apply_energy_vad
        options["double_check"] = self.double_check
        options["speech_threshold"] = self.speech_threshold
        return options