"""Classes for configuring feature generation"""
from __future__ import annotations
import io
import logging
import math
import os
import re
import subprocess
import typing
from abc import abstractmethod
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Union
import dataclassy
import librosa
import numba
import numpy as np
import soundfile
import sqlalchemy
from numba import njit
from scipy.sparse import csr_matrix
from sqlalchemy.orm import Session, joinedload
from montreal_forced_aligner.abc import KaldiFunction
from montreal_forced_aligner.config import IVECTOR_DIMENSION, PLDA_DIMENSION
from montreal_forced_aligner.data import M_LOG_2PI, MfaArguments
from montreal_forced_aligner.db import File, Job, SoundFile, Utterance
from montreal_forced_aligner.exceptions import KaldiProcessingError
from montreal_forced_aligner.helper import mfa_open
from montreal_forced_aligner.utils import read_feats, thirdparty_binary
if TYPE_CHECKING:
    # Evaluated by static type checkers only; TYPE_CHECKING is False at runtime,
    # so these names exist purely for use in annotations.
    SpeakerCharacterType = Union[str, int]
    from montreal_forced_aligner.abc import MetaDict
# Public names exported by a star-import of this module.
__all__ = [
    "FeatureConfigMixin",
    "VadConfigMixin",
    "IvectorConfigMixin",
    "CalcFmllrFunction",
    "ComputeVadFunction",
    "VadArguments",
    "MfccFunction",
    "MfccArguments",
    "CalcFmllrArguments",
    "ExtractIvectorsFunction",
    "ExtractIvectorsArguments",
    "PldaModel",
    "plda_distance",
    "plda_log_likelihood",
    "score_plda",
    "online_feature_proc",
    "compute_transform_process",
]
# Module-level logger, looked up under the name "mfa".
logger = logging.getLogger("mfa")
# noinspection PyUnresolvedReferences
[docs]
@dataclassy.dataclass(slots=True)
class VadArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.features.ComputeVadFunction`

    Parameters
    ----------
    feats_scp_path: :class:`~pathlib.Path`
        Feature SCP file given to ``compute-vad`` as its input
    vad_scp_path: :class:`~pathlib.Path`
        SCP file that indexes the VAD output; the archive itself is written
        to the same path with an ``.ark`` suffix
    vad_options: dict[str, Any]
        VAD options; ``energy_mean_scale`` and ``energy_threshold`` are read
    """

    feats_scp_path: Path
    vad_scp_path: Path
    vad_options: MetaDict
# noinspection PyUnresolvedReferences
[docs]
@dataclassy.dataclass(slots=True)
class MfccArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.features.MfccFunction`

    Parameters
    ----------
    data_directory: :class:`~pathlib.Path`
        Directory in which the job's ``wav``, ``feats`` and ``pitch`` SCP/archive
        paths are constructed
    mfcc_options: dict[str, Any]
        Options turned into ``compute-mfcc-feats`` command line flags
    pitch_options: dict[str, Any]
        Pitch options; ``use-pitch`` and ``use-voicing`` decide whether pitch
        features are computed at all
    """

    data_directory: Path
    mfcc_options: MetaDict
    pitch_options: MetaDict
# noinspection PyUnresolvedReferences
@dataclassy.dataclass(slots=True)
class FinalFeatureArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.features.FinalFeatureFunction`

    Parameters
    ----------
    data_directory: :class:`~pathlib.Path`
        Directory in which the job's feature, CMVN, pitch and VAD paths are constructed
    uses_cmvn: bool
        CMVN flag stored on the function
        (NOTE(review): the visible ``_run`` picks the CMVN binary from whether the
        CMVN SCP file exists, not from this flag -- confirm the intent)
    voiced_only: bool
        Whether frames should be filtered through ``select-voiced-frames`` when a
        VAD SCP file exists
    subsample_feats: int
        Value passed as ``--n`` to ``subsample-feats``; a falsy value disables subsampling
    """

    data_directory: Path
    uses_cmvn: bool
    voiced_only: bool
    subsample_feats: int
# noinspection PyUnresolvedReferences
@dataclassy.dataclass(slots=True)
class PitchArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.features.PitchFunction`

    Parameters
    ----------
    data_directory: :class:`~pathlib.Path`
        Directory in which the job's ``wav``, ``segments`` and ``pitch`` paths are constructed
    pitch_options: dict[str, Any]
        Options handed to :func:`compute_pitch_process`
    """

    data_directory: Path
    pitch_options: MetaDict
# noinspection PyUnresolvedReferences
@dataclassy.dataclass(slots=True)
class PitchRangeArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.features.PitchRangeFunction`

    Parameters
    ----------
    data_directory: :class:`~pathlib.Path`
        Directory in which the job's ``wav`` and ``segments`` paths are constructed
    pitch_options: dict[str, Any]
        Options turned into ``compute-kaldi-pitch-feats`` command line flags
    """

    data_directory: Path
    pitch_options: MetaDict
# noinspection PyUnresolvedReferences
[docs]
@dataclassy.dataclass(slots=True)
class CalcFmllrArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.features.CalcFmllrFunction`

    Parameters
    ----------
    dictionaries: list[str]
        Dictionary identifiers; each one is used as the key into the per-dictionary mappings below
    feature_strings: dict[str, str]
        Per-dictionary feature specifier passed to the fMLLR estimation binaries
    ali_paths: dict[str, :class:`~pathlib.Path`]
        Per-dictionary alignment archives read by ``ali-to-post``
    ali_model_path: :class:`~pathlib.Path`
        Model passed to ``weight-silence-post`` and ``gmm-post-to-gpost``
    model_path: :class:`~pathlib.Path`
        Model passed to ``gmm-est-fmllr`` / ``gmm-est-fmllr-gpost``
    spk2utt_paths: dict[str, :class:`~pathlib.Path`]
        Per-dictionary speaker-to-utterance files
    trans_paths: dict[str, :class:`~pathlib.Path`]
        Per-dictionary transform archives that are written (and composed with, if they already exist)
    fmllr_options: dict[str, Any]
        fMLLR options; ``fmllr_update_type`` and ``silence_csl`` are read
    """

    dictionaries: List[str]
    feature_strings: Dict[str, str]
    ali_paths: Dict[str, Path]
    ali_model_path: Path
    model_path: Path
    spk2utt_paths: Dict[str, Path]
    trans_paths: Dict[str, Path]
    fmllr_options: MetaDict
# noinspection PyUnresolvedReferences
# noinspection PyUnresolvedReferences
@dataclassy.dataclass(slots=True)
class ExportIvectorsArguments(MfaArguments):
    """
    Arguments for :class:`~montreal_forced_aligner.corpus.features.ExportIvectorsFunction`

    Parameters
    ----------
    use_xvector: bool
        Flag consumed by the export function
        (presumably selects xvectors over ivectors -- TODO confirm, the function is not shown here)
    """

    use_xvector: bool
def feature_make_safe(value: Any) -> str:
    """
    Render a value as text suitable for a command line flag

    Parameters
    ----------
    value: Any
        Value to render

    Returns
    -------
    str
        ``str(value)``, lowercased when the value is a boolean
    """
    text = str(value)
    # Booleans become "true"/"false" instead of Python's "True"/"False"
    return text.lower() if isinstance(value, bool) else text
def compute_mfcc_process(
    log_file: io.FileIO,
    wav_path: Path,
    segments: typing.Union[str, os.PathLike, subprocess.Popen, int],
    mfcc_options: MetaDict,
    min_length=0.1,
) -> subprocess.Popen:
    """
    Construct processes for computing MFCC features

    The audio source is selected from ``segments``:

    * a path (``str`` or path-like) to an existing segments file: audio is cut
      with ``extract-segments`` and piped into ``compute-mfcc-feats``
    * a :class:`subprocess.Popen`: its stdout is used as the input archive
    * ``subprocess.PIPE``: the returned process exposes a writable ``stdin``
    * anything else: ``wav_path`` is read directly as an SCP file

    Parameters
    ----------
    log_file: io.FileIO
        File for logging stderr
    wav_path: :class:`~pathlib.Path`
        Wav scp to use
    segments: str, path-like, subprocess.Popen or ``subprocess.PIPE``
        Segments source, see above
    mfcc_options: dict[str, Any]
        Options for computing MFCC features; underscores in keys become dashes
    min_length: float
        Minimum length of segments in seconds

    Returns
    -------
    subprocess.Popen
        MFCC process
    """
    mfcc_base_command = [thirdparty_binary("compute-mfcc-feats")]
    mfcc_base_command.extend(
        f"--{k.replace('_', '-')}={feature_make_safe(v)}" for k, v in mfcc_options.items()
    )
    popen_kwargs = {"stdout": subprocess.PIPE, "stderr": log_file, "env": os.environ}
    # Accept path-like objects as well as strings: callers build their paths as
    # pathlib.Path, which a plain ``isinstance(segments, str)`` check skipped silently.
    if isinstance(segments, (str, os.PathLike)) and os.path.exists(segments):
        seg_proc = subprocess.Popen(
            [
                thirdparty_binary("extract-segments"),
                f"--min-segment-length={min_length}",
                f"scp:{wav_path}",
                os.fspath(segments),
                "ark:-",
            ],
            **popen_kwargs,
        )
        return subprocess.Popen(
            mfcc_base_command + ["ark:-", "ark,t:-"],
            stdin=seg_proc.stdout,
            **popen_kwargs,
        )
    if isinstance(segments, subprocess.Popen):
        return subprocess.Popen(
            mfcc_base_command + ["ark,s,cs:-", "ark,t:-"],
            stdin=segments.stdout,
            **popen_kwargs,
        )
    if segments == subprocess.PIPE:
        return subprocess.Popen(
            mfcc_base_command + ["ark,s,cs:-", "ark,t:-"],
            stdin=segments,
            **popen_kwargs,
        )
    # No usable segments: read whole files from the wav scp.
    # NOTE(review): this is the only branch that emits a binary ("ark:-") rather
    # than a text ("ark,t:-") archive -- kept as in the original, confirm intent.
    return subprocess.Popen(
        mfcc_base_command + [f"scp,p:{wav_path}", "ark:-"],
        **popen_kwargs,
    )
def compute_pitch_process(
    log_file: io.FileIO,
    wav_path: Path,
    segments: typing.Union[str, os.PathLike, subprocess.Popen, int],
    pitch_options: MetaDict,
    min_length=0.1,
) -> subprocess.Popen:
    """
    Construct processes for computing pitch features

    The audio source is selected from ``segments``:

    * a path (``str`` or path-like) to an existing segments file: audio is cut
      with ``extract-segments`` and piped into the pitch binary
    * a :class:`subprocess.Popen`: its stdout is used as the input archive
    * ``subprocess.PIPE``: the returned process exposes a writable ``stdin``
    * anything else: ``wav_path`` is read directly as an SCP file

    Parameters
    ----------
    log_file: io.FileIO
        File for logging stderr
    wav_path: :class:`~pathlib.Path`
        Wav scp to use
    segments: str, path-like, subprocess.Popen or ``subprocess.PIPE``
        Segments source, see above
    pitch_options: dict[str, Any]
        Options for computing pitch features.  ``use-pitch``, ``use-voicing`` and
        ``use-delta-pitch`` are required; ``normalize`` defaults to True.  These four
        keys only steer which outputs are requested, every other key is passed
        through as a command line flag.  The dictionary is not modified.
    min_length: float
        Minimum length of segments in seconds

    Returns
    -------
    subprocess.Popen
        Pitch process

    Raises
    ------
    KeyError
        If one of the required ``use-*`` keys is missing
    """
    # Work on a copy: popping from the caller's dict would make a second call
    # with the same options fail with KeyError.
    options = dict(pitch_options)
    use_pitch = options.pop("use-pitch")
    use_voicing = options.pop("use-voicing")
    use_delta_pitch = options.pop("use-delta-pitch")
    normalize = options.pop("normalize", True)
    pitch_command = [
        thirdparty_binary("compute-and-process-kaldi-pitch-feats"),
    ]
    for k, v in options.items():
        pitch_command.append(f"--{k.replace('_', '-')}={feature_make_safe(v)}")
        if k == "delta-pitch":
            # The same value is reused as the noise level added to delta pitch
            pitch_command.append(f"--delta-pitch-noise-stddev={feature_make_safe(v)}")
    if use_pitch:
        if normalize:
            pitch_command.append("--add-normalized-log-pitch=true")
        else:
            pitch_command.append("--add-raw-log-pitch=true")
    else:
        pitch_command.append("--add-normalized-log-pitch=false")
        pitch_command.append("--add-raw-log-pitch=false")
    if use_delta_pitch:
        pitch_command.append("--add-delta-pitch=true")
        pitch_command.append("--add-pov-feature=true")
    else:
        pitch_command.append("--add-delta-pitch=false")
    # NOTE(review): when use_delta_pitch is set, --add-pov-feature is given twice
    # (above and here) -- kept as in the original.
    if use_voicing:
        pitch_command.append("--add-pov-feature=true")
    else:
        pitch_command.append("--add-pov-feature=false")
    popen_kwargs = {"stdout": subprocess.PIPE, "stderr": log_file, "env": os.environ}
    # Accept path-like objects as well as strings: callers build their paths as
    # pathlib.Path, which a plain ``isinstance(segments, str)`` check skipped silently.
    if isinstance(segments, (str, os.PathLike)) and os.path.exists(segments):
        seg_proc = subprocess.Popen(
            [
                thirdparty_binary("extract-segments"),
                f"--min-segment-length={min_length}",
                f"scp:{wav_path}",
                os.fspath(segments),
                "ark:-",
            ],
            **popen_kwargs,
        )
        return subprocess.Popen(
            pitch_command + ["ark:-", "ark,t:-"],
            stdin=seg_proc.stdout,
            **popen_kwargs,
        )
    if isinstance(segments, subprocess.Popen):
        return subprocess.Popen(
            pitch_command + ["ark:-", "ark,t:-"],
            stdin=segments.stdout,
            **popen_kwargs,
        )
    if segments == subprocess.PIPE:
        return subprocess.Popen(
            pitch_command + ["ark:-", "ark,t:-"],
            stdin=segments,
            **popen_kwargs,
        )
    # No usable segments: read whole files from the wav scp
    return subprocess.Popen(
        pitch_command + [f"scp,p:{wav_path}", "ark,t:-"],
        **popen_kwargs,
    )
def compute_transform_process(
    log_file: io.FileIO,
    feat_proc: typing.Union[subprocess.Popen, Path],
    lda_mat_path: typing.Optional[Path],
    lda_options: MetaDict,
    fmllr_path: Path = None,
    utt2spk_path: Path = None,
) -> subprocess.Popen:
    """
    Build the process chain that transforms raw features

    Features are either spliced and projected with an LDA matrix (when
    ``lda_mat_path`` is given) or extended with deltas, and are then optionally
    passed through per-speaker fMLLR transforms.

    Parameters
    ----------
    log_file: io.FileIO
        File for logging stderr
    feat_proc: subprocess.Popen, str or :class:`~pathlib.Path`
        Feature generation process whose stdout is consumed, or a feature
        archive path to read from
    lda_mat_path: :class:`~pathlib.Path`, optional
        LDA matrix file path; None selects delta features instead
    lda_options: dict[str, Any]
        Options for LDA; ``splice_left_context`` and ``splice_right_context`` are read
    fmllr_path: :class:`~pathlib.Path`, optional
        fMLLR transform file path; skipped when None or not existing
    utt2spk_path: :class:`~pathlib.Path`, optional
        Utterance to speaker SCP file path

    Returns
    -------
    subprocess.Popen
        Last process of the transformation chain
    """
    reads_from_file = isinstance(feat_proc, (str, Path))
    feat_input = f"ark,s,cs:{feat_proc}" if reads_from_file else "ark,s,cs:-"
    upstream = None if reads_from_file else feat_proc.stdout
    if lda_mat_path is None:
        delta_proc = subprocess.Popen(
            ["add-deltas", feat_input, "ark:-"],
            env=os.environ,
            stdin=upstream,
            stdout=subprocess.PIPE,
            stderr=log_file,
        )
    else:
        left_context = lda_options["splice_left_context"]
        right_context = lda_options["splice_right_context"]
        splice_proc = subprocess.Popen(
            [
                "splice-feats",
                f"--left-context={left_context}",
                f"--right-context={right_context}",
                feat_input,
                "ark:-",
            ],
            env=os.environ,
            stdin=upstream,
            stdout=subprocess.PIPE,
            stderr=log_file,
        )
        delta_proc = subprocess.Popen(
            ["transform-feats", lda_mat_path, "ark,s,cs:-", "ark:-"],
            env=os.environ,
            stdin=splice_proc.stdout,
            stdout=subprocess.PIPE,
            stderr=log_file,
        )
    # Without usable speaker transforms the delta/LDA features are the result
    if fmllr_path is None or not fmllr_path.exists():
        return delta_proc
    # Transforms can be stored as an SCP index or directly as an archive
    specifier_type = "scp" if fmllr_path.suffix == ".scp" else "ark"
    fmllr_ark = f"{specifier_type}:{fmllr_path}"
    return subprocess.Popen(
        [
            "transform-feats",
            f"--utt2spk=ark:{utt2spk_path}",
            fmllr_ark,
            "ark,s,cs:-",
            "ark,t:-",
        ],
        env=os.environ,
        stdin=delta_proc.stdout,
        stdout=subprocess.PIPE,
        stderr=log_file,
    )
[docs]
class MfccFunction(KaldiFunction):
    """
    Multiprocessing function for generating MFCC (and optionally pitch) features

    Audio for every utterance of the job is loaded in Python, resampled to 16 kHz
    and streamed as WAV data into the Kaldi feature binaries.

    See Also
    --------
    :meth:`.AcousticCorpusMixin.mfcc`
        Main function that calls this function in parallel
    :meth:`.AcousticCorpusMixin.mfcc_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`compute-mfcc-feats`
        Relevant Kaldi binary
    :kaldi_src:`extract-segments`
        Relevant Kaldi binary
    :kaldi_src:`copy-feats`
        Relevant Kaldi binary
    :kaldi_src:`feat-to-len`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.features.MfccArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(r"^LOG.* Processed (?P<num_utterances>\d+) utterances")

    def __init__(self, args: MfccArguments):
        super().__init__(args)
        self.mfcc_options = args.mfcc_options
        self.pitch_options = args.pitch_options
        self.data_directory = args.data_directory

    def _run(self) -> typing.Generator[int]:
        """Run the function, yielding 1 for every utterance that was streamed"""
        with Session(self.db_engine()) as session, mfa_open(self.log_path, "w") as log_file:
            log_file.write(f"Using: {self.db_string}\n")
            job: typing.Optional[Job] = session.get(Job, self.job_name)
            data_dir = self.data_directory
            feats_scp_path = job.construct_path(data_dir, "feats", "scp")
            pitch_scp_path = job.construct_path(data_dir, "pitch", "scp")
            wav_path = job.construct_path(data_dir, "wav", "scp")
            raw_ark_path = job.construct_path(data_dir, "feats", "ark")
            raw_pitch_ark_path = job.construct_path(data_dir, "pitch", "ark")
            # Features from an earlier run are reused as they are
            if os.path.exists(raw_ark_path):
                return

            def start_copy(source, ark_path, scp_path):
                """Start copy-feats writing a compressed archive plus its SCP index"""
                return subprocess.Popen(
                    [
                        thirdparty_binary("copy-feats"),
                        "--compress=true",
                        "ark:-",
                        f"ark,scp:{ark_path},{scp_path}",
                    ],
                    stdin=source.stdout,
                    stderr=log_file,
                    env=os.environ,
                )

            mfcc_proc = compute_mfcc_process(
                log_file, wav_path, subprocess.PIPE, self.mfcc_options
            )
            mfcc_copy_proc = start_copy(mfcc_proc, raw_ark_path, feats_scp_path)
            # Must be read before compute_pitch_process is called with the options
            use_pitch = self.pitch_options["use-pitch"] or self.pitch_options["use-voicing"]
            # Every process in this list receives the audio of each utterance
            audio_sinks = [mfcc_proc]
            if use_pitch:
                pitch_proc = compute_pitch_process(
                    log_file, wav_path, subprocess.PIPE, self.pitch_options
                )
                pitch_copy_proc = start_copy(pitch_proc, raw_pitch_ark_path, pitch_scp_path)
                audio_sinks.append(pitch_proc)
            min_length = 0.1
            utterances = (
                session.query(Utterance, SoundFile)
                .join(Utterance.file)
                .join(File.sound_file)
                .filter(
                    Utterance.job_id == self.job_name,
                    Utterance.ignored == False,  # noqa
                    Utterance.duration >= min_length,
                )
                .order_by(Utterance.kaldi_id)
            )
            for u, sf in utterances:
                wave, _ = librosa.load(
                    sf.sound_file_path,
                    sr=16000,
                    offset=u.begin,
                    duration=u.duration,
                    mono=False,
                )
                # Multi-channel audio: keep only the channel of this utterance
                if wave.ndim == 2:
                    wave = wave[u.channel, :]
                bio = BytesIO()
                soundfile.write(bio, wave, samplerate=16000, format="WAV")
                # Archive entry: utterance key, a tab, then the WAV bytes
                entry_key = f"{u.kaldi_id}\t".encode("utf8")
                entry_data = bio.getvalue()
                for sink in audio_sinks:
                    sink.stdin.write(entry_key)
                    sink.stdin.write(entry_data)
                    sink.stdin.flush()
                yield 1
            for sink in audio_sinks:
                sink.stdin.close()
            for sink in audio_sinks:
                sink.wait()
            self.check_call(mfcc_copy_proc)
            if use_pitch:
                self.check_call(pitch_copy_proc)
class FinalFeatureFunction(KaldiFunction):
    """
    Multiprocessing function for turning raw features into the final feature archive

    The raw features are normalized (``apply-cmvn`` when a CMVN SCP file exists,
    ``apply-cmvn-sliding`` otherwise), pasted together with pitch features when a
    pitch SCP file exists, optionally reduced to voiced frames and subsampled, and
    written to a ``final_features`` archive.  Afterwards the raw (and pitch) files
    are removed and the new SCP file takes the place of the ``feats`` SCP file.

    See Also
    --------
    :kaldi_src:`apply-cmvn`
        Relevant Kaldi binary
    :kaldi_src:`apply-cmvn-sliding`
        Relevant Kaldi binary
    :kaldi_src:`paste-feats`
        Relevant Kaldi binary
    :kaldi_src:`select-voiced-frames`
        Relevant Kaldi binary
    :kaldi_src:`subsample-feats`
        Relevant Kaldi binary
    :kaldi_src:`copy-feats`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.features.FinalFeatureArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(r"^LOG.* Processed (?P<num_utterances>\d+) utterances")

    def __init__(self, args: FinalFeatureArguments):
        super().__init__(args)
        self.data_directory = args.data_directory
        self.uses_cmvn = args.uses_cmvn
        self.voiced_only = args.voiced_only
        self.subsample_feats = args.subsample_feats

    def _run(self) -> typing.Generator[int]:
        """Run the function, yielding 1 for every utterance key seen in the output stream"""
        with Session(self.db_engine()) as session, mfa_open(self.log_path, "w") as log_file:
            job: typing.Optional[Job] = session.get(Job, self.job_name)
            data_dir = self.data_directory
            feats_scp_path = job.construct_path(data_dir, "feats", "scp")
            temp_scp_path = job.construct_path(data_dir, "final_features", "scp")
            utt2spk_path = job.construct_path(data_dir, "utt2spk", "scp")
            cmvn_scp_path = job.construct_path(data_dir, "cmvn", "scp")
            pitch_scp_path = job.construct_path(data_dir, "pitch", "scp")
            pitch_ark_path = job.construct_path(data_dir, "pitch", "ark")
            vad_scp_path = job.construct_path(data_dir, "vad", "scp")
            raw_ark_path = job.construct_path(data_dir, "feats", "ark")
            temp_ark_path = job.construct_path(data_dir, "final_features", "ark")
            # Keyword arguments shared by every stage that writes to a pipe
            stage_kwargs = {"stdout": subprocess.PIPE, "stderr": log_file, "env": os.environ}

            # Stage 1: normalization, speaker based when CMVN statistics are available
            if os.path.exists(cmvn_scp_path):
                cmvn_command = [
                    thirdparty_binary("apply-cmvn"),
                    f"--utt2spk=ark:{utt2spk_path}",
                    f"scp:{cmvn_scp_path}",
                    f"scp:{feats_scp_path}",
                    "ark:-",
                ]
            else:
                cmvn_command = [
                    thirdparty_binary("apply-cmvn-sliding"),
                    "--norm-vars=false",
                    "--center=true",
                    "--cmn-window=300",
                    f"scp:{feats_scp_path}",
                    "ark:-",
                ]
            cmvn_proc = subprocess.Popen(cmvn_command, **stage_kwargs)

            # Stage 2: append pitch features when they were generated
            paste_proc = cmvn_proc
            if os.path.exists(pitch_scp_path):
                paste_proc = subprocess.Popen(
                    [
                        thirdparty_binary("paste-feats"),
                        "--length-tolerance=2",
                        "ark:-",
                        f"scp:{pitch_scp_path}",
                        "ark:-",
                    ],
                    stdin=cmvn_proc.stdout,
                    **stage_kwargs,
                )

            # Stage 3: voiced frame selection; subsampling only happens on this path
            final_proc = paste_proc
            if self.voiced_only and os.path.exists(vad_scp_path):
                final_proc = subprocess.Popen(
                    [
                        thirdparty_binary("select-voiced-frames"),
                        "ark:-",
                        f"scp:{vad_scp_path}",
                        "ark:-",
                    ],
                    stdin=paste_proc.stdout,
                    **stage_kwargs,
                )
                if self.subsample_feats:
                    final_proc = subprocess.Popen(
                        [
                            thirdparty_binary("subsample-feats"),
                            f"--n={self.subsample_feats}",
                            "ark:-",
                            "ark:-",
                        ],
                        stdin=final_proc.stdout,
                        **stage_kwargs,
                    )

            # Stage 4: write the compressed archive and its SCP index
            copy_proc = subprocess.Popen(
                [
                    thirdparty_binary("copy-feats"),
                    "--compress=true",
                    "ark:-",
                    f"ark,scp:{temp_ark_path},{temp_scp_path}",
                ],
                stdin=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            # The stream is forwarded chunk by chunk so progress can be reported
            # whenever something shaped like an utterance key shows up
            utterance_key = rb"\d+-\d+ "
            for chunk in final_proc.stdout:
                copy_proc.stdin.write(chunk)
                copy_proc.stdin.flush()
                if re.search(utterance_key, chunk):
                    yield 1
            copy_proc.stdin.close()
            self.check_call(copy_proc)

            # The final SCP replaces the raw one; intermediate files are dropped
            os.remove(raw_ark_path)
            os.remove(feats_scp_path)
            os.rename(temp_scp_path, feats_scp_path)
            if os.path.exists(pitch_scp_path):
                os.remove(pitch_scp_path)
                os.remove(pitch_ark_path)
class PitchFunction(KaldiFunction):
    """
    Multiprocessing function for generating pitch features

    Pitch features are computed per segment and stored as a compressed ``pitch``
    archive with a matching SCP file.

    See Also
    --------
    :func:`compute_pitch_process`
        Builds the Kaldi pitch pipeline used here
    :kaldi_src:`compute-and-process-kaldi-pitch-feats`
        Relevant Kaldi binary
    :kaldi_src:`extract-segments`
        Relevant Kaldi binary
    :kaldi_src:`copy-feats`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.features.PitchArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(r"^LOG.* Processed (?P<num_utterances>\d+) utterances")

    def __init__(self, args: PitchArguments):
        super().__init__(args)
        self.data_directory = args.data_directory
        self.pitch_options = args.pitch_options

    def _run(self) -> typing.Generator[int]:
        """Run the function, yielding 1 for every utterance key seen in the pitch output"""
        with Session(self.db_engine()) as session, mfa_open(self.log_path, "w") as log_file:
            job: typing.Optional[Job] = session.get(Job, self.job_name)
            feats_scp_path = job.construct_path(self.data_directory, "pitch", "scp")
            raw_ark_path = job.construct_path(self.data_directory, "pitch", "ark")
            wav_path = job.construct_path(self.data_directory, "wav", "scp")
            segments_path = job.construct_path(self.data_directory, "segments", "scp")
            # Pitch features from an earlier run are reused as they are
            if os.path.exists(raw_ark_path):
                return
            copy_proc = subprocess.Popen(
                [
                    thirdparty_binary("copy-feats"),
                    "--compress=true",
                    "ark,t:-",
                    f"ark,scp:{raw_ark_path},{feats_scp_path}",
                ],
                stdin=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            # compute_pitch_process only treats *string* paths as a segments file and
            # removes keys from the options it is given, so hand it a plain string and
            # a copy of the options.
            pitch_proc = compute_pitch_process(
                log_file, wav_path, str(segments_path), dict(self.pitch_options)
            )
            for line in pitch_proc.stdout:
                copy_proc.stdin.write(line)
                copy_proc.stdin.flush()
                # Lines of the text archive that open a new utterance start with its key
                if re.match(rb"^\d+-", line):
                    yield 1
            pitch_proc.wait()
            copy_proc.stdin.close()
            self.check_call(copy_proc)
class PitchRangeFunction(KaldiFunction):
    """
    Multiprocessing function for estimating a per-speaker pitch range

    Raw pitch is extracted for every segment; for each speaker the mean pitch over
    frames whose first feature column exceeds 0.5 is computed, and the range
    ``[mean / 2, mean * 2]`` (limited to ``[50, 1500]``) is reported.

    See Also
    --------
    :kaldi_src:`compute-kaldi-pitch-feats`
        Relevant Kaldi binary
    :kaldi_src:`extract-segments`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.features.PitchRangeArguments`
        Arguments for the function
    """

    progress_pattern = re.compile(r"^LOG.* Processed (?P<num_utterances>\d+) utterances")

    # Keys of the pitch options that steer feature post-processing in
    # compute_pitch_process and are not flags of the pitch extraction binary
    _non_kaldi_options = frozenset({"use-pitch", "use-voicing", "use-delta-pitch", "normalize"})

    def __init__(self, args: PitchRangeArguments):
        super().__init__(args)
        self.data_directory = args.data_directory
        self.pitch_options = args.pitch_options

    @staticmethod
    def _f0_range(pitch_points):
        """Return ``(min_f0, max_f0)`` for the collected pitch values, or None if there are none"""
        if len(pitch_points) == 0:
            # Averaging nothing would give NaN bounds
            return None
        mean_f0 = np.mean(np.array(pitch_points))
        return max(mean_f0 / 2, 50), min(mean_f0 * 2, 1500)

    def _run(self) -> typing.Generator[typing.Tuple[int, float, float], None, None]:
        """
        Run the function

        Yields
        ------
        tuple[int, float, float]
            Speaker ID, minimum f0 and maximum f0; speakers without any voiced
            frames are skipped
        """
        with Session(self.db_engine()) as session, mfa_open(self.log_path, "w") as log_file:
            job: typing.Optional[Job] = session.get(Job, self.job_name)
            wav_path = job.construct_path(self.data_directory, "wav", "scp")
            segment_path = job.construct_path(self.data_directory, "segments", "scp")
            min_length = 0.1
            seg_proc = subprocess.Popen(
                [
                    thirdparty_binary("extract-segments"),
                    f"--min-segment-length={min_length}",
                    f"scp:{wav_path}",
                    segment_path,
                    "ark:-",
                ],
                stdout=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            pitch_command = [
                thirdparty_binary("compute-kaldi-pitch-feats"),
            ]
            for k, v in self.pitch_options.items():
                if k in self._non_kaldi_options:
                    continue
                pitch_command.append(f"--{k.replace('_', '-')}={feature_make_safe(v)}")
            pitch_command += ["ark:-", "ark,t:-"]
            pitch_proc = subprocess.Popen(
                pitch_command,
                stdout=subprocess.PIPE,
                stdin=seg_proc.stdout,
                stderr=log_file,
                env=os.environ,
            )
            current_speaker = None
            pitch_points = []
            # Utterance keys have the form "<speaker>-<utterance>"; the loop relies on
            # all utterances of a speaker arriving consecutively
            for ids, pitch_features in read_feats(pitch_proc, raw_id=True):
                speaker_id, utt_id = ids.split("-")
                speaker_id = int(speaker_id)
                if current_speaker is None:
                    current_speaker = speaker_id
                if current_speaker != speaker_id:
                    # Speaker changed: report the finished speaker and start over
                    f0_range = self._f0_range(pitch_points)
                    if f0_range is not None:
                        yield current_speaker, f0_range[0], f0_range[1]
                    pitch_points = []
                    current_speaker = speaker_id
                # Keep the second column for frames whose first column is above 0.5
                indices = np.where(pitch_features[:, 0] > 0.5)
                pitch_points.extend(pitch_features[indices[0], 1])
            self.check_call(pitch_proc)
            # The last speaker never triggers a speaker change, so report it here
            if current_speaker is not None:
                f0_range = self._f0_range(pitch_points)
                if f0_range is not None:
                    yield current_speaker, f0_range[0], f0_range[1]
[docs]
class ComputeVadFunction(KaldiFunction):
    """
    Multiprocessing function to compute voice activity detection

    See Also
    --------
    :meth:`.AcousticCorpusMixin.compute_vad`
        Main function that calls this function in parallel
    :meth:`.AcousticCorpusMixin.compute_vad_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`compute-vad`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.features.VadArguments`
        Arguments for the function
    """

    # The gaps between the counts are matched lazily (``.*?``): a greedy ``.*`` in
    # front of a ``\d+`` group swallows all but the last digit of the number.
    progress_pattern = re.compile(
        r"^LOG.*processed (?P<done>\d+) utterances.*?(?P<no_feats>\d+) had.*?(?P<unvoiced>\d+) were.*"
    )

    def __init__(self, args: VadArguments):
        super().__init__(args)
        self.feats_scp_path = args.feats_scp_path
        self.vad_scp_path = args.vad_scp_path
        self.vad_options = args.vad_options

    def _run(self) -> typing.Generator[typing.Tuple[int, int, int]]:
        """
        Run the function

        Yields
        ------
        tuple[int, int, int]
            The ``done``, ``no_feats`` and ``unvoiced`` counts parsed from each
            stderr line matching :attr:`progress_pattern`
        """
        with mfa_open(self.log_path, "w") as log_file:
            feats_scp_path = self.feats_scp_path
            vad_scp_path = self.vad_scp_path
            # The archive lives next to the SCP file that indexes it
            vad_ark_path = self.vad_scp_path.with_suffix(".ark")
            vad_proc = subprocess.Popen(
                [
                    thirdparty_binary("compute-vad"),
                    f"--vad-energy-mean-scale={self.vad_options['energy_mean_scale']}",
                    f"--vad-energy-threshold={self.vad_options['energy_threshold']}",
                    f"scp:{feats_scp_path}",
                    f"ark,scp:{vad_ark_path},{vad_scp_path}",
                ],
                stderr=subprocess.PIPE,
                encoding="utf8",
                env=os.environ,
            )
            # stderr is copied to the log while being scanned for the summary line
            for line in vad_proc.stderr:
                log_file.write(line)
                m = self.progress_pattern.match(line.strip())
                if m:
                    yield int(m.group("done")), int(m.group("no_feats")), int(m.group("unvoiced"))
            self.check_call(vad_proc)
[docs]
class CalcFmllrFunction(KaldiFunction):
    """
    Multiprocessing function for calculating fMLLR transforms

    For every dictionary, alignments are converted to posteriors, silence is
    down-weighted, and speaker transforms are estimated.  When a transform archive
    already exists and a single model is used, the new estimate is composed with
    the existing transforms.

    See Also
    --------
    :meth:`.AcousticCorpusMixin.calc_fmllr`
        Main function that calls this function in parallel
    :meth:`.AcousticCorpusMixin.calc_fmllr_arguments`
        Job method for generating arguments for this function
    :kaldi_src:`gmm-est-fmllr`
        Relevant Kaldi binary
    :kaldi_src:`gmm-est-fmllr-gpost`
        Relevant Kaldi binary
    :kaldi_src:`gmm-post-to-gpost`
        Relevant Kaldi binary
    :kaldi_src:`ali-to-post`
        Relevant Kaldi binary
    :kaldi_src:`weight-silence-post`
        Relevant Kaldi binary
    :kaldi_src:`compose-transforms`
        Relevant Kaldi binary
    :kaldi_src:`transform-feats`
        Relevant Kaldi binary

    Parameters
    ----------
    args: :class:`~montreal_forced_aligner.corpus.features.CalcFmllrArguments`
        Arguments for the function
    """

    # stderr lines naming the speaker a transform was just estimated for
    progress_pattern = re.compile(r"^LOG.*For speaker (?P<speaker>.*),.*$")
    # Error line that the retry logic in _run treats as recoverable
    memory_error_pattern = re.compile(
        r"^ERROR \(gmm-est-fmllr-gpost.*Failed to read vector from stream..*$"
    )

    def __init__(self, args: CalcFmllrArguments):
        super().__init__(args)
        self.dictionaries = args.dictionaries
        self.feature_strings = args.feature_strings
        self.ali_paths = args.ali_paths
        self.ali_model_path = args.ali_model_path
        self.model_path = args.model_path
        self.spk2utt_paths = args.spk2utt_paths
        self.trans_paths = args.trans_paths
        self.fmllr_options = args.fmllr_options

    def _run(self) -> typing.Generator[str]:
        """
        Run the function

        Yields
        ------
        str
            Speaker captured from each stderr line matching :attr:`progress_pattern`
        """
        with mfa_open(self.log_path, "w") as log_file:
            for dict_id in self.dictionaries:
                # Retry loop: left via ``break`` once estimation succeeds, repeated
                # after a recoverable error, aborted by re-raising otherwise
                while True:
                    feature_string = self.feature_strings[dict_id]
                    ali_path = self.ali_paths[dict_id]
                    spk2utt_path = self.spk2utt_paths[dict_id]
                    trans_path = self.trans_paths[dict_id]
                    # "initial" means no transforms exist yet for this dictionary
                    initial = True
                    if trans_path.exists():
                        initial = False
                    post_proc = subprocess.Popen(
                        [thirdparty_binary("ali-to-post"), f"ark,s,cs:{ali_path}", "ark:-"],
                        stderr=log_file,
                        stdout=subprocess.PIPE,
                        env=os.environ,
                    )
                    # Silence posteriors are scaled by 0.0, i.e. removed
                    weight_proc = subprocess.Popen(
                        [
                            thirdparty_binary("weight-silence-post"),
                            "0.0",
                            self.fmllr_options["silence_csl"],
                            self.ali_model_path,
                            "ark,s,cs:-",
                            "ark:-",
                        ],
                        stderr=log_file,
                        stdin=post_proc.stdout,
                        stdout=subprocess.PIPE,
                        env=os.environ,
                    )
                    temp_trans_path = trans_path.with_suffix(trans_path.suffix + ".tmp")
                    if self.ali_model_path != self.model_path:
                        # Two-model case: posteriors from the alignment model are
                        # converted to Gaussian posteriors before estimating with
                        # the final model; output goes straight to trans_path
                        post_gpost_proc = subprocess.Popen(
                            [
                                thirdparty_binary("gmm-post-to-gpost"),
                                self.ali_model_path,
                                feature_string,
                                "ark,s,cs:-",
                                "ark:-",
                            ],
                            stderr=log_file,
                            stdin=weight_proc.stdout,
                            stdout=subprocess.PIPE,
                            env=os.environ,
                        )
                        est_proc = subprocess.Popen(
                            [
                                thirdparty_binary("gmm-est-fmllr-gpost"),
                                "--verbose=4",
                                f"--fmllr-update-type={self.fmllr_options['fmllr_update_type']}",
                                f"--spk2utt=ark:{spk2utt_path}",
                                self.model_path,
                                feature_string,
                                "ark,s,cs:-",
                                f"ark:{trans_path}",
                            ],
                            stderr=subprocess.PIPE,
                            encoding="utf8",
                            stdin=post_gpost_proc.stdout,
                            env=os.environ,
                        )
                    else:
                        if not initial:
                            # Existing transforms: estimate into a temporary archive
                            # that is composed with the existing one further below
                            temp_composed_trans_path = trans_path.with_suffix(".cmp.tmp")
                            est_proc = subprocess.Popen(
                                [
                                    thirdparty_binary("gmm-est-fmllr"),
                                    "--verbose=4",
                                    f"--fmllr-update-type={self.fmllr_options['fmllr_update_type']}",
                                    f"--spk2utt=ark,s,cs:{spk2utt_path}",
                                    self.model_path,
                                    feature_string,
                                    "ark,s,cs:-",
                                    f"ark:{temp_trans_path}",
                                ],
                                stderr=subprocess.PIPE,
                                encoding="utf8",
                                stdin=weight_proc.stdout,
                                stdout=subprocess.PIPE,
                                env=os.environ,
                            )
                        else:
                            # First pass: write the transforms directly
                            est_proc = subprocess.Popen(
                                [
                                    thirdparty_binary("gmm-est-fmllr"),
                                    "--verbose=4",
                                    f"--fmllr-update-type={self.fmllr_options['fmllr_update_type']}",
                                    f"--spk2utt=ark,s,cs:{spk2utt_path}",
                                    self.model_path,
                                    feature_string,
                                    "ark,s,cs:-",
                                    f"ark:{trans_path}",
                                ],
                                stderr=subprocess.PIPE,
                                encoding="utf8",
                                stdin=weight_proc.stdout,
                                env=os.environ,
                            )
                    # stderr is copied to the log while being scanned for progress
                    for line in est_proc.stderr:
                        log_file.write(line)
                        m = self.progress_pattern.match(line.strip())
                        if m:
                            yield m.group("speaker")
                    try:
                        self.check_call(est_proc)
                        break
                    except KaldiProcessingError:  # Try to recover from Memory exception
                        # Look through the log written so far: if the known error
                        # is present, drop the transform file and retry, otherwise
                        # (for/else) re-raise the original error
                        with mfa_open(self.log_path, "r") as f:
                            for line in f:
                                if self.memory_error_pattern.match(line):
                                    os.remove(trans_path)
                                    break
                            else:
                                raise
                # NOTE(review): temp_composed_trans_path is only assigned in the
                # single-model branch above, and the two-model branch never writes
                # temp_trans_path, yet this step runs whenever transforms already
                # existed -- looks like the two-model path is assumed to only run
                # without existing transforms; confirm.
                if not initial:
                    compose_proc = subprocess.Popen(
                        [
                            thirdparty_binary("compose-transforms"),
                            "--b-is-affine=true",
                            f"ark:{temp_trans_path}",
                            f"ark:{trans_path}",
                            f"ark:{temp_composed_trans_path}",
                        ],
                        stderr=log_file,
                        env=os.environ,
                    )
                    compose_proc.communicate()
                    self.check_call(compose_proc)
                    # Replace the old transforms with the composed ones
                    os.remove(trans_path)
                    os.remove(temp_trans_path)
                    os.rename(temp_composed_trans_path, trans_path)
[docs]
class FeatureConfigMixin:
"""
Class to store configuration information about MFCC generation
Attributes
----------
feature_type : str
Feature type, defaults to "mfcc"
use_energy : bool
Flag for whether first coefficient should be used, defaults to False
frame_shift : int
number of milliseconds between frames, defaults to 10
snip_edges : bool
Flag for enabling Kaldi's snip edges, should be better time precision
use_pitch : bool
Flag for including pitch in features, defaults to False
low_frequency : int
Frequency floor
high_frequency : int
Frequency ceiling
sample_frequency : int
Sampling frequency
allow_downsample : bool
Flag for whether to allow downsampling, default is True
allow_upsample : bool
Flag for whether to allow upsampling, default is True
uses_cmvn : bool
Flag for whether to use CMVN, default is True
uses_deltas : bool
Flag for whether to use delta features, default is True
uses_splices : bool
Flag for whether to use splices and LDA transformations, default is False
uses_speaker_adaptation : bool
Flag for whether to use speaker adaptation, default is False
fmllr_update_type : str
Type of fMLLR estimation, defaults to "full"
silence_weight : float
Weight of silence in calculating LDA or fMLLR
splice_left_context : int or None
Number of frames to splice on the left for calculating LDA
splice_right_context : int or None
Number of frames to splice on the right for calculating LDA
"""
def __init__(
self,
feature_type: str = "mfcc",
use_energy: bool = False,
frame_shift: int = 10,
frame_length: int = 25,
snip_edges: bool = True,
low_frequency: int = 20,
high_frequency: int = 7800,
sample_frequency: int = 16000,
allow_downsample: bool = True,
allow_upsample: bool = True,
dither: int = 1,
energy_floor: float = 0,
num_coefficients: int = 13,
num_mel_bins: int = 23,
cepstral_lifter: float = 22,
preemphasis_coefficient: float = 0.97,
uses_cmvn: bool = True,
uses_deltas: bool = True,
uses_splices: bool = False,
uses_voiced: bool = False,
adaptive_pitch_range: bool = False,
uses_speaker_adaptation: bool = False,
fmllr_update_type: str = "full",
silence_weight: float = 0.0,
splice_left_context: int = 3,
splice_right_context: int = 3,
use_pitch: bool = False,
use_voicing: bool = False,
use_delta_pitch: bool = False,
min_f0: float = 50,
max_f0: float = 800,
delta_pitch: float = 0.005,
penalty_factor: float = 0.1,
**kwargs,
):
super().__init__(**kwargs)
self.feature_type = feature_type
self.uses_cmvn = uses_cmvn
self.uses_deltas = uses_deltas
self.uses_splices = uses_splices
self.uses_voiced = uses_voiced
self.uses_speaker_adaptation = uses_speaker_adaptation
self.frame_shift = frame_shift
self.export_frame_shift = round(frame_shift / 1000, 4)
self.frame_length = frame_length
self.snip_edges = snip_edges
# MFCC options
self.use_energy = use_energy
self.low_frequency = low_frequency
self.high_frequency = high_frequency
self.sample_frequency = sample_frequency
self.allow_downsample = allow_downsample
self.allow_upsample = allow_upsample
self.dither = dither
self.energy_floor = energy_floor
self.num_coefficients = num_coefficients
self.num_mel_bins = num_mel_bins
self.cepstral_lifter = cepstral_lifter
self.preemphasis_coefficient = preemphasis_coefficient
# fMLLR options
self.fmllr_update_type = fmllr_update_type
self.silence_weight = silence_weight
# Splicing options
self.splice_left_context = splice_left_context
self.splice_right_context = splice_right_context
# Pitch features
self.adaptive_pitch_range = adaptive_pitch_range
self.use_pitch = use_pitch
self.use_voicing = use_voicing
self.use_delta_pitch = use_delta_pitch
self.min_f0 = min_f0
self.max_f0 = max_f0
self.delta_pitch = delta_pitch
self.penalty_factor = penalty_factor
self.normalize_pitch = True
if self.adaptive_pitch_range:
self.min_f0 = 50
self.max_f0 = 1200
@property
def vad_options(self) -> MetaDict:
    """VAD options; has to be provided by the class using this mixin"""
    raise NotImplementedError()
@property
def alignment_model_path(self) -> str:  # needed for fmllr
    """Alignment model path; has to be provided by the class using this mixin"""
    raise NotImplementedError()
@property
def model_path(self) -> str:  # needed for fmllr
    """Model path; has to be provided by the class using this mixin"""
    raise NotImplementedError()
@property
def working_directory(self) -> Path:
    """Working directory; has to be provided by the class using this mixin"""
    raise NotImplementedError()
@property
def corpus_output_directory(self) -> str:
    """Working directory of the corpus; has to be provided by the class using this mixin"""
    raise NotImplementedError()
@property
def data_directory(self) -> str:
    """Corpus data directory; has to be provided by the class using this mixin"""
    raise NotImplementedError()
@property
def feature_options(self) -> MetaDict:
    """Parameters for feature generation, keyed by option name"""
    # Every option except "type" is stored under an attribute of the same name
    same_named = (
        "use_energy",
        "frame_shift",
        "frame_length",
        "snip_edges",
        "low_frequency",
        "high_frequency",
        "sample_frequency",
        "allow_downsample",
        "allow_upsample",
        "dither",
        "energy_floor",
        "num_coefficients",
        "num_mel_bins",
        "cepstral_lifter",
        "preemphasis_coefficient",
        "uses_cmvn",
        "uses_deltas",
        "uses_voiced",
        "uses_splices",
        "uses_speaker_adaptation",
        "use_pitch",
        "use_voicing",
        "min_f0",
        "max_f0",
        "delta_pitch",
        "penalty_factor",
        "silence_weight",
        "splice_left_context",
        "splice_right_context",
    )
    options = {"type": self.feature_type}
    for name in same_named:
        options[name] = getattr(self, name)
    return options
[docs]
def calc_fmllr(self) -> None:
    """
    Calculate fMLLR transforms

    Placeholder that must be overridden by the class using this mixin.

    Raises
    ------
    NotImplementedError
        Always, when the method has not been overridden
    """
    raise NotImplementedError()
@property
def fmllr_options(self) -> MetaDict:
    """
    Options for use in calculating fMLLR transforms

    Returns
    -------
    dict[str, Any]
        The fMLLR update type, the silence weight and the silence phone list
        (``silence_csl``)
    """
    options = {
        "fmllr_update_type": self.fmllr_update_type,
        "silence_weight": self.silence_weight,
    }
    # Silence phones are only available when a dictionary provided them;
    # otherwise an empty string is reported
    options["silence_csl"] = getattr(self, "silence_csl", "")
    return options
@property
def lda_options(self) -> MetaDict:
    """
    Options for computing LDA

    Returns
    -------
    dict[str, Any]
        The ``lda_options`` of the attached acoustic model when one is set,
        otherwise the splice contexts configured on this object
    """
    acoustic_model = getattr(self, "acoustic_model", None)
    if acoustic_model is None:
        # No acoustic model attached, so fall back to the local configuration
        return {
            "splice_left_context": self.splice_left_context,
            "splice_right_context": self.splice_right_context,
        }
    return acoustic_model.lda_options
@property
def mfcc_options(self) -> MetaDict:
    """
    Parameters to use in computing MFCC features

    Returns
    -------
    dict[str, Any]
        The ``mfcc_options`` of the attached acoustic model when one is set,
        otherwise the locally configured values keyed by hyphenated option names
    """
    acoustic_model = getattr(self, "acoustic_model", None)
    if acoustic_model is not None:
        return acoustic_model.mfcc_options
    # (hyphenated option name, attribute holding its value)
    option_attributes = (
        ("use-energy", "use_energy"),
        ("dither", "dither"),
        ("energy-floor", "energy_floor"),
        ("num-ceps", "num_coefficients"),
        ("num-mel-bins", "num_mel_bins"),
        ("cepstral-lifter", "cepstral_lifter"),
        ("preemphasis-coefficient", "preemphasis_coefficient"),
        ("frame-shift", "frame_shift"),
        ("frame-length", "frame_length"),
        ("low-freq", "low_frequency"),
        ("high-freq", "high_frequency"),
        ("sample-frequency", "sample_frequency"),
        ("allow-downsample", "allow_downsample"),
        ("allow-upsample", "allow_upsample"),
        ("snip-edges", "snip_edges"),
    )
    return {option: getattr(self, attribute) for option, attribute in option_attributes}
@property
def pitch_options(self) -> MetaDict:
    """
    Parameters to use in computing pitch features

    Returns
    -------
    dict[str, Any]
        The ``pitch_options`` of the attached acoustic model when one is set,
        otherwise the locally configured values keyed by hyphenated option names
    """
    acoustic_model = getattr(self, "acoustic_model", None)
    if acoustic_model is not None:
        return acoustic_model.pitch_options
    # (hyphenated option name, attribute holding its value)
    option_attributes = (
        ("use-pitch", "use_pitch"),
        ("use-voicing", "use_voicing"),
        ("use-delta-pitch", "use_delta_pitch"),
        ("frame-shift", "frame_shift"),
        ("frame-length", "frame_length"),
        ("min-f0", "min_f0"),
        ("max-f0", "max_f0"),
        ("sample-frequency", "sample_frequency"),
        ("penalty-factor", "penalty_factor"),
        ("delta-pitch", "delta_pitch"),
        ("snip-edges", "snip_edges"),
        ("normalize", "normalize_pitch"),
    )
    return {option: getattr(self, attribute) for option, attribute in option_attributes}
[docs]
class VadConfigMixin(FeatureConfigMixin):
    """
    Abstract mixin class for performing voice activity detection

    Parameters
    ----------
    use_energy: bool
        Flag for using the first coefficient of MFCCs; not consumed here but
        forwarded with the other keyword arguments to the parent initializer
    energy_threshold: float
        Energy threshold above which a frame will be counted as voiced
    energy_mean_scale: float
        Proportion of the mean energy of the file that should be added to the energy_threshold

    See Also
    --------
    :class:`~montreal_forced_aligner.corpus.features.FeatureConfigMixin`
        For feature generation parameters
    """

    def __init__(self, energy_threshold=5.5, energy_mean_scale=0.5, **kwargs):
        # Everything that is not VAD-specific is handled further up the hierarchy
        super().__init__(**kwargs)
        self.energy_threshold, self.energy_mean_scale = energy_threshold, energy_mean_scale

    @property
    def vad_options(self) -> MetaDict:
        """
        Options for performing VAD

        Returns
        -------
        dict[str, Any]
            The configured energy threshold and energy mean scale
        """
        options = {"energy_threshold": self.energy_threshold}
        options["energy_mean_scale"] = self.energy_mean_scale
        return options
[docs]
class IvectorConfigMixin(VadConfigMixin):
    """
    Mixin class for ivector features

    Parameters
    ----------
    num_gselect: int
        Gaussian-selection using diagonal model: number of Gaussians to select
    posterior_scale: float
        Scale on the acoustic posteriors, intended to account for inter-frame correlations
    min_post : float
        Minimum posterior to use (posteriors below this are pruned out)
    max_count: int
        The use of this option (e.g. --max-count 100) can make iVectors more consistent for different lengths of
        utterance, by scaling up the prior term when the data-count exceeds this value. The data-count is after
        posterior-scaling, so assuming the posterior-scale is 0.1, --max-count 100 starts having effect after 1000
        frames, or 10 seconds of data.

    Attributes
    ----------
    ivector_dimension: int
        Dimension of ivectors, always taken from the ``IVECTOR_DIMENSION`` configuration constant
    normalize_pitch: bool
        Set to False by this mixin after the parent initializer has run

    See Also
    --------
    :class:`~montreal_forced_aligner.corpus.features.FeatureConfigMixin`
        For feature generation parameters
    """

    def __init__(
        self,
        num_gselect: int = 20,
        posterior_scale: float = 1.0,
        min_post: float = 0.025,
        max_count: int = 100,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Extraction settings supplied by the caller
        self.num_gselect = num_gselect
        self.posterior_scale = posterior_scale
        self.min_post = min_post
        self.max_count = max_count
        # Fixed settings: the dimension is a global constant, and pitch normalization
        # is switched off regardless of what the parent initializer chose
        self.ivector_dimension = IVECTOR_DIMENSION
        self.normalize_pitch = False

    @property
    def ivector_options(self) -> MetaDict:
        """
        Options for ivector training and extracting

        Returns
        -------
        dict[str, Any]
            Gaussian selection, posterior, count, dimension and silence settings
        """
        options = {
            "num_gselect": self.num_gselect,
            "posterior_scale": self.posterior_scale,
            "min_post": self.min_post,
            "silence_weight": self.silence_weight,
            "max_count": self.max_count,
            "ivector_dimension": self.ivector_dimension,
        }
        # Silence phones are only available when a dictionary provided them
        options["silence_csl"] = getattr(self, "silence_csl", "")
        return options
@njit
def plda_distance(train_ivector: np.ndarray, test_ivector: np.ndarray, psi: np.ndarray):
    """
    Distance formulation of PLDA log likelihood ratios

    The log likelihood ratio is subtracted from a ceiling of 40.0, and ratios at or above
    the ceiling map to a distance of 0, so larger ratios give smaller distances.

    Parameters
    ----------
    train_ivector: numpy.ndarray
        Utterance ivector to use as reference
    test_ivector: numpy.ndarray
        Utterance ivector to compare
    psi: numpy.ndarray
        Input psi from :class:`~montreal_forced_aligner.corpus.features.PldaModel`

    Returns
    -------
    float
        PLDA distance
    """
    ceiling = 40.0
    score = plda_log_likelihood(train_ivector, test_ivector, psi)
    return 0.0 if score >= ceiling else ceiling - score
@njit(cache=True)
def plda_variance_given(psi: np.ndarray, train_count: int = None):
    """
    Compute the same-class variance terms used in PLDA scoring

    Parameters
    ----------
    psi: numpy.ndarray
        Input psi from :class:`~montreal_forced_aligner.corpus.features.PldaModel`
    train_count: int, optional
        Count used to scale psi in the denominator; psi is used unscaled when omitted

    Returns
    -------
    float
        Sum of the log of ``1 + psi / (train_count * psi + 1)``
    numpy.ndarray
        Element-wise reciprocal of ``1 + psi / (train_count * psi + 1)``
    """
    if train_count is not None:
        denominator = train_count * psi + 1.0
    else:
        denominator = psi + 1.0
    variance = 1.0 + psi / denominator
    log_determinant = np.sum(np.log(variance))
    return log_determinant, 1.0 / variance
@njit(cache=True)
def plda_variance_without(psi: np.ndarray):
    """
    Compute the different-class variance terms used in PLDA scoring

    Parameters
    ----------
    psi: numpy.ndarray
        Input psi from :class:`~montreal_forced_aligner.corpus.features.PldaModel`

    Returns
    -------
    float
        Sum of the log of ``1 + psi``
    numpy.ndarray
        Element-wise reciprocal of ``1 + psi``
    """
    variance = 1.0 + psi
    log_determinant = np.sum(np.log(variance))
    return log_determinant, 1.0 / variance
@njit
def plda_log_likelihood(
    train_ivector: np.ndarray, test_ivector: np.ndarray, psi: np.ndarray, train_count: int = None
):
    """
    Calculate log likelihood of two ivectors belonging to the same class

    Parameters
    ----------
    train_ivector: numpy.ndarray
        Speaker or utterance ivector to use as reference
    test_ivector: numpy.ndarray
        Utterance ivector to compare
    psi: numpy.ndarray
        Input psi from :class:`~montreal_forced_aligner.corpus.features.PldaModel`
    train_count: int, optional
        Count of training ivector, if it represents a speaker

    Returns
    -------
    float
        Log likelihood ratio of same class hypothesis compared to difference class hypothesis
    """
    # All arithmetic is carried out in double precision
    reference = train_ivector.astype("float64")
    probe = test_ivector.astype("float64")
    psi64 = psi.astype("float64")

    # Expected position of the probe if it shares a class with the reference
    if train_count is not None:
        shrinkage = (train_count * psi64) / (train_count * psi64 + 1.0)
    else:
        shrinkage = (psi64) / (psi64 + 1.0)
    class_mean = shrinkage * reference

    logdet_given, precision_given = plda_variance_given(psi64, train_count)
    logdet_without, precision_without = plda_variance_without(psi64)

    # Same-class hypothesis
    residual_sq = (probe - class_mean) ** 2
    loglike_given_class = -0.5 * (
        logdet_given + M_LOG_2PI * PLDA_DIMENSION + np.dot(residual_sq, precision_given)
    )

    # Different-class hypothesis
    probe_sq = probe**2
    loglike_without_class = -0.5 * (
        logdet_without + M_LOG_2PI * PLDA_DIMENSION + np.dot(probe_sq, precision_without)
    )
    return loglike_given_class - loglike_without_class
@njit(parallel=True)
def plda_distance_matrix(
    train_ivectors: np.ndarray,
    test_ivectors: np.ndarray,
    psi: np.ndarray,
) -> np.ndarray:
    """
    Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L177

    Computes a PLDA affinity matrix by scoring every train/test pair with
    :func:`~montreal_forced_aligner.corpus.features.plda_log_likelihood`. Despite the
    function name, the entries are log likelihood ratios, not distances.

    Parameters
    ----------
    train_ivectors : numpy.ndarray
        Ivectors to compare test ivectors against, N X D
    test_ivectors : numpy.ndarray
        Ivectors to compare against training examples, M X D
    psi: numpy.ndarray
        Psi matrix from PLDA model

    Returns
    -------
    np.ndarray
        Affinity matrix of shape (M X N): rows index test ivectors and columns index
        train ivectors
    """
    num_train = train_ivectors.shape[0]
    num_test = test_ivectors.shape[0]
    distance_matrix = np.zeros((num_test, num_train))
    for i in numba.prange(num_train):
        for j in numba.prange(num_test):
            # Rows are test ivectors and columns are train ivectors, matching the
            # allocated shape; indexing as [i, j] would run out of bounds whenever
            # the two sets differ in size
            distance_matrix[j, i] = plda_log_likelihood(train_ivectors[i], test_ivectors[j], psi)
    return distance_matrix
def pairwise_plda_distance_matrix(
    ivectors: np.ndarray,
    psi: np.ndarray,
) -> csr_matrix:
    """
    Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L177

    Computes a sparse pairwise PLDA affinity matrix, keeping only scores greater than 5

    Parameters
    ----------
    ivectors : numpy.ndarray
        Ivectors to compare pairwise
    psi: numpy.ndarray
        Psi matrix from PLDA model

    Returns
    -------
    scipy.sparse.csr_matrix
        Sparse N X N affinity matrix in which every score that is not greater than 5
        is stored as zero
    """
    full = plda_distance_matrix(ivectors, ivectors, psi)
    # Threshold in place so that each surviving score keeps its (row, column) position;
    # selecting with ``full[np.where(...)]`` would flatten the survivors into one row
    # and lose track of which pair each score belongs to
    return csr_matrix(np.where(full > 5, full, 0.0))
@njit(parallel=True)
def score_plda(
    train_ivectors: np.ndarray,
    test_ivectors: np.ndarray,
    psi: np.ndarray,
    normalize=False,
    distance=False,
) -> np.ndarray:
    """
    Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L177

    Computes plda affinity matrix using Loglikelihood function

    Parameters
    ----------
    train_ivectors : numpy.ndarray
        Ivectors to compare test ivectors against, N X D
    test_ivectors : numpy.ndarray
        Ivectors to compare against training examples, M X D
    psi: numpy.ndarray
        Psi matrix from PLDA model
    normalize: bool
        Flag for normalizing matrix by the maximum value
    distance: bool
        Flag for converting PLDA log likelihood ratios into a distance metric
        (maximum score minus each score)

    Returns
    -------
    np.ndarray
        Affinity matrix of shape (M X N): rows index test ivectors and columns index
        train ivectors
    """
    mean = (psi) / (psi + 1.0)
    mean = mean.reshape(1, -1) * train_ivectors

    # given class computation
    variance_given = 1.0 + psi / (psi + 1.0)
    logdet_given = np.sum(np.log(variance_given))
    variance_given = 1.0 / variance_given

    # without class computation
    variance_without = 1.0 + psi
    logdet_without = np.sum(np.log(variance_without))
    variance_without = 1.0 / variance_without

    num_train = train_ivectors.shape[0]
    num_test = test_ivectors.shape[0]
    dim = test_ivectors.shape[1]
    loglikes = np.zeros((num_test, num_train))

    # The different-class term only depends on the test ivectors, so compute it once
    sqdiff_without = test_ivectors**2
    loglike_without_class = -0.5 * (
        logdet_without + M_LOG_2PI * dim + (sqdiff_without @ variance_without)
    )
    for i in numba.prange(num_train):
        sqdiff_given = test_ivectors - mean[i]
        sqdiff_given = sqdiff_given**2
        loglikes[:, i] = (
            -0.5 * (logdet_given + M_LOG_2PI * dim + (sqdiff_given @ variance_given))
        ) - loglike_without_class

    # The maximum is needed by both post-processing steps, so it is taken before either
    # runs; this keeps ``normalize`` usable without ``distance``. It is skipped when
    # neither flag is set so that plain scoring does no extra work.
    threshold = 0.0
    if distance or normalize:
        threshold = np.max(loglikes)
    if distance:
        loglikes -= threshold
        loglikes *= -1
    if normalize:
        loglikes /= threshold
    return loglikes
@njit
def compute_classification_stats(
    speaker_ivectors: np.ndarray, psi: np.ndarray, counts: np.ndarray
) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Precomputes necessary stats for training ivectors to save time on classification in
    :func:`~montreal_forced_aligner.corpus.features.classify_plda`.

    Parameters
    ----------
    speaker_ivectors: numpy.ndarray
        Training speaker ivectors
    psi: numpy.ndarray
        Psi matrix from PLDA model
    counts: numpy.ndarray
        Utterance counts for each speaker

    Returns
    -------
    numpy.ndarray
        PLDA mean vector per speaker
    numpy.ndarray
        Reciprocal of the given-class variance, one row per speaker
    numpy.ndarray
        Logdet for given class, one value per speaker
    numpy.ndarray
        Reciprocal of the no-class variance
    numpy.ndarray
        Logdet for no class (a single summed value)
    """
    # counts as a column times psi as a row gives one scaled psi row per speaker
    scaled_psi = counts.reshape(-1, 1) * psi.reshape(1, -1)
    shrinkage = scaled_psi / (scaled_psi + 1.0)
    mean = shrinkage * speaker_ivectors  # N X D

    # given class computation
    spread_given = 1.0 + psi / (scaled_psi + 1.0)
    logdet_given = np.sum(np.log(spread_given), axis=1)
    variance_given = 1.0 / spread_given

    # without class computation
    spread_without = 1.0 + psi
    logdet_without = np.sum(np.log(spread_without))
    variance_without = 1.0 / spread_without
    return mean, variance_given, logdet_given, variance_without, logdet_without
@njit(parallel=True)
def classify_plda(
    utterance_ivector: np.ndarray,
    mean: np.ndarray,
    variance_given: np.ndarray,
    logdet_given: np.ndarray,
    variance_without: np.ndarray,
    logdet_without: np.ndarray,
) -> typing.Tuple[int, float]:
    """
    Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L177

    Scores one utterance ivector against every speaker and picks the best scoring speaker

    Parameters
    ----------
    utterance_ivector : numpy.ndarray
        Utterance ivector to compare against
    mean: numpy.ndarray
        From :func:`~montreal_forced_aligner.corpus.features.compute_classification_stats`
    variance_given: numpy.ndarray
        From :func:`~montreal_forced_aligner.corpus.features.compute_classification_stats`
    logdet_given: numpy.ndarray
        From :func:`~montreal_forced_aligner.corpus.features.compute_classification_stats`
    variance_without: numpy.ndarray
        From :func:`~montreal_forced_aligner.corpus.features.compute_classification_stats`
    logdet_without: numpy.ndarray
        From :func:`~montreal_forced_aligner.corpus.features.compute_classification_stats`

    Returns
    -------
    int
        Best speaker index
    float
        Best speaker PLDA score
    """
    speaker_total = mean.shape[0]

    # The no-class term is shared by every speaker
    utterance_sq = utterance_ivector**2
    baseline = -0.5 * (
        logdet_without + M_LOG_2PI * PLDA_DIMENSION + (utterance_sq @ variance_without)
    )

    scores = np.zeros((speaker_total,))
    for speaker in numba.prange(speaker_total):
        residual_sq = utterance_ivector - mean[speaker]
        residual_sq = residual_sq**2
        speaker_logdet = logdet_given[speaker]
        speaker_variance = variance_given[speaker]
        scores[speaker] = (
            -0.5 * (speaker_logdet + M_LOG_2PI * PLDA_DIMENSION + (residual_sq @ speaker_variance))
        ) - baseline
    best = scores.argmax()
    return best, scores[best]
@njit(parallel=True)
def score_plda_train_counts(
    train_ivectors: np.ndarray, test_ivectors: np.ndarray, psi: np.ndarray, counts: np.ndarray
) -> np.ndarray:
    """
    Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L177

    Computes plda affinity matrix using
    :func:`~montreal_forced_aligner.corpus.features.plda_log_likelihood`, passing the
    matching entry of ``counts`` for each train ivector

    Parameters
    ----------
    train_ivectors : numpy.ndarray
        Ivectors to compare test ivectors against, N X D
    test_ivectors : numpy.ndarray
        Ivectors to compare against training examples, M X D
    psi: numpy.ndarray
        Psi matrix from PLDA model
    counts: numpy.ndarray
        Utterance counts for each speaker

    Returns
    -------
    np.ndarray
        Affinity matrix of shape (M X N): rows index test ivectors and columns index
        train ivectors
    """
    train_total = train_ivectors.shape[0]
    test_total = test_ivectors.shape[0]
    affinity = np.zeros((test_total, train_total))
    for train_index in numba.prange(train_total):
        for test_index in numba.prange(test_total):
            affinity[test_index, train_index] = plda_log_likelihood(
                train_ivectors[train_index], test_ivectors[test_index], psi, counts[train_index]
            )
    return affinity
@dataclassy.dataclass(slots=True)
class PldaModel:
    """
    PLDA model for transforming and scoring ivectors based on log likelihood ratios

    Parameters
    ----------
    mean: numpy.ndarray
        Mean vector of the PLDA model
    diagonalizing_transform: numpy.ndarray
        Transform applied to ivectors when no PCA reduction is in use
    psi: numpy.ndarray
        Psi vector of the PLDA model; replaced by :meth:`apply_transform`
    offset: numpy.ndarray, optional
        Negated product of the active transform and the mean, added after transforming
    pca_transform: numpy.ndarray, optional
        PCA projection estimated by :meth:`compute_pca_transform`
    transformed_mean: numpy.ndarray, optional
        Mean projected through the PCA transform
    transformed_diagonalizing_transform: numpy.ndarray, optional
        Diagonalizing transform recomputed in the PCA-reduced space
    """

    mean: np.ndarray
    diagonalizing_transform: np.ndarray
    psi: np.ndarray
    offset: typing.Optional[np.ndarray] = None
    pca_transform: typing.Optional[np.ndarray] = None
    transformed_mean: typing.Optional[np.ndarray] = None
    transformed_diagonalizing_transform: typing.Optional[np.ndarray] = None

    @classmethod
    def load(cls, plda_path: Path):
        """
        Instantiate a PLDA model from a trained model file

        The model is converted to text with ``ivector-copy-plda`` and parsed in the order
        mean vector, diagonalizing transform matrix, psi vector.

        Parameters
        ----------
        plda_path: :class:`~pathlib.Path`
            Path to trained PLDA model

        Returns
        -------
        :class:`~montreal_forced_aligner.corpus.features.PldaModel`
            Instantiated object

        Raises
        ------
        ValueError
            If the mean, transform or psi could not be read from the converted model
        """
        mean = None
        diagonalizing_transform = None
        diagonalizing_transform_lines = []
        psi = None
        # The context manager closes the pipe and waits for the process on exit
        with subprocess.Popen(
            [thirdparty_binary("ivector-copy-plda"), "--binary=false", plda_path, "-"],
            stderr=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            env=os.environ,
            encoding="utf8",
        ) as copy_proc:
            for line in copy_proc.stdout:
                if mean is None:
                    # First line: "<Plda>" tag followed by the bracketed mean vector
                    line = line.replace("<Plda>", "").strip()[2:-2]
                    mean = np.fromstring(line, sep=" ")
                elif diagonalizing_transform is None:
                    # A line containing "[" is skipped entirely; the matrix rows follow it
                    if "[" in line:
                        continue
                    end_mat = "]" in line
                    line = line.replace("[", "").replace("]", "").strip()
                    row = np.fromstring(line, sep=" ")
                    diagonalizing_transform_lines.append(row)
                    if end_mat:
                        diagonalizing_transform = np.array(diagonalizing_transform_lines)
                elif psi is None:
                    line = line.strip()[2:-2]
                    psi = np.fromstring(line, sep=" ")
        if mean is None or diagonalizing_transform is None or psi is None:
            # Reached when the conversion produced no or truncated output (stderr is discarded)
            raise ValueError(f"Could not parse a PLDA model from {plda_path}")
        offset = -diagonalizing_transform @ mean.reshape(-1, 1)
        return cls(mean, diagonalizing_transform, psi, offset)

    def distance(self, train_ivector: np.ndarray, test_ivector: np.ndarray):
        """
        Distance formulation of PLDA log likelihood ratios, see
        :func:`~montreal_forced_aligner.corpus.features.plda_distance`

        Parameters
        ----------
        train_ivector: numpy.ndarray
            Utterance ivector to use as reference
        test_ivector: numpy.ndarray
            Utterance ivector to compare

        Returns
        -------
        float
            PLDA distance
        """
        return plda_distance(train_ivector, test_ivector, self.psi)

    def log_likelihood(self, train_ivector: np.ndarray, test_ivector: np.ndarray, count: int = 1):
        """
        Calculate log likelihood of two ivectors belonging to the same class

        Parameters
        ----------
        train_ivector: numpy.ndarray
            Speaker or utterance ivector to use as reference
        test_ivector: numpy.ndarray
            Utterance ivector to compare
        count: int, optional
            Count of training ivector, if it represents a speaker

        Returns
        -------
        float
            Log likelihood ratio of same class hypothesis compared to difference class hypothesis
        """
        return plda_log_likelihood(train_ivector, test_ivector, self.psi, count)

    def process_ivectors(self, ivectors: np.ndarray, counts: np.ndarray = None) -> np.ndarray:
        """
        Transform ivectors to PLDA space

        Parameters
        ----------
        ivectors: numpy.ndarray
            Ivectors to process
        counts: numpy.ndarray, optional
            Number of utterances if ivectors are per-speaker

        Returns
        -------
        numpy.ndarray
            Transformed ivectors
        """
        # Preprocessing and the PCA transform are currently disabled:
        # ivectors = self.preprocess_ivectors(ivectors)
        # ivectors = self.compute_pca_transform(ivectors)
        return self.transform_ivectors(ivectors, counts=counts)

    def preprocess_ivectors(self, ivectors: np.ndarray) -> np.ndarray:
        """
        Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L25

        Subtracts the PLDA mean and rescales every ivector so that its L2 norm equals the
        square root of the ivector dimension

        Parameters
        ----------
        ivectors: numpy.ndarray
            Input ivectors, N X D

        Returns
        -------
        numpy.ndarray
            Preprocessed ivectors, N X D
        """
        ivectors = ivectors.T  # D X N
        # After the transpose the rows are the ivector dimensions, so the dimension is
        # the row count; the column count is the number of ivectors
        dim = ivectors.shape[0]
        # mean subtraction
        ivectors = ivectors - self.mean[:, np.newaxis]
        # PCA transform (disabled)
        # ivectors = self.diagonalizing_transform @ ivectors
        l2_norm = np.linalg.norm(ivectors, axis=0, keepdims=True)
        l2_norm = l2_norm / math.sqrt(dim)
        ivectors_new = ivectors / l2_norm
        return ivectors_new.T

    def compute_pca_transform(self, ivectors: np.ndarray) -> np.ndarray:
        """
        Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L53

        Apply transform on mean shifted ivectors. The projection is estimated from the
        first batch of ivectors it is given and reused afterwards; estimating it also
        updates the model through :meth:`apply_transform`.

        Parameters
        ----------
        ivectors: numpy.ndarray
            Input ivectors

        Returns
        -------
        numpy.ndarray
            Transformed ivectors
        """
        if PLDA_DIMENSION == IVECTOR_DIMENSION:
            # No dimensionality reduction is needed
            return ivectors
        if self.pca_transform is not None:
            return ivectors @ self.pca_transform
        num_rows = ivectors.shape[0]
        mean = np.mean(ivectors, 0, keepdims=True)
        # Covariance of the ivectors
        S = np.matmul(ivectors.T, ivectors)
        S = S / num_rows
        S = S - mean.T @ mean
        ev_s, eig_s, _ = np.linalg.svd(S, full_matrices=True)
        energy_percent = np.sum(eig_s[:PLDA_DIMENSION]) / np.sum(eig_s)
        logger.debug(f"PLDA PCA transform energy with: {energy_percent*100:.2f}%")
        transform = ev_s[:, :PLDA_DIMENSION]
        transformed = ivectors @ transform
        self.pca_transform = transform
        self.apply_transform()
        return transformed

    def apply_transform(self) -> None:
        """
        Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L101

        Recomputes the mean, diagonalizing transform, psi and offset in the space defined
        by ``pca_transform``. ``psi`` and ``offset`` are overwritten.
        """
        mean_plda = self.mean
        # transformed mean vector
        transform_in = self.pca_transform.T
        new_mean = transform_in @ mean_plda[:, np.newaxis]
        D = self.diagonalizing_transform
        psi = self.psi
        D_inv = np.linalg.inv(D)
        # within class and between class covariance
        phi_b = (D_inv * psi.reshape(1, -1)) @ D_inv.T
        phi_w = D_inv @ D_inv.T
        # transformed within class and between class covariance
        new_phi_b = transform_in @ phi_b @ transform_in.T
        new_phi_w = transform_in @ phi_w @ transform_in.T
        ev_w, eig_w, _ = np.linalg.svd(new_phi_w)
        eig_w_inv = 1 / np.sqrt(eig_w)
        Dnew = eig_w_inv.reshape(-1, 1) * ev_w.T
        new_phi_b_proj = Dnew @ new_phi_b @ Dnew.T
        ev_b, eig_b, _ = np.linalg.svd(new_phi_b_proj)
        psi_new = eig_b
        Dnew = ev_b.T @ Dnew
        self.transformed_mean = new_mean
        self.transformed_diagonalizing_transform = Dnew
        self.psi = psi_new
        self.offset = -Dnew @ new_mean.reshape(-1, 1)

    def transform_ivectors(self, ivectors: np.ndarray, counts: np.ndarray = None) -> np.ndarray:
        """
        Adapted from https://github.com/prachiisc/PLDA_scoring/blob/master/PLDA_scoring.py#L142

        Apply plda mean and diagonalizing transform to ivectors for scoring

        Parameters
        ----------
        ivectors : numpy.ndarray
            Input ivectors
        counts : numpy.ndarray, optional
            Utterance counts per speaker

        Returns
        -------
        numpy.ndarray
            transformed ivectors
        """
        offset = self.offset
        offset = offset.T
        # The PCA-space transform is only used when the PLDA dimension differs from the
        # ivector dimension
        if PLDA_DIMENSION == IVECTOR_DIMENSION:
            D = self.diagonalizing_transform
        else:
            D = self.transformed_diagonalizing_transform
        Dnew = D.T
        X_new = ivectors @ Dnew
        X_new = X_new + offset
        # Get normalizing factor
        # Defaults : normalize_length(true), simple_length_norm(false)
        X_new_sq = X_new**2
        if counts is not None:
            # Each row gets its own inverse covariance because it depends on that row's count
            dot_prod = np.zeros((X_new.shape[0], 1))
            for i in range(dot_prod.shape[0]):
                inv_covar = self.psi + (1.0 / counts[i])
                inv_covar = 1.0 / inv_covar
                dot_prod[i] = np.dot(X_new_sq[i], inv_covar)
        else:
            inv_covar = (1.0 / (1.0 + self.psi)).reshape(-1, 1)
            dot_prod = X_new_sq @ inv_covar  # N X 1
        Dim = D.shape[0]
        normfactor = np.sqrt(Dim / dot_prod)
        X_new = X_new * normfactor
        return X_new
class ExportIvectorsFunction(KaldiFunction):
    """
    Multiprocessing function to export the ivectors (or xvectors) stored in the database
    for one job to Kaldi archive and script files

    See Also
    --------
    :kaldi_src:`copy-vector`
        Relevant Kaldi binary

    Parameters
    ----------
    args: ExportIvectorsArguments
        Arguments for the function
    """

    def __init__(self, args: ExportIvectorsArguments):
        super().__init__(args)
        self.use_xvector = args.use_xvector

    def _run(self) -> typing.Generator[typing.Tuple[int, str], None, None]:
        """
        Run the function

        Yields
        ------
        int
            Utterance ID parsed from the second hyphen-separated field of the exported key
        str
            Archive location listed for that utterance in the generated scp file
        """
        engine = sqlalchemy.create_engine(
            self.db_string,
            poolclass=sqlalchemy.NullPool,
            pool_reset_on_return=None,
            isolation_level="AUTOCOMMIT",
            logging_name=f"{type(self).__name__}_engine",
        ).execution_options(logging_token=f"{type(self).__name__}_engine")
        with sqlalchemy.orm.Session(engine) as session, mfa_open(self.log_path, "w") as log_file:
            job: Job = (
                session.query(Job)
                .options(joinedload(Job.corpus, innerjoin=True))
                .filter(Job.id == self.job_name)
                .first()
            )
            if self.use_xvector:
                ivector_column = Utterance.xvector
            else:
                ivector_column = Utterance.ivector
            # Only utterances of this job that actually have a vector, in key order
            query = (
                session.query(Utterance.kaldi_id, ivector_column)
                .filter(ivector_column != None, Utterance.job_id == job.id)  # noqa
                .order_by(Utterance.kaldi_id)
            )
            ivector_scp_path = job.construct_path(job.corpus.split_directory, "ivectors", "scp")
            ivector_ark_path = job.construct_path(job.corpus.split_directory, "ivectors", "ark")
            # copy-vector reads text vectors on stdin and writes a binary ark plus its scp index
            input_proc = subprocess.Popen(
                [
                    thirdparty_binary("copy-vector"),
                    "--binary=true",
                    "ark,t:-",
                    f"ark,scp:{ivector_ark_path},{ivector_scp_path}",
                ],
                stdin=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            for utt_id, ivector in query:
                if ivector is None:
                    continue
                ivector = " ".join([format(x, ".12g") for x in ivector])
                in_line = f"{utt_id} [ {ivector} ]\n".encode("utf8")
                input_proc.stdin.write(in_line)
                input_proc.stdin.flush()
            input_proc.stdin.close()
            self.check_call(input_proc)
            # Report where each utterance ended up by reading back the generated index
            with mfa_open(ivector_scp_path) as f:
                for line in f:
                    line = line.strip()
                    utt_id, ark_path = line.split(maxsplit=1)
                    utt_id = int(utt_id.split("-")[1])
                    yield utt_id, ark_path
def online_feature_proc(
    working_directory: Path,
    wav_path: Path,
    segment_path: Path,
    mfcc_options: MetaDict,
    pitch_options: MetaDict,
    lda_options: MetaDict,
    log_file: io.FileIO,
) -> subprocess.Popen:
    """
    Generate a subprocess Popen object that processes features for online alignment, decoding, etc.

    Segments are extracted from the audio and fed to MFCC (and optionally pitch)
    computation. The MFCCs are mean-normalized, pasted together with the pitch features
    when those are enabled, and handed to the feature transform process.

    Parameters
    ----------
    working_directory: :class:`~pathlib.Path`
        Directory where ``mfcc.ark``, ``pitch.ark`` and ``feats.ark`` are written and where
        ``lda.mat``, ``trans.scp``, ``cmvn.scp`` and ``utt2spk.scp`` are looked up
    wav_path: :class:`~pathlib.Path`
        Path to the wav scp file given to ``extract-segments``
    segment_path: :class:`~pathlib.Path`
        Path to the segments file given to ``extract-segments``
    mfcc_options: dict[str, Any]
        Options passed to the MFCC computation process
    pitch_options: dict[str, Any]
        Options passed to the pitch computation process; pitch features are computed when
        either ``use-pitch`` or ``use-voicing`` is set
    lda_options: dict[str, Any]
        Options passed to the feature transform process
    log_file: writable buffer
        Receives the stderr of the spawned processes

    Returns
    -------
    subprocess.Popen
        Feature transform process reading the final features
    """
    mfcc_ark_path = working_directory.joinpath("mfcc.ark")
    pitch_ark_path = working_directory.joinpath("pitch.ark")
    feats_ark_path = working_directory.joinpath("feats.ark")
    lda_mat_path = working_directory.joinpath("lda.mat")
    trans_scp_path = working_directory.joinpath("trans.scp")
    cmvn_scp_path = working_directory.joinpath("cmvn.scp")
    utt2spk_scp_path = working_directory.joinpath("utt2spk.scp")
    seg_proc = subprocess.Popen(
        [
            thirdparty_binary("extract-segments"),
            "--min-segment-length=0.1",
            f"scp:{wav_path}",
            segment_path,
            "ark:-",
        ],
        stdout=subprocess.PIPE,
        stderr=log_file,
        env=os.environ,
    )
    mfcc_proc = compute_mfcc_process(log_file, wav_path, subprocess.PIPE, mfcc_options)
    if cmvn_scp_path.exists():
        # Precomputed per-speaker statistics are available
        cmvn_proc = subprocess.Popen(
            [
                thirdparty_binary("apply-cmvn"),
                f"--utt2spk=ark:{utt2spk_scp_path}",
                f"scp:{cmvn_scp_path}",
                "ark:-",
                f"ark:{mfcc_ark_path}",
            ],
            stdin=mfcc_proc.stdout,
            stderr=log_file,
            env=os.environ,
        )
    else:
        # No statistics on disk, so normalize over a sliding window instead
        cmvn_proc = subprocess.Popen(
            [
                thirdparty_binary("apply-cmvn-sliding"),
                "--norm-vars=false",
                "--center=true",
                "--cmn-window=300",
                "ark:-",
                f"ark:{mfcc_ark_path}",
            ],
            env=os.environ,
            stdin=mfcc_proc.stdout,
            stderr=log_file,
        )
    use_pitch = pitch_options["use-pitch"] or pitch_options["use-voicing"]
    if use_pitch:
        pitch_proc = compute_pitch_process(log_file, wav_path, subprocess.PIPE, pitch_options)
        pitch_copy_proc = subprocess.Popen(
            [
                thirdparty_binary("copy-feats"),
                "--compress=true",
                "ark:-",
                f"ark:{pitch_ark_path}",
            ],
            stdin=pitch_proc.stdout,
            stderr=log_file,
            env=os.environ,
        )
    # Fan the extracted segments out to every feature extractor
    for line in seg_proc.stdout:
        mfcc_proc.stdin.write(line)
        mfcc_proc.stdin.flush()
        if use_pitch:
            pitch_proc.stdin.write(line)  # noqa
            pitch_proc.stdin.flush()
    # Closing stdin signals end of input so the downstream processes can finish
    mfcc_proc.stdin.close()
    if use_pitch:
        pitch_proc.stdin.close()
    # The segment extractor has reached end of output; reap it so it does not linger
    seg_proc.wait()
    cmvn_proc.wait()
    if use_pitch:
        pitch_copy_proc.wait()  # noqa
        paste_proc = subprocess.Popen(
            [
                thirdparty_binary("paste-feats"),
                "--length-tolerance=2",
                f"ark:{mfcc_ark_path}",
                f"ark:{pitch_ark_path}",
                f"ark:{feats_ark_path}",
            ],
            stderr=log_file,
            env=os.environ,
        )
        paste_proc.wait()
    else:
        # Without pitch, the normalized MFCCs are the final features
        feats_ark_path = mfcc_ark_path
    trans_proc = compute_transform_process(
        log_file,
        feats_ark_path,
        lda_mat_path,
        lda_options,
        fmllr_path=trans_scp_path,
        utt2spk_path=utt2spk_scp_path,
    )
    return trans_proc