"""Class definition for TrainableIvectorExtractor"""
from __future__ import annotations
import logging
import os
import re
import shutil
import subprocess
import time
import typing
from typing import Any, Dict, List, Optional, Tuple
import tqdm
from montreal_forced_aligner.abc import MetaDict, ModelExporterMixin, TopLevelMfaWorker
from montreal_forced_aligner.acoustic_modeling.base import AcousticModelTrainingMixin
from montreal_forced_aligner.config import GLOBAL_CONFIG, PLDA_DIMENSION
from montreal_forced_aligner.corpus.features import IvectorConfigMixin
from montreal_forced_aligner.corpus.ivector_corpus import IvectorCorpusMixin
from montreal_forced_aligner.data import WorkflowType
from montreal_forced_aligner.db import CorpusWorkflow
from montreal_forced_aligner.exceptions import ConfigError, KaldiProcessingError
from montreal_forced_aligner.helper import load_configuration, mfa_open
from montreal_forced_aligner.ivector.multiprocessing import (
AccGlobalStatsArguments,
AccGlobalStatsFunction,
AccIvectorStatsArguments,
AccIvectorStatsFunction,
GaussToPostArguments,
GaussToPostFunction,
GmmGselectArguments,
GmmGselectFunction,
)
from montreal_forced_aligner.models import IvectorExtractorModel
from montreal_forced_aligner.utils import (
log_kaldi_errors,
parse_logs,
run_kaldi_function,
thirdparty_binary,
)
# Names exported by ``from <module> import *``.
# NOTE(review): ``PldaTrainer`` is defined in this module but is not listed
# here, and ``TrainableIvectorExtractor`` is listed but not defined in the
# visible part of the file -- confirm both are intentional.
__all__ = [
"TrainableIvectorExtractor",
"DubmTrainer",
"IvectorTrainer",
"IvectorModelTrainingMixin",
]
# Module-level logger shared under the "mfa" logger name
logger = logging.getLogger("mfa")
class IvectorModelTrainingMixin(AcousticModelTrainingMixin):
    """
    Abstract mixin shared by the trainers that build ivector extractor models

    See Also
    --------
    :class:`~montreal_forced_aligner.acoustic_modeling.base.AcousticModelTrainingMixin`
        For acoustic model training parsing parameters
    """

    @property
    def meta(self) -> MetaDict:
        """Metadata describing the model produced by this trainer"""
        from datetime import datetime

        from ..utils import get_mfa_version

        return {
            "version": get_mfa_version(),
            "architecture": self.architecture,
            "train_date": str(datetime.now()),
            "features": self.feature_options,
        }

    def compute_calculated_properties(self) -> None:
        """Intentionally a no-op for ivector trainers"""
        return None

    def export_model(self, output_model_path: str) -> None:
        """
        Package the trained ivector extractor and write it to disk

        Parameters
        ----------
        output_model_path : str
            Path to save ivector extractor model
        """
        target_directory = os.path.dirname(output_model_path)
        # The model is named after the output file, minus its extension
        model_name = os.path.splitext(os.path.basename(output_model_path))[0]

        model = IvectorExtractorModel.empty(model_name, self.working_log_directory)
        model.add_meta_file(self)
        model.add_model(self.working_directory)

        if target_directory:
            os.makedirs(target_directory, exist_ok=True)

        # ``dump`` receives the full output path with its extension stripped
        dump_stem = os.path.splitext(output_model_path)[0]
        model.dump(dump_stem)
class DubmTrainer(IvectorModelTrainingMixin):
    """
    Trainer for diagonal universal background models

    Parameters
    ----------
    num_iterations : int
        Number of training iterations to perform, defaults to 4
    num_gselect: int
        Number of Gaussian-selection indices to use while training, defaults to 30
    subsample: int
        Subsample factor for feature frames, defaults to 5
    num_frames:int
        Number of frames to keep in memory for initialization, defaults to 500000
    num_gaussians:int
        Number of gaussians to use for DUBM training, defaults to 256
    num_iterations_init:int
        Number of iteration to use when initializing UBM, defaults to 20
    initial_gaussian_proportion:float
        Proportion of total gaussians to use initially, defaults to 0.5
    min_gaussian_weight: float
        Value passed to ``gmm-global-est`` as ``--min-gaussian-weight``, defaults to 0.0001
    remove_low_count_gaussians: bool
        Flag for removing low count gaussians in the final round of training, defaults to True

    See Also
    --------
    :class:`~montreal_forced_aligner.ivector.trainer.IvectorModelTrainingMixin`
        For base ivector training parameters
    """

    def __init__(
        self,
        num_iterations: int = 4,
        num_gselect: int = 30,
        subsample: int = 5,
        num_frames: int = 500000,
        num_gaussians: int = 256,
        num_iterations_init: int = 20,
        initial_gaussian_proportion: float = 0.5,
        min_gaussian_weight: float = 0.0001,
        remove_low_count_gaussians: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.num_iterations = num_iterations
        self.subsample = subsample
        self.num_gselect = num_gselect
        self.num_frames = num_frames
        self.num_gaussians = num_gaussians
        self.num_iterations_init = num_iterations_init
        self.initial_gaussian_proportion = initial_gaussian_proportion
        self.min_gaussian_weight = min_gaussian_weight
        self.remove_low_count_gaussians = remove_low_count_gaussians
        # When False, the UBM is always initialized from features rather than
        # from an existing alignment workflow's model
        self.use_alignment_features = False

    @property
    def train_type(self) -> str:
        """Training identifier"""
        return "dubm"

    @property
    def dubm_options(self) -> MetaDict:
        """Options for DUBM training"""
        return {"subsample": self.subsample, "num_gselect": self.num_gselect}

    def gmm_gselect_arguments(self) -> List[GmmGselectArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.multiprocessing.GmmGselectFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.multiprocessing.GmmGselectArguments`]
            Arguments for processing
        """
        return [
            GmmGselectArguments(
                j.id,
                getattr(self, "db_string", ""),
                os.path.join(self.working_log_directory, f"gmm_gselect.{j.id}.log"),
                self.feature_options,
                self.dubm_options,
                self.model_path,
                j.construct_path(self.working_directory, "gselect", "ark"),
            )
            for j in self.jobs
        ]

    def acc_global_stats_arguments(
        self,
    ) -> List[AccGlobalStatsArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.multiprocessing.AccGlobalStatsFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.multiprocessing.AccGlobalStatsArguments`]
            Arguments for processing
        """
        return [
            AccGlobalStatsArguments(
                j.id,
                getattr(self, "db_string", ""),
                os.path.join(
                    self.working_log_directory,
                    f"acc_global_stats.{self.iteration}.{j.id}.log",
                ),
                self.feature_options,
                self.dubm_options,
                j.construct_path(self.working_directory, "gselect", "ark"),
                j.construct_path(self.working_directory, f"global.{self.iteration}", "acc"),
                self.model_path,
            )
            for j in self.jobs
        ]

    def gmm_gselect(self) -> None:
        """
        Multiprocessing function that stores Gaussian selection indices on disk

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.multiprocessing.GmmGselectFunction`
            Multiprocessing helper function for each job
        :meth:`.DubmTrainer.gmm_gselect_arguments`
            Job method for generating arguments for the helper function
        :kaldi_steps:`train_diag_ubm`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Selecting gaussians...")
        arguments = self.gmm_gselect_arguments()
        with tqdm.tqdm(
            total=int(self.num_current_utterances / 10), disable=GLOBAL_CONFIG.quiet
        ) as pbar:
            for _ in run_kaldi_function(GmmGselectFunction, arguments, pbar.update):
                pass
        logger.debug(f"Gaussian selection took {time.time() - begin:.3f} seconds")

    def _trainer_initialization(self, initial_alignment_directory: Optional[str] = None) -> None:
        """
        DUBM training initialization

        Creates the initial model for iteration 1, either from an existing
        alignment workflow's ``final.mdl``/``final.occs`` (``init-ubm``) or
        directly from features (``gmm-global-init-from-feats``), then runs
        Gaussian selection and parses the logs.

        Parameters
        ----------
        initial_alignment_directory : str, optional
            Not used by this trainer
        """
        log_path = os.path.join(self.working_log_directory, "gmm_init.log")
        with self.session() as session, mfa_open(log_path, "w") as log_file:
            alignment_workflow: CorpusWorkflow = (
                session.query(CorpusWorkflow)
                .filter(CorpusWorkflow.workflow_type == WorkflowType.alignment)
                .first()
            )
            num_gauss_init = int(self.initial_gaussian_proportion * int(self.num_gaussians))
            self.iteration = 1
            if self.use_alignment_features and alignment_workflow is not None:
                model_path = os.path.join(alignment_workflow.working_directory, "final.mdl")
                occs_path = os.path.join(alignment_workflow.working_directory, "final.occs")
                gmm_init_proc = subprocess.Popen(
                    [
                        thirdparty_binary("init-ubm"),
                        "--fullcov=false",
                        "--intermediate-num-gauss=2000",
                        f"--num-frames={self.num_frames}",
                        f"--ubm-num-gauss={self.num_gaussians}",
                        model_path,
                        occs_path,
                        self.model_path,
                    ],
                    stderr=log_file,
                )
                gmm_init_proc.communicate()
            else:
                job = self.jobs[0]
                # Rewrite the first job's feature string so that it points at
                # the un-split ``.scp`` in the corpus data directory instead of
                # the per-job file in the current subset directory
                feature_string = job.construct_online_feature_proc_string()
                feature_string = feature_string.replace(f".{job.id}.scp", ".scp")
                feature_string = feature_string.replace(
                    job.corpus.current_subset_directory, job.corpus.data_directory
                )
                gmm_init_proc = subprocess.Popen(
                    [
                        thirdparty_binary("gmm-global-init-from-feats"),
                        "--verbose=4",
                        f"--num-threads={GLOBAL_CONFIG.num_jobs}",
                        f"--num-frames={self.num_frames}",
                        f"--num_gauss={self.num_gaussians}",
                        f"--num_gauss_init={num_gauss_init}",
                        f"--num_iters={self.num_iterations_init}",
                        feature_string,
                        self.model_path,
                    ],
                    stderr=log_file,
                )
                gmm_init_proc.communicate()
        # Store Gaussian selection indices on disk
        self.gmm_gselect()
        parse_logs(self.working_log_directory)

    def acc_global_stats(self) -> None:
        """
        Multiprocessing function that accumulates global GMM stats

        After the per-job accumulation, the accumulators are summed with
        ``gmm-global-sum-accs`` and piped into ``gmm-global-est`` to write the
        next iteration's model.  Per-job accumulator files are deleted
        afterwards unless debugging is enabled.

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.multiprocessing.AccGlobalStatsFunction`
            Multiprocessing helper function for each job
        :meth:`.DubmTrainer.acc_global_stats_arguments`
            Job method for generating arguments for the helper function
        :kaldi_src:`gmm-global-sum-accs`
            Relevant Kaldi binary
        :kaldi_steps:`train_diag_ubm`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Accumulating global stats...")
        arguments = self.acc_global_stats_arguments()
        with tqdm.tqdm(total=self.num_current_utterances, disable=GLOBAL_CONFIG.quiet) as pbar:
            for _ in run_kaldi_function(AccGlobalStatsFunction, arguments, pbar.update):
                pass
        logger.debug(f"Accumulating stats took {time.time() - begin:.3f} seconds")

        # Don't remove low-count Gaussians till the last tier,
        # or gselect info won't be valid anymore
        if self.iteration < self.num_iterations:
            opt = "--remove-low-count-gaussians=false"
        else:
            opt = f"--remove-low-count-gaussians={self.remove_low_count_gaussians}"

        log_path = os.path.join(self.working_log_directory, f"update.{self.iteration}.log")
        acc_files = [j.acc_path for j in arguments]
        with mfa_open(log_path, "w") as log_file:
            sum_proc = subprocess.Popen(
                [thirdparty_binary("gmm-global-sum-accs"), "-"] + acc_files,
                stderr=log_file,
                stdout=subprocess.PIPE,
                env=os.environ,
            )
            gmm_global_est_proc = subprocess.Popen(
                [
                    thirdparty_binary("gmm-global-est"),
                    opt,
                    f"--min-gaussian-weight={self.min_gaussian_weight}",
                    self.model_path,
                    "-",
                    self.next_model_path,
                ],
                stderr=log_file,
                stdin=sum_proc.stdout,
                env=os.environ,
            )
            # The estimator now owns the read end of the pipe; drop the
            # parent's handle so the descriptor is not leaked and the summing
            # process is signalled if the estimator exits early
            sum_proc.stdout.close()
            gmm_global_est_proc.communicate()
            # Reap the upstream process as well so it does not linger as a zombie
            sum_proc.wait()

        for binary_name, proc in (
            ("gmm-global-sum-accs", sum_proc),
            ("gmm-global-est", gmm_global_est_proc),
        ):
            if proc.returncode != 0:
                logger.warning(
                    "%s exited with code %s on iteration %s, see %s",
                    binary_name,
                    proc.returncode,
                    self.iteration,
                    log_path,
                )

        # Clean up; a job that failed may not have produced its accumulator
        if not GLOBAL_CONFIG.debug:
            for p in acc_files:
                if os.path.exists(p):
                    os.remove(p)

    @property
    def exported_model_path(self) -> str:
        """Temporary model path to save intermediate model"""
        return os.path.join(self.working_log_directory, "dubm_model.zip")

    def train_iteration(self) -> None:
        """
        Run an iteration of UBM training
        """
        # Accumulate stats and write the next iteration's model
        self.acc_global_stats()
        self.iteration += 1

    def finalize_training(self) -> None:
        """
        Finalize DUBM training

        Copies the model written by the last iteration to ``final.dubm``,
        exports the model archive and marks the current workflow as done.
        """
        final_dubm_path = os.path.join(self.working_directory, "final.dubm")
        # Iterations start at 1 and each one writes ``{iteration + 1}.dubm``,
        # so the last model is numbered ``num_iterations + 1``
        shutil.copy(
            os.path.join(self.working_directory, f"{self.num_iterations+1}.dubm"),
            final_dubm_path,
        )
        # Package the trained DUBM
        self.export_model(self.exported_model_path)
        wf = self.worker.current_workflow
        with self.session() as session:
            session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update({"done": True})
            session.commit()

    @property
    def model_path(self) -> str:
        """Current iteration's DUBM model path"""
        if self.training_complete:
            return os.path.join(self.working_directory, "final.dubm")
        return os.path.join(self.working_directory, f"{self.iteration}.dubm")

    @property
    def next_model_path(self) -> str:
        """Next iteration's DUBM model path"""
        if self.training_complete:
            return os.path.join(self.working_directory, "final.dubm")
        return os.path.join(self.working_directory, f"{self.iteration + 1}.dubm")
class IvectorTrainer(IvectorModelTrainingMixin, IvectorConfigMixin):
    """
    Trainer for a block of ivector extractor training

    Parameters
    ----------
    num_iterations: int
        Number of iterations, defaults to 10
    subsample: int
        Subsample factor for feature frames, defaults to 5
    gaussian_min_count: int
        Value passed to ``ivector-extractor-est`` as ``--gaussian-min-count``, defaults to 100

    See Also
    --------
    :class:`~montreal_forced_aligner.ivector.trainer.IvectorModelTrainingMixin`
        For base parameters for ivector training
    :class:`~montreal_forced_aligner.corpus.features.IvectorConfigMixin`
        For parameters for ivector feature generation
    """

    # Floating point number as printed in the estimator's log, including an
    # optional sign and exponent (e.g. ``0.25``, ``-3``, ``1.2e-05``)
    _LOG_NUMBER = r"[-+]?(?:[0-9]+\.?[0-9]*|\.[0-9]+)(?:[eE][-+]?[0-9]+)?"
    _IMPROVEMENT_PATTERN = re.compile(
        r"LOG.*Overall objective-function improvement per frame was (?P<improvement>"
        + _LOG_NUMBER
        + r")"
    )
    _VARIANCE_PATTERN = re.compile(
        r"LOG.*variance explained by the iVectors is (?P<variance>" + _LOG_NUMBER + r")\."
    )

    def __init__(
        self, num_iterations: int = 10, subsample: int = 5, gaussian_min_count: int = 100, **kwargs
    ):
        super().__init__(**kwargs)
        self.subsample = subsample
        self.num_iterations = num_iterations
        self.gaussian_min_count = gaussian_min_count

    @property
    def exported_model_path(self) -> str:
        """Temporary directory path that trainer will save ivector extractor model"""
        return os.path.join(self.working_log_directory, "ivector_model.zip")

    def acc_ivector_stats_arguments(self) -> List[AccIvectorStatsArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.multiprocessing.AccIvectorStatsFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.multiprocessing.AccIvectorStatsArguments`]
            Arguments for processing
        """
        return [
            AccIvectorStatsArguments(
                j.id,
                getattr(self, "db_string", ""),
                os.path.join(
                    self.working_log_directory, f"ivector_acc.{self.iteration}.{j.id}.log"
                ),
                self.feature_options,
                self.ivector_options,
                self.ie_path,
                j.construct_path(self.working_directory, "post", "ark"),
                j.construct_path(self.working_directory, "ivector", "acc"),
            )
            for j in self.jobs
        ]

    def _trainer_initialization(self) -> None:
        """
        Ivector extractor training initialization

        Converts ``final.dubm`` to a full-covariance UBM and initializes the
        first extractor from it (skipped when that extractor already exists),
        then extracts posteriors and parses the logs.
        """
        self.iteration = 1
        # Initialize i-vector extractor
        log_directory = os.path.join(self.working_directory, "log")
        log_path = os.path.join(log_directory, "init.log")
        diag_ubm_path = os.path.join(self.working_directory, "final.dubm")
        full_ubm_path = os.path.join(self.working_directory, "final.ubm")
        if not os.path.exists(self.ie_path):
            with mfa_open(log_path, "w") as log_file:
                subprocess.check_call(
                    [thirdparty_binary("gmm-global-to-fgmm"), diag_ubm_path, full_ubm_path],
                    stderr=log_file,
                )
                subprocess.check_call(
                    [
                        thirdparty_binary("ivector-extractor-init"),
                        f"--ivector-dim={self.ivector_dimension}",
                        "--use-weights=false",
                        full_ubm_path,
                        self.ie_path,
                    ],
                    stderr=log_file,
                )
        # Do Gaussian selection and posterior extraction
        self.gauss_to_post()
        parse_logs(log_directory)

    def gauss_to_post_arguments(self) -> List[GaussToPostArguments]:
        """
        Generate Job arguments for :func:`~montreal_forced_aligner.ivector.multiprocessing.GaussToPostFunction`

        Returns
        -------
        list[:class:`~montreal_forced_aligner.ivector.multiprocessing.GaussToPostArguments`]
            Arguments for processing
        """
        return [
            GaussToPostArguments(
                j.id,
                getattr(self, "db_string", ""),
                os.path.join(self.working_log_directory, f"gauss_to_post.{j.id}.log"),
                self.feature_options,
                self.ivector_options,
                j.construct_path(self.working_directory, "post", "ark"),
                self.dubm_path,
            )
            for j in self.jobs
        ]

    def gauss_to_post(self) -> None:
        """
        Multiprocessing function that does Gaussian selection and posterior extraction

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.multiprocessing.GaussToPostFunction`
            Multiprocessing helper function for each job
        :meth:`.IvectorTrainer.gauss_to_post_arguments`
            Job method for generating arguments for the helper function
        :kaldi_steps_sid:`train_ivector_extractor`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Extracting posteriors...")
        arguments = self.gauss_to_post_arguments()
        with tqdm.tqdm(total=self.num_current_utterances, disable=GLOBAL_CONFIG.quiet) as pbar:
            for _ in run_kaldi_function(GaussToPostFunction, arguments, pbar.update):
                pass
        logger.debug(f"Extracting posteriors took {time.time() - begin:.3f} seconds")

    @property
    def train_type(self) -> str:
        """Training identifier"""
        return "ivector"

    @property
    def ivector_options(self) -> MetaDict:
        """Options for ivector training and extracting"""
        options = super().ivector_options
        options["subsample"] = self.subsample
        return options

    @property
    def meta(self) -> MetaDict:
        """Metadata information for ivector extractor models"""
        from ..utils import get_mfa_version

        return {
            "version": get_mfa_version(),
            "ivector_dimension": self.ivector_dimension,
            "num_gselect": self.num_gselect,
            "min_post": self.min_post,
            "posterior_scale": self.posterior_scale,
            "features": self.feature_options,
        }

    @property
    def ie_path(self) -> str:
        """Current ivector extractor model path"""
        if self.training_complete:
            return os.path.join(self.working_directory, "final.ie")
        return os.path.join(self.working_directory, f"{self.iteration}.ie")

    @property
    def next_ie_path(self) -> str:
        """Next iteration's ivector extractor model path"""
        if self.training_complete:
            return os.path.join(self.working_directory, "final.ie")
        return os.path.join(self.working_directory, f"{self.iteration + 1}.ie")

    @property
    def dubm_path(self) -> str:
        """DUBM model path"""
        return os.path.join(self.working_directory, "final.dubm")

    def acc_ivector_stats(self) -> None:
        """
        Multiprocessing function that accumulates ivector extraction stats.

        Per-job accumulators are summed into ``acc.{iteration}`` and removed,
        then ``ivector-extractor-est`` writes the next extractor.  The
        estimator's stderr is copied to the update log and scanned for the
        objective-function improvement and explained variance, which are
        reported at debug level.

        See Also
        --------
        :func:`~montreal_forced_aligner.ivector.multiprocessing.AccIvectorStatsFunction`
            Multiprocessing helper function for each job
        :meth:`.IvectorTrainer.acc_ivector_stats_arguments`
            Job method for generating arguments for the helper function
        :kaldi_src:`ivector-extractor-sum-accs`
            Relevant Kaldi binary
        :kaldi_src:`ivector-extractor-est`
            Relevant Kaldi binary
        :kaldi_steps_sid:`train_ivector_extractor`
            Reference Kaldi script
        """
        begin = time.time()
        logger.info("Accumulating ivector stats...")
        arguments = self.acc_ivector_stats_arguments()
        # NOTE(review): the other progress bars in this module use
        # ``self.num_current_utterances`` for the total -- confirm the
        # worker-level count is intended here
        with tqdm.tqdm(total=self.worker.num_utterances, disable=GLOBAL_CONFIG.quiet) as pbar:
            for _ in run_kaldi_function(AccIvectorStatsFunction, arguments, pbar.update):
                pass
        logger.debug(f"Accumulating stats took {time.time() - begin:.3f} seconds")

        log_path = os.path.join(self.working_log_directory, f"sum_acc.{self.iteration}.log")
        acc_path = os.path.join(self.working_directory, f"acc.{self.iteration}")
        accinits = [j.acc_path for j in arguments]
        with mfa_open(log_path, "w") as log_file:
            sum_accs_proc = subprocess.Popen(
                [thirdparty_binary("ivector-extractor-sum-accs"), "--parallel=true"]
                + accinits
                + [acc_path],
                stderr=log_file,
                env=os.environ,
            )
            sum_accs_proc.communicate()
        if sum_accs_proc.returncode != 0:
            logger.warning(
                "ivector-extractor-sum-accs exited with code %s on iteration %s, see %s",
                sum_accs_proc.returncode,
                self.iteration,
                log_path,
            )
        # clean up
        for p in accinits:
            if os.path.exists(p):
                os.remove(p)

        # Est extractor
        log_path = os.path.join(self.working_log_directory, f"update.{self.iteration}.log")
        iteration_improvement = None
        explained_variance = None
        # The context manager closes the stderr pipe and waits for the process
        with mfa_open(log_path, "w") as log_file, subprocess.Popen(
            [
                thirdparty_binary("ivector-extractor-est"),
                f"--num-threads={len(self.jobs)}",
                f"--gaussian-min-count={self.gaussian_min_count}",
                self.ie_path,
                acc_path,
                self.next_ie_path,
            ],
            stderr=subprocess.PIPE,
            env=os.environ,
        ) as extractor_est_proc:
            for raw_line in extractor_est_proc.stderr:
                # Undecodable bytes must not abort training over a log line
                line = raw_line.decode("utf8", errors="replace")
                log_file.write(line)
                log_file.flush()
                m = self._IMPROVEMENT_PATTERN.match(line)
                if m:
                    iteration_improvement = float(m.group("improvement"))
                m = self._VARIANCE_PATTERN.match(line)
                if m:
                    explained_variance = float(m.group("variance"))
            extractor_est_proc.wait()
        logger.debug(
            f"For iteration {self.iteration}, objective-function improvement was {iteration_improvement} per frame and variance explained by ivectors was {explained_variance}."
        )

    def train_iteration(self) -> None:
        """
        Run an iteration of training

        The accumulation/estimation step is skipped when the next extractor
        already exists on disk.
        """
        if not os.path.exists(self.next_ie_path):
            # Accumulate stats and sum
            self.acc_ivector_stats()
        self.iteration += 1

    def finalize_training(self) -> None:
        """
        Finalize ivector extractor training

        Copies the last extractor to ``final.ie``, exports the model archive
        and marks the current workflow as done.
        """
        # Training starts at iteration 1 and every iteration writes
        # ``{iteration + 1}.ie``, so the newest extractor is numbered
        # ``num_iterations + 1``.  Fall back to ``num_iterations`` so that
        # directories without that file are still finalized as before.
        last_ie_path = os.path.join(self.working_directory, f"{self.num_iterations + 1}.ie")
        if not os.path.exists(last_ie_path):
            last_ie_path = os.path.join(self.working_directory, f"{self.num_iterations}.ie")
        shutil.copy(
            last_ie_path,
            os.path.join(self.working_directory, "final.ie"),
        )
        self.export_model(self.exported_model_path)
        wf = self.worker.current_workflow
        with self.session() as session:
            session.query(CorpusWorkflow).filter(CorpusWorkflow.id == wf.id).update({"done": True})
            session.commit()
class PldaTrainer(IvectorTrainer):
    """
    Trainer for PLDA models
    """

    worker: TrainableIvectorExtractor

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _trainer_initialization(self) -> None:
        """No initialization"""
        pass

    def compute_lda(self):
        """
        Compute the LDA transform for utterance ivectors

        Length-normalizes the worker's utterance ivectors with
        ``ivector-normalize-length`` and streams them into
        ``ivector-compute-lda``, which writes ``ivector_lda.mat`` in the
        working directory.

        Raises
        ------
        AssertionError
            If the LDA matrix was not written
        """
        lda_path = os.path.join(self.working_directory, "ivector_lda.mat")
        log_path = os.path.join(self.working_log_directory, "lda.log")
        utt2spk_path = os.path.join(self.corpus_output_directory, "utt2spk.scp")
        with tqdm.tqdm(
            total=self.worker.num_utterances, disable=GLOBAL_CONFIG.quiet
        ) as pbar, mfa_open(log_path, "w") as log_file:
            normalize_proc = subprocess.Popen(
                [
                    thirdparty_binary("ivector-normalize-length"),
                    f"scp:{self.worker.utterance_ivector_path}",
                    "ark:-",
                ],
                stdout=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            lda_compute_proc = subprocess.Popen(
                [
                    thirdparty_binary("ivector-compute-lda"),
                    f"--dim={PLDA_DIMENSION}",
                    "--total-covariance-factor=0.1",
                    "ark:-",
                    f"ark:{utt2spk_path}",
                    lda_path,
                ],
                stdin=subprocess.PIPE,
                stderr=log_file,
                env=os.environ,
            )
            # NOTE(review): the archive stream is presumably binary, so each
            # "line" is just a newline-delimited chunk and the progress bar is
            # only approximate -- confirm
            for line in normalize_proc.stdout:
                lda_compute_proc.stdin.write(line)
                pbar.update(1)
            # Closing stdin flushes the buffered data and signals end of input
            lda_compute_proc.stdin.close()
            lda_compute_proc.wait()
            # Release the upstream pipe and reap the producer as well
            normalize_proc.stdout.close()
            normalize_proc.wait()
        # Explicit raise so the check also runs when Python is started with -O
        if not os.path.exists(lda_path):
            raise AssertionError(
                f"ivector-compute-lda did not produce {lda_path}, see {log_path}"
            )

    def train(self):
        """
        Train PLDA

        Computes the LDA transform, then has the worker compute PLDA and
        speaker ivectors, and finally moves the ``current_*`` archives to
        their permanent names.
        """
        self.compute_lda()
        self.worker.compute_plda()
        self.worker.compute_speaker_ivectors()
        # ``os.replace`` overwrites an existing destination on every platform,
        # so re-running training does not fail on Windows
        os.replace(
            os.path.join(self.working_directory, "current_speaker_ivectors.ark"),
            os.path.join(self.working_directory, "speaker_ivectors.ark"),
        )
        os.replace(
            os.path.join(self.working_directory, "current_num_utts.ark"),
            os.path.join(self.working_directory, "num_utts.ark"),
        )