"""Class definition for TrainableIvectorExtractor"""
from __future__ import annotations
import logging
import os
import re
import shutil
import subprocess
import time
import typing
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from tqdm.rich import tqdm
from montreal_forced_aligner.abc import MetaDict, ModelExporterMixin, TopLevelMfaWorker
from montreal_forced_aligner.acoustic_modeling.base import AcousticModelTrainingMixin
from montreal_forced_aligner.config import GLOBAL_CONFIG, PLDA_DIMENSION
from montreal_forced_aligner.corpus.features import IvectorConfigMixin
from montreal_forced_aligner.corpus.ivector_corpus import IvectorCorpusMixin
from montreal_forced_aligner.data import WorkflowType
from montreal_forced_aligner.db import CorpusWorkflow
from montreal_forced_aligner.exceptions import ConfigError, KaldiProcessingError
from montreal_forced_aligner.helper import load_configuration, mfa_open
from montreal_forced_aligner.ivector.multiprocessing import (
AccGlobalStatsArguments,
AccGlobalStatsFunction,
AccIvectorStatsArguments,
AccIvectorStatsFunction,
GaussToPostArguments,
GaussToPostFunction,
GmmGselectArguments,
GmmGselectFunction,
)
from montreal_forced_aligner.models import IvectorExtractorModel
from montreal_forced_aligner.utils import (
log_kaldi_errors,
parse_logs,
run_kaldi_function,
thirdparty_binary,
)
# Public names exposed by ``from <this module> import *``.
__all__ = [
    "TrainableIvectorExtractor",
    "DubmTrainer",
    "IvectorTrainer",
    "IvectorModelTrainingMixin",
]

# Module-level logger bound to the "mfa" logging channel.
logger = logging.getLogger("mfa")
[docs]
class IvectorModelTrainingMixin(AcousticModelTrainingMixin):
    """
    Abstract mixin shared by the trainers that build ivector extractor models

    See Also
    --------
    :class:`~montreal_forced_aligner.acoustic_modeling.base.AcousticModelTrainingMixin`
        For acoustic model training parsing parameters
    """

    @property
    def meta(self) -> MetaDict:
        """Metadata for the trained model: MFA version, architecture, training timestamp and feature options"""
        from datetime import datetime

        from ..utils import get_mfa_version

        return {
            "version": get_mfa_version(),
            "architecture": self.architecture,
            "train_date": str(datetime.now()),
            "features": self.feature_options,
        }

    def compute_calculated_properties(self) -> None:
        """Intentional no-op: nothing is calculated for ivector training"""
        return None

    def export_model(self, output_model_path: Path) -> None:
        """
        Package the working directory into an ivector extractor model archive

        Parameters
        ----------
        output_model_path : :class:`~pathlib.Path`
            Destination of the exported model; its stem names the model and its
            extension is stripped before the archive is dumped
        """
        model = IvectorExtractorModel.empty(output_model_path.stem, self.working_log_directory)
        model.add_meta_file(self)
        model.add_model(self.working_directory)

        parent_directory = output_model_path.parent
        if parent_directory:
            os.makedirs(parent_directory, exist_ok=True)

        # ``dump`` receives the path without its extension
        archive_base = os.path.splitext(output_model_path)[0]
        model.dump(archive_base)
[docs]
class DubmTrainer(IvectorModelTrainingMixin):
    """
    Trainer for diagonal universal background models

    Parameters
    ----------
    num_iterations : int
        Number of training iterations to perform, defaults to 4
    num_gselect: int
        Number of Gaussian-selection indices to use while training, defaults to 30
    subsample: int
        Subsample factor for feature frames, defaults to 5
    num_frames:int
        Number of frames to keep in memory for initialization, defaults to 500000
    num_gaussians:int
        Number of gaussians to use for DUBM training, defaults to 256
    num_iterations_init:int
        Number of iteration to use when initializing UBM, defaults to 20
    initial_gaussian_proportion:float
        Proportion of total gaussians to use initially, defaults to 0.5
    min_gaussian_weight: float
        Defaults to 0.0001
    remove_low_count_gaussians: bool
        Flag for removing low count gaussians in the final round of training, defaults to True

    See Also
    --------
    :class:`~montreal_forced_aligner.ivector.trainer.IvectorModelTrainingMixin`
        For base ivector training parameters
    """

    def __init__(
        self,
        num_iterations: int = 4,
        num_gselect: int = 30,
        subsample: int = 5,
        num_frames: int = 500000,
        num_gaussians: int = 256,
        num_iterations_init: int = 20,
        initial_gaussian_proportion: float = 0.5,
        min_gaussian_weight: float = 0.0001,
        remove_low_count_gaussians: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.num_iterations = num_iterations
        self.subsample = subsample
        self.num_gselect = num_gselect
        self.num_frames = num_frames
        self.num_gaussians = num_gaussians
        self.num_iterations_init = num_iterations_init
        self.initial_gaussian_proportion = initial_gaussian_proportion
        self.min_gaussian_weight = min_gaussian_weight
        self.remove_low_count_gaussians = remove_low_count_gaussians
        # When False, the UBM is always initialized from features rather than
        # from an existing alignment workflow's model (see _trainer_initialization)
        self.use_alignment_features = False

    @property
    def train_type(self) -> str:
        """Training identifier"""
        return "dubm"

    @property
    def dubm_options(self) -> MetaDict:
        """Options for DUBM training"""
        return {"subsample": self.subsample, "num_gselect": self.num_gselect}

    def gmm_gselect_arguments(self) -> List[GmmGselectArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.trainer.GmmGselectFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.trainer.GmmGselectArguments`]
            Arguments for processing
        """
        return [
            GmmGselectArguments(
                j.id,
                getattr(self, "db_string", ""),
                self.working_log_directory.joinpath(f"gmm_gselect.{j.id}.log"),
                self.feature_options,
                self.dubm_options,
                self.model_path,
                j.construct_path(self.working_directory, "gselect", "ark"),
            )
            for j in self.jobs
        ]

    def acc_global_stats_arguments(
        self,
    ) -> List[AccGlobalStatsArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.trainer.AccGlobalStatsFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.trainer.AccGlobalStatsArguments`]
            Arguments for processing
        """
        return [
            AccGlobalStatsArguments(
                j.id,
                getattr(self, "db_string", ""),
                os.path.join(
                    self.working_log_directory,
                    f"acc_global_stats.{self.iteration}.{j.id}.log",
                ),
                self.feature_options,
                self.dubm_options,
                j.construct_path(self.working_directory, "gselect", "ark"),
                j.construct_path(self.working_directory, f"global.{self.iteration}", "acc"),
                self.model_path,
            )
            for j in self.jobs
        ]

    def gmm_gselect(self) -> None:
        """
        Multiprocessing function that stores Gaussian selection indices on disk

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.trainer.GmmGselectFunction`
            Multiprocessing helper function for each job
        :meth:`.DubmTrainer.gmm_gselect_arguments`
            Job method for generating arguments for the helper function
        :kaldi_steps:`train_diag_ubm`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Selecting gaussians...")
        arguments = self.gmm_gselect_arguments()
        with tqdm(
            total=int(self.num_current_utterances / 10), disable=GLOBAL_CONFIG.quiet
        ) as pbar:
            for _ in run_kaldi_function(GmmGselectFunction, arguments, pbar.update):
                pass
        logger.debug(f"Gaussian selection took {time.time() - begin:.3f} seconds")

    def _trainer_initialization(self, initial_alignment_directory: Optional[str] = None) -> None:
        """
        DUBM training initialization

        Creates the first DUBM (either from an existing alignment workflow's model
        via ``init-ubm`` or directly from features via ``gmm-global-init-from-feats``),
        then runs Gaussian selection and parses the logs.

        Parameters
        ----------
        initial_alignment_directory: str, optional
            Not read by this implementation
        """
        log_path = self.working_log_directory.joinpath("gmm_init.log")
        with self.session() as session, mfa_open(log_path, "w") as log_file:
            alignment_workflow: CorpusWorkflow = (
                session.query(CorpusWorkflow)
                .filter(CorpusWorkflow.workflow_type == WorkflowType.alignment)
                .first()
            )
            num_gauss_init = int(self.initial_gaussian_proportion * int(self.num_gaussians))
            self.iteration = 1
            if self.use_alignment_features and alignment_workflow is not None:
                # Seed the UBM from the model produced by the alignment workflow
                model_path = os.path.join(alignment_workflow.working_directory, "final.mdl")
                occs_path = os.path.join(alignment_workflow.working_directory, "final.occs")
                gmm_init_proc = subprocess.Popen(
                    [
                        thirdparty_binary("init-ubm"),
                        "--fullcov=false",
                        "--intermediate-num-gauss=2000",
                        f"--num-frames={self.num_frames}",
                        f"--ubm-num-gauss={self.num_gaussians}",
                        model_path,
                        occs_path,
                        self.model_path,
                    ],
                    stderr=log_file,
                )
                gmm_init_proc.communicate()
            else:
                # Build the feature pipeline from the first job, then rewrite it so
                # it points at the non-split .scp in the corpus data directory
                job = self.jobs[0]
                feature_string = job.construct_online_feature_proc_string()
                feature_string = feature_string.replace(f".{job.id}.scp", ".scp")
                feature_string = feature_string.replace(
                    str(job.corpus.current_subset_directory), str(job.corpus.data_directory)
                )
                gmm_init_proc = subprocess.Popen(
                    [
                        thirdparty_binary("gmm-global-init-from-feats"),
                        "--verbose=4",
                        f"--num-threads={GLOBAL_CONFIG.num_jobs}",
                        f"--num-frames={self.num_frames}",
                        f"--num_gauss={self.num_gaussians}",
                        f"--num_gauss_init={num_gauss_init}",
                        f"--num_iters={self.num_iterations_init}",
                        feature_string,
                        self.model_path,
                    ],
                    stderr=log_file,
                )
                gmm_init_proc.communicate()
        # Store Gaussian selection indices on disk
        self.gmm_gselect()
        parse_logs(self.working_log_directory)

    def acc_global_stats(self) -> None:
        """
        Multiprocessing function that accumulates global GMM stats

        After the per-job accumulation, the accumulators are summed with
        ``gmm-global-sum-accs`` and piped into ``gmm-global-est`` to write the
        next iteration's model.

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.trainer.AccGlobalStatsFunction`
            Multiprocessing helper function for each job
        :meth:`.DubmTrainer.acc_global_stats_arguments`
            Job method for generating arguments for the helper function
        :kaldi_src:`gmm-global-sum-accs`
            Relevant Kaldi binary
        :kaldi_steps:`train_diag_ubm`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Accumulating global stats...")
        arguments = self.acc_global_stats_arguments()
        with tqdm(total=self.num_current_utterances, disable=GLOBAL_CONFIG.quiet) as pbar:
            for _ in run_kaldi_function(AccGlobalStatsFunction, arguments, pbar.update):
                pass
        logger.debug(f"Accumulating stats took {time.time() - begin:.3f} seconds")

        # Don't remove low-count Gaussians till the last tier,
        # or gselect info won't be valid anymore
        if self.iteration < self.num_iterations:
            opt = "--remove-low-count-gaussians=false"
        else:
            opt = f"--remove-low-count-gaussians={self.remove_low_count_gaussians}"

        log_path = self.working_log_directory.joinpath(f"update.{self.iteration}.log")
        acc_files = [j.acc_path for j in arguments]
        with mfa_open(log_path, "w") as log_file:
            sum_proc = subprocess.Popen(
                [thirdparty_binary("gmm-global-sum-accs"), "-"] + acc_files,
                stderr=log_file,
                stdout=subprocess.PIPE,
                env=os.environ,
            )
            gmm_global_est_proc = subprocess.Popen(
                [
                    thirdparty_binary("gmm-global-est"),
                    opt,
                    f"--min-gaussian-weight={self.min_gaussian_weight}",
                    self.model_path,
                    "-",
                    self.next_model_path,
                ],
                stderr=log_file,
                stdin=sum_proc.stdout,
                env=os.environ,
            )
            # The parent never reads this pipe; release our copy so the producer
            # is signalled if the consumer exits early and no handle is leaked
            sum_proc.stdout.close()
            gmm_global_est_proc.communicate()
            # Reap the producer so it does not linger as a zombie process
            sum_proc.wait()

        # Clean up
        if not GLOBAL_CONFIG.debug:
            for p in acc_files:
                # A job that failed may not have written its accumulator
                if os.path.exists(p):
                    os.remove(p)

    @property
    def exported_model_path(self) -> Path:
        """Temporary model path to save intermediate model"""
        return self.working_log_directory.joinpath("dubm_model.zip")

    def train_iteration(self) -> None:
        """
        Run an iteration of UBM training
        """
        # Accumulate stats
        self.acc_global_stats()
        self.iteration += 1

    def finalize_training(self) -> None:
        """Finalize DUBM training"""
        # Iterations start at 1 and each writes ``{iteration + 1}.dubm``, so the
        # last model produced is numbered ``num_iterations + 1``
        final_dubm_path = self.working_directory.joinpath("final.dubm")
        shutil.copy(
            self.working_directory.joinpath(f"{self.num_iterations+1}.dubm"),
            final_dubm_path,
        )
        self.export_model(self.exported_model_path)
        wf = self.worker.current_workflow
        with self.session() as session:
            session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update({"done": True})
            session.commit()

    @property
    def model_path(self) -> Path:
        """Current iteration's DUBM model path"""
        if self.training_complete:
            return self.working_directory.joinpath("final.dubm")
        return self.working_directory.joinpath(f"{self.iteration}.dubm")

    @property
    def next_model_path(self) -> Path:
        """Next iteration's DUBM model path"""
        if self.training_complete:
            return self.working_directory.joinpath("final.dubm")
        return self.working_directory.joinpath(f"{self.iteration + 1}.dubm")
[docs]
class IvectorTrainer(IvectorModelTrainingMixin, IvectorConfigMixin):
    """
    Trainer for a block of ivector extractor training

    Parameters
    ----------
    num_iterations: int
        Number of iterations, defaults to 10
    subsample: int
        Subsample factor for feature frames, defaults to 5
    gaussian_min_count: int
        Value passed to ``ivector-extractor-est`` as ``--gaussian-min-count``, defaults to 100

    See Also
    --------
    :class:`~montreal_forced_aligner.ivector.trainer.IvectorModelTrainingMixin`
        For base parameters for ivector training
    :class:`~montreal_forced_aligner.corpus.features.IvectorConfigMixin`
        For parameters for ivector feature generation
    """

    def __init__(
        self, num_iterations: int = 10, subsample: int = 5, gaussian_min_count: int = 100, **kwargs
    ):
        super().__init__(**kwargs)
        self.subsample = subsample
        self.num_iterations = num_iterations
        self.gaussian_min_count = gaussian_min_count

    @property
    def exported_model_path(self) -> Path:
        """Temporary directory path that trainer will save ivector extractor model"""
        return self.working_log_directory.joinpath("ivector_model.zip")

    def acc_ivector_stats_arguments(self) -> List[AccIvectorStatsArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.trainer.AccIvectorStatsFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.trainer.AccIvectorStatsArguments`]
            Arguments for processing
        """
        return [
            AccIvectorStatsArguments(
                j.id,
                getattr(self, "db_string", ""),
                os.path.join(
                    self.working_log_directory, f"ivector_acc.{self.iteration}.{j.id}.log"
                ),
                self.feature_options,
                self.ivector_options,
                self.ie_path,
                j.construct_path(self.working_directory, "post", "ark"),
                j.construct_path(self.working_directory, "ivector", "acc"),
            )
            for j in self.jobs
        ]

    def _trainer_initialization(self) -> None:
        """
        Ivector extractor training initialization

        Converts the diagonal UBM to a full-covariance UBM and initializes the
        extractor when the current extractor file does not exist yet, then
        extracts posteriors and parses the logs.
        """
        self.iteration = 1
        # Initialize job_name-vector extractor
        log_directory = self.working_directory.joinpath("log")
        log_path = os.path.join(log_directory, "init.log")
        diag_ubm_path = self.working_directory.joinpath("final.dubm")
        full_ubm_path = self.working_directory.joinpath("final.ubm")
        if not os.path.exists(self.ie_path):
            with mfa_open(log_path, "w") as log_file:
                subprocess.check_call(
                    [thirdparty_binary("gmm-global-to-fgmm"), diag_ubm_path, full_ubm_path],
                    stderr=log_file,
                )
                subprocess.check_call(
                    [
                        thirdparty_binary("ivector-extractor-init"),
                        f"--ivector-dim={self.ivector_dimension}",
                        "--use-weights=false",
                        full_ubm_path,
                        self.ie_path,
                    ],
                    stderr=log_file,
                )
        # Do Gaussian selection and posterior extraction
        self.gauss_to_post()
        parse_logs(log_directory)

    def gauss_to_post_arguments(self) -> List[GaussToPostArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.trainer.GaussToPostFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.trainer.GaussToPostArguments`]
            Arguments for processing
        """
        return [
            GaussToPostArguments(
                j.id,
                getattr(self, "db_string", ""),
                self.working_log_directory.joinpath(f"gauss_to_post.{j.id}.log"),
                self.feature_options,
                self.ivector_options,
                j.construct_path(self.working_directory, "post", "ark"),
                self.dubm_path,
            )
            for j in self.jobs
        ]

    def gauss_to_post(self) -> None:
        """
        Multiprocessing function that does Gaussian selection and posterior extraction

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.trainer.GaussToPostFunction`
            Multiprocessing helper function for each job
        :meth:`.IvectorTrainer.gauss_to_post_arguments`
            Job method for generating arguments for the helper function
        :kaldi_steps_sid:`train_ivector_extractor`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Extracting posteriors...")
        arguments = self.gauss_to_post_arguments()
        with tqdm(total=self.num_current_utterances, disable=GLOBAL_CONFIG.quiet) as pbar:
            for _ in run_kaldi_function(GaussToPostFunction, arguments, pbar.update):
                pass
        logger.debug(f"Extracting posteriors took {time.time() - begin:.3f} seconds")

    @property
    def train_type(self) -> str:
        """Training identifier"""
        return "ivector"

    @property
    def ivector_options(self) -> MetaDict:
        """Options for ivector training and extracting"""
        options = super().ivector_options
        options["subsample"] = self.subsample
        return options

    @property
    def meta(self) -> MetaDict:
        """Metadata information for ivector extractor models"""
        from ..utils import get_mfa_version

        return {
            "version": get_mfa_version(),
            "ivector_dimension": self.ivector_dimension,
            "num_gselect": self.num_gselect,
            "min_post": self.min_post,
            "posterior_scale": self.posterior_scale,
            "features": self.feature_options,
        }

    @property
    def ie_path(self) -> Path:
        """Current ivector extractor model path"""
        if self.training_complete:
            return self.working_directory.joinpath("final.ie")
        return self.working_directory.joinpath(f"{self.iteration}.ie")

    @property
    def next_ie_path(self) -> Path:
        """Next iteration's ivector extractor model path"""
        if self.training_complete:
            return self.working_directory.joinpath("final.ie")
        return self.working_directory.joinpath(f"{self.iteration + 1}.ie")

    @property
    def dubm_path(self) -> Path:
        """DUBM model path"""
        return self.working_directory.joinpath("final.dubm")

    def acc_ivector_stats(self) -> None:
        """
        Multiprocessing function that accumulates ivector extraction stats.

        The per-job accumulators are summed, removed, and then used to estimate
        the next iteration's extractor. The estimator's stderr is copied to the
        update log while the objective-function improvement and explained
        variance are scraped from it for a debug message.

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.trainer.AccIvectorStatsFunction`
            Multiprocessing helper function for each job
        :meth:`.IvectorTrainer.acc_ivector_stats_arguments`
            Job method for generating arguments for the helper function
        :kaldi_src:`ivector-extractor-sum-accs`
            Relevant Kaldi binary
        :kaldi_src:`ivector-extractor-est`
            Relevant Kaldi binary
        :kaldi_steps_sid:`train_ivector_extractor`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Accumulating ivector stats...")
        arguments = self.acc_ivector_stats_arguments()
        with tqdm(total=self.worker.num_utterances, disable=GLOBAL_CONFIG.quiet) as pbar:
            for _ in run_kaldi_function(AccIvectorStatsFunction, arguments, pbar.update):
                pass
        logger.debug(f"Accumulating stats took {time.time() - begin:.3f} seconds")

        log_path = self.working_log_directory.joinpath(f"sum_acc.{self.iteration}.log")
        acc_path = self.working_directory.joinpath(f"acc.{self.iteration}")
        accinits = [j.acc_path for j in arguments]
        with mfa_open(log_path, "w") as log_file:
            sum_accs_proc = subprocess.Popen(
                [thirdparty_binary("ivector-extractor-sum-accs"), "--parallel=true"]
                + accinits
                + [acc_path],
                stderr=log_file,
                env=os.environ,
            )
            sum_accs_proc.communicate()
        # clean up
        for p in accinits:
            if os.path.exists(p):
                os.remove(p)

        # Est extractor
        # Accept signed values and scientific notation (e.g. "9.5e-05"); a bare
        # "[0-9.]+" would silently read that as 9.5
        number = r"[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?"
        improvement_pattern = re.compile(
            r"LOG.*Overall objective-function improvement per frame was (?P<improvement>"
            + number
            + r")"
        )
        variance_pattern = re.compile(
            r"LOG.*variance explained by the iVectors is (?P<variance>" + number + r")\."
        )
        iteration_improvement = None
        explained_variance = None
        log_path = self.working_log_directory.joinpath(f"update.{self.iteration}.log")
        with mfa_open(log_path, "w") as log_file:
            # Using Popen as a context manager closes the stderr pipe and waits
            # for the process on exit
            with subprocess.Popen(
                [
                    thirdparty_binary("ivector-extractor-est"),
                    f"--num-threads={len(self.jobs)}",
                    f"--gaussian-min-count={self.gaussian_min_count}",
                    self.ie_path,
                    acc_path,
                    self.next_ie_path,
                ],
                stderr=subprocess.PIPE,
                env=os.environ,
            ) as extractor_est_proc:
                for line in extractor_est_proc.stderr:
                    line = line.decode("utf8")
                    log_file.write(line)
                    log_file.flush()
                    m = improvement_pattern.match(line)
                    if m:
                        iteration_improvement = float(m.group("improvement"))
                    m = variance_pattern.match(line)
                    if m:
                        explained_variance = float(m.group("variance"))
        logger.debug(
            f"For iteration {self.iteration}, objective-function improvement was {iteration_improvement} per frame and variance explained by ivectors was {explained_variance}."
        )

    def train_iteration(self) -> None:
        """
        Run an iteration of training
        """
        # Skip the work when this iteration's output already exists
        if not os.path.exists(self.next_ie_path):
            # Accumulate stats and sum
            self.acc_ivector_stats()
        self.iteration += 1

    def finalize_training(self) -> None:
        """
        Finalize ivector extractor training

        Copies the last extractor produced to ``final.ie``, exports the model and
        marks the current workflow as done.
        """
        # Iterations start at 1 and each writes ``{iteration + 1}.ie``, so the
        # newest extractor is ``{num_iterations + 1}.ie``; fall back to
        # ``{num_iterations}.ie`` if that file is absent
        last_ie_path = self.working_directory.joinpath(f"{self.num_iterations + 1}.ie")
        if not os.path.exists(last_ie_path):
            last_ie_path = self.working_directory.joinpath(f"{self.num_iterations}.ie")
        shutil.copy(
            last_ie_path,
            self.working_directory.joinpath("final.ie"),
        )
        self.export_model(self.exported_model_path)
        wf = self.worker.current_workflow
        with self.session() as session:
            session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update({"done": True})
            session.commit()
class PldaTrainer(IvectorTrainer):
    """
    Trainer for PLDA models
    """

    worker: TrainableIvectorExtractor

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _trainer_initialization(self) -> None:
        """No initialization"""
        pass

    def compute_lda(self) -> None:
        """
        Compute the LDA transform for utterance ivectors

        Runs ``ivector-normalize-length`` on the worker's utterance ivectors and
        forwards its output to ``ivector-compute-lda``, which writes
        ``ivector_lda.mat`` in the working directory.

        Raises
        ------
        AssertionError
            If the LDA matrix was not written
        """
        lda_path = self.working_directory.joinpath("ivector_lda.mat")
        log_path = self.working_log_directory.joinpath("lda.log")
        utt2spk_path = os.path.join(self.corpus_output_directory, "utt2spk.scp")
        with tqdm(total=self.worker.num_utterances, disable=GLOBAL_CONFIG.quiet) as pbar, mfa_open(
            log_path, "w"
        ) as log_file:
            normalize_proc = subprocess.Popen(
                [
                    thirdparty_binary("ivector-normalize-length"),
                    f"scp:{self.worker.utterance_ivector_path}",
                    "ark:-",
                ],
                stdout=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            lda_compute_proc = subprocess.Popen(
                [
                    thirdparty_binary("ivector-compute-lda"),
                    f"--dim={PLDA_DIMENSION}",
                    "--total-covariance-factor=0.1",
                    "ark:-",
                    f"ark:{utt2spk_path}",
                    lda_path,
                ],
                stdin=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            # Forward the producer's output chunk by chunk (split on newlines),
            # advancing the progress bar once per chunk
            for line in normalize_proc.stdout:
                lda_compute_proc.stdin.write(line)
                lda_compute_proc.stdin.flush()
                pbar.update(1)
            lda_compute_proc.stdin.close()
            lda_compute_proc.wait()
            # Release the producer's pipe and reap it so neither the handle nor
            # the process is left dangling
            normalize_proc.stdout.close()
            normalize_proc.wait()
        assert os.path.exists(lda_path), f"LDA matrix was not created: {lda_path}"

    def train(self) -> None:
        """Train PLDA"""
        self.compute_lda()
        self.worker.compute_plda()
        self.worker.compute_speaker_ivectors()
        # os.replace overwrites an existing destination on every platform,
        # whereas os.rename fails on Windows when the target already exists
        os.replace(
            self.working_directory.joinpath("current_speaker_ivectors.ark"),
            self.working_directory.joinpath("speaker_ivectors.ark"),
        )
        os.replace(
            self.working_directory.joinpath("current_num_utts.ark"),
            self.working_directory.joinpath("num_utts.ark"),
        )