"""Classes for corpora that use ivectors as features"""
import logging
import math
import os
import time
import typing
from pathlib import Path
from typing import List
import numpy as np
import sqlalchemy
from _kalpy.ivector import (
Plda,
PldaEstimationConfig,
PldaEstimator,
PldaStats,
PldaUnsupervisedAdaptor,
PldaUnsupervisedAdaptorConfig,
ivector_normalize_length,
ivector_subtract_mean,
)
from _kalpy.matrix import DoubleMatrix, DoubleVector, FloatVector
from _kalpy.util import BaseFloatVectorWriter, SequentialBaseFloatVectorReader
from kalpy.data import KaldiMapping
from kalpy.ivector.data import IvectorArchive
from kalpy.utils import (
generate_read_specifier,
generate_write_specifier,
kalpy_logger,
read_kaldi_object,
write_kaldi_object,
)
from tqdm.rich import tqdm
from montreal_forced_aligner import config
from montreal_forced_aligner.corpus.acoustic_corpus import AcousticCorpusMixin
from montreal_forced_aligner.corpus.features import (
ExtractIvectorsArguments,
ExtractIvectorsFunction,
IvectorConfigMixin,
)
from montreal_forced_aligner.data import WorkflowType
from montreal_forced_aligner.db import Corpus, Speaker, Utterance, bulk_update
from montreal_forced_aligner.exceptions import IvectorTrainingError
from montreal_forced_aligner.helper import mfa_open
from montreal_forced_aligner.utils import run_kaldi_function
__all__ = ["IvectorCorpusMixin"]
logger = logging.getLogger("mfa")
class IvectorCorpusMixin(AcousticCorpusMixin, IvectorConfigMixin):
"""
Abstract corpus mixin for corpora that extract ivectors
See Also
--------
:class:`~montreal_forced_aligner.corpus.acoustic_corpus.AcousticCorpusMixin`
For corpus parsing parameters
:class:`~montreal_forced_aligner.corpus.features.IvectorConfigMixin`
For ivector extraction parameters
"""
def __init__(self, **kwargs):
    """Set up ivector-corpus state on top of the parent mixins."""
    super().__init__(**kwargs)
    # Ivector extraction works from audio alone; no transcripts needed.
    self.transcriptions_required = False
    # No PLDA model is loaded until compute_plda/transform_ivectors runs.
    self.plda: typing.Optional[Plda] = None
@property
def ie_path(self) -> Path:
    """Path to the final ivector extractor model (``final.ie``)."""
    return self.working_directory / "final.ie"
@property
def dubm_path(self) -> Path:
    """Path to the diagonal UBM model (``final.dubm``)."""
    return self.working_directory / "final.dubm"
@property
def utterance_ivector_path(self) -> Path:
    """Path to the scp file listing all utterance ivectors."""
    return self.corpus_output_directory / "ivectors.scp"
@property
def adapted_plda_path(self) -> Path:
    """Path where the adapted PLDA model is written."""
    return self.working_directory / "plda_adapted"
@property
def plda_path(self) -> Path:
    """Path to the PLDA model, preferring the adapted model once it exists."""
    adapted = self.adapted_plda_path
    return adapted if adapted.exists() else self.working_directory / "plda"
def adapt_plda(self) -> None:
    """Adapt a trained PLDA model using this corpus's utterance ivectors."""
    # Ivectors must exist on disk before any stats can be accumulated.
    if not os.path.exists(self.utterance_ivector_path):
        self.extract_ivectors()
    # Renamed from `config` to avoid shadowing the module-level config import.
    adaptor_config = PldaUnsupervisedAdaptorConfig()
    plda = read_kaldi_object(Plda, self.plda_path)
    adaptor = PldaUnsupervisedAdaptor()
    read_specifier = generate_read_specifier(self.utterance_ivector_path)
    reader = SequentialBaseFloatVectorReader(read_specifier)
    # Feed every utterance ivector into the unsupervised adaptor with unit weight.
    while not reader.Done():
        adaptor.AddStats(1.0, reader.Value())
        reader.Next()
    reader.Close()
    adaptor.UpdatePlda(adaptor_config, plda)
    write_kaldi_object(plda, self.adapted_plda_path)
def compute_speaker_ivectors(self) -> None:
    """Compute and save per-speaker ivectors as the mean over their utterances.

    Iterates utterances ordered by speaker, accumulating a running sum of
    length-normalized ivectors per speaker, and writes each finished speaker's
    mean (plus its utterance count) to the working directory archives.

    Fixes two defects in the accumulation loop: the flushed mean was written
    under the *next* speaker's id instead of the speaker it was accumulated
    for, and the final speaker's mean was never written at all.
    """
    if not self.has_ivectors():
        self.extract_ivectors()
    speaker_ivector_ark_path = os.path.join(
        self.working_directory, "current_speaker_ivectors.ark"
    )
    self._write_spk2utt()
    spk2utt_path = os.path.join(self.corpus_output_directory, "spk2utt.scp")
    log_path = self.working_log_directory.joinpath("speaker_ivectors.log")
    num_utts_path = self.working_directory.joinpath("current_num_utts.ark")
    logger.info("Computing speaker ivectors...")
    if self.stopped.is_set():
        logger.debug("Speaker ivector computation stopped early.")
        return
    with self.session() as session, tqdm(
        total=self.num_utterances, disable=config.QUIET
    ) as pbar, mfa_open(num_utts_path, "w") as num_utts_archive, kalpy_logger(
        "kalpy.ivector", log_path
    ):
        speaker_mean_archive = BaseFloatVectorWriter(
            generate_write_specifier(speaker_ivector_ark_path)
        )
        spk2utt = KaldiMapping(list_mapping=True)
        spk2utt.load(spk2utt_path)
        query = (
            session.query(Speaker.id, Utterance.ivector_ark)
            .join(Utterance.speaker)
            .order_by(Speaker.id)
        )

        def flush_speaker(spk_id, mean, count) -> None:
            # Average the accumulated sum and persist it for the finished speaker.
            mean.Scale(1.0 / count)
            speaker_mean_archive.Write(str(spk_id), mean)
            num_utts_archive.write(f"{spk_id} {count}\n")

        current_speaker = None
        utt_count = 0
        speaker_mean = FloatVector(config.IVECTOR_DIMENSION)
        for speaker_id, ivector_path in query:
            if current_speaker is None:
                current_speaker = speaker_id
            if speaker_id != current_speaker:
                # BUG FIX: write under current_speaker (whose stats were
                # accumulated), not the newly encountered speaker_id.
                flush_speaker(current_speaker, speaker_mean, utt_count)
                speaker_mean = FloatVector(config.IVECTOR_DIMENSION)
                utt_count = 0
                current_speaker = speaker_id
            pbar.update(1)
            ivector = read_kaldi_object(FloatVector, ivector_path)
            # ivector-normalize-length
            ivector_normalize_length(ivector)
            utt_count += 1
            speaker_mean.AddVec(1.0, ivector)
        # BUG FIX: flush the final speaker, which the original loop dropped.
        if current_speaker is not None and utt_count > 0:
            flush_speaker(current_speaker, speaker_mean, utt_count)
        speaker_mean_archive.Close()
    self.collect_speaker_ivectors()
def compute_plda(self, minimum_utterance_count: int = 1) -> None:
    """Train a PLDA model from per-utterance ivectors (or xvectors).

    Parameters
    ----------
    minimum_utterance_count: int
        Speakers with fewer utterances than this are skipped when
        accumulating PLDA stats

    Raises
    ------
    :class:`~montreal_forced_aligner.exceptions.IvectorTrainingError`
        If no ivectors/xvectors are available, no speaker stats were
        accumulated, there are not enough training vectors relative to their
        dimension, or no speaker has multiple utterances
    """
    if not self.has_any_ivectors():
        # Consistency fix: use the module's specific exception type (matching
        # the IvectorTrainingError raises below) instead of a bare Exception.
        raise IvectorTrainingError(
            "Must have either ivectors or xvectors calculated to compute PLDA."
        )
    begin = time.time()
    self.create_new_current_workflow(WorkflowType.speaker_diarization)
    plda_path = self.working_directory.joinpath("plda")
    log_path = self.working_log_directory.joinpath("plda.log")
    logger.info("Computing PLDA...")
    self.stopped.clear()
    # NOTE(review): stopped was just cleared, so this early exit can only fire
    # if another thread sets the flag in between; kept for parity.
    if self.stopped.is_set():
        logger.debug("PLDA computation stopped early.")
        return
    use_xvectors = self.has_xvectors()
    if use_xvectors:
        dim = config.XVECTOR_DIMENSION
    else:
        dim = config.IVECTOR_DIMENSION
    with tqdm(
        total=self.num_utterances, disable=config.QUIET
    ) as pbar, self.session() as session, kalpy_logger(
        "kalpy.ivector", log_path
    ) as ivector_logger:
        plda_config = PldaEstimationConfig()
        plda_stats = PldaStats()
        num_utt_done = 0
        num_spk_done = 0
        num_spk_err = 0
        num_utt_err = 0
        c = session.query(Corpus).first()
        c.plda_calculated = False
        update_mapping = []
        # Speakers with at least minimum_utterance_count utterances that have a
        # stored vector; utterance_ivector_column presumably selects the
        # ivector or xvector column depending on corpus configuration.
        speaker_query = (
            session.query(Speaker.id)
            .join(Utterance.speaker)
            .filter(c.utterance_ivector_column != None)  # noqa
            .group_by(Speaker.id)
            .having(sqlalchemy.func.count() >= minimum_utterance_count)
        )
        for (speaker_id,) in speaker_query:
            utterance_query = session.query(c.utterance_ivector_column).filter(
                Utterance.speaker_id == speaker_id,
            )
            num_utterances = utterance_query.count()
            if num_utterances < minimum_utterance_count:
                num_spk_err += 1
                continue
            # One row per utterance, length-normalized before accumulation.
            ivector_mat = DoubleMatrix(num_utterances, dim)
            for i, (ivector,) in enumerate(utterance_query):
                if ivector is None:
                    num_utt_err += 1
                    continue
                pbar.update(1)
                vector = DoubleVector()
                vector.from_numpy(ivector)
                ivector_normalize_length(vector)
                ivector_mat.Row(i).CopyFromVec(vector)
                num_utt_done += 1
            update_mapping.append({"id": speaker_id, "num_utterances": num_utterances})
            plda_stats.AddSamples(1.0, ivector_mat)
            num_spk_done += 1
        if num_spk_done == 0:
            raise IvectorTrainingError("No stats accumulated, unable to estimate PLDA.")
        if num_utt_done <= plda_stats.Dim():
            raise IvectorTrainingError(
                "Number of training iVectors is not greater than their "
                "dimension, unable to estimate PLDA."
            )
        if num_spk_done == num_utt_done:
            raise IvectorTrainingError(
                "No speakers with multiple utterances, unable to estimate PLDA."
            )
        ivector_logger.info(
            f"Accumulated stats from {num_spk_done} speakers "
            f"({num_spk_err} with no utterances), consisting of {num_utt_done} utterances "
            f"({num_utt_err} absent from input)."
        )
        if update_mapping:
            bulk_update(session, Speaker, update_mapping)
        plda_stats.Sort()
        plda_estimator = PldaEstimator(plda_stats)
        plda = Plda()
        plda_estimator.Estimate(plda_config, plda)
        write_kaldi_object(plda, plda_path)
    self.plda = plda
    logger.debug(f"Computing PLDA took {time.time() - begin:.3f} seconds.")
def transform_ivectors(self):
    """Transform stored utterance ivectors through the (adapted) PLDA model.

    Loads the trained PLDA model, adapts it with the corpus's utterance
    ivectors, then stores each utterance's PLDA-transformed vector on its
    database row. Skips work when ivectors and the pickled transform already
    exist, or when no PLDA model has been trained yet.
    """
    plda_transform_path = self.working_directory.joinpath("plda.pkl")
    if self.has_ivectors() and os.path.exists(plda_transform_path):
        return
    if not self.plda_path.exists():
        logger.info("Missing plda, skipping speaker ivector transformation")
        return
    self.plda = read_kaldi_object(Plda, self.plda_path)
    self.adapt_plda()
    # plda_path prefers the adapted model once adapt_plda() has written it.
    self.plda = read_kaldi_object(Plda, self.plda_path)
    with self.session() as session:
        query = session.query(Utterance.id, Utterance.ivector).filter(
            Utterance.ivector != None  # noqa
        )
        update_mapping = []
        # Fix: the original also filled a numpy matrix of all ivectors here
        # (sized via an extra query.count() round-trip) that was never read;
        # that dead allocation has been removed.
        for u_id, ivector in query:
            kaldi_ivector = FloatVector()
            kaldi_ivector.from_numpy(ivector)
            update_mapping.append(
                {
                    "id": u_id,
                    # 1 = number of utterances backing this vector for the transform
                    "plda_vector": self.plda.transform_ivector(kaldi_ivector, 1).numpy(),
                }
            )
        bulk_update(session, Utterance, update_mapping)
        session.commit()
def _write_ivectors(self) -> None:
    """Write one scp file listing every utterance's ivector archive entry."""
    with self.session() as session, mfa_open(self.utterance_ivector_path, "w") as outf:
        # Skip utterances without an ivector and those from the unknown-speaker bucket.
        query = (
            session.query(Utterance.kaldi_id, Utterance.ivector_ark)
            .join(Utterance.speaker)
            .filter(Utterance.ivector_ark != None, Speaker.name != "MFA_UNKNOWN")  # noqa,
        )
        outf.writelines(f"{utt_id} {ivector_ark}\n" for utt_id, ivector_ark in query)
def collect_utterance_ivectors(self) -> None:
    """Collect trained per-utterance ivectors.

    Reads each job's ivectors.scp, length-normalizes every ivector, subtracts
    the global mean, stores the results on Utterance rows, and creates a
    pgvector IVFFlat index for cosine-similarity lookups before writing the
    combined scp and applying the PLDA transform.
    """
    logger.info("Collecting ivectors...")
    with self.session() as session, tqdm(
        total=self.num_utterances, disable=config.QUIET
    ) as pbar:
        update_mapping = {}
        # Running sum of all length-normalized ivectors, used for mean subtraction.
        ivector_sum = FloatVector(config.IVECTOR_DIMENSION)
        ivectors = {}
        count = 0
        for j in self.jobs:
            ivector_scp_path = j.construct_path(self.split_directory, "ivectors", "scp")
            with mfa_open(ivector_scp_path) as f:
                for line in f:
                    line = line.strip()
                    utt_id, ivector_ark_path = line.split(maxsplit=1)
                    # Kaldi utterance keys look like "<speaker>-<utt_id>"; keep the integer id.
                    utt_id = int(utt_id.split("-")[-1])
                    ivector = read_kaldi_object(FloatVector, ivector_ark_path)
                    ivectors[utt_id] = ivector
                    update_mapping[utt_id] = {"id": utt_id, "ivector_ark": ivector_ark_path}
                    # In-place normalization also affects the entry stored in `ivectors`.
                    ivector_normalize_length(ivector)
                    count += 1
                    ivector_sum.AddVec(1.0, ivector)
        # NOTE(review): if no job produced any ivectors, count is 0 and the
        # division below would fail — presumably extraction guarantees at
        # least one utterance; verify upstream.
        for utt_id, ivector in ivectors.items():
            # Subtract the global mean, then re-normalize before storing.
            ivector.AddVec(-1.0 / count, ivector_sum)
            ivector_normalize_length(ivector)
            update_mapping[utt_id]["ivector"] = ivector.numpy()
            pbar.update(1)
        bulk_update(session, Utterance, list(update_mapping.values()))
        session.flush()
        # IVFFlat list count scaled as sqrt(N); probes as sqrt(lists).
        n_lists = int(math.sqrt(self.num_utterances))
        n_probe = int(math.sqrt(n_lists))
        session.execute(
            sqlalchemy.text(
                f"CREATE INDEX IF NOT EXISTS utterance_ivector_index ON utterance USING ivfflat (ivector vector_cosine_ops) WITH (lists = {n_lists});"
            )
        )
        session.execute(sqlalchemy.text(f"SET ivfflat.probes = {n_probe};"))
        session.query(Corpus).update({Corpus.ivectors_calculated: True})
        session.commit()
    self._write_ivectors()
    self.transform_ivectors()
def collect_speaker_ivectors(self) -> None:
    """Collect trained per-speaker ivectors.

    Loads the speaker-mean archive written by compute_speaker_ivectors,
    mean-centers and length-normalizes the vectors, stores them (plus their
    PLDA-transformed versions) on Speaker rows, and creates pgvector IVFFlat
    indexes for cosine-similarity lookups.
    """
    if self.plda is None:
        # NOTE(review): collect_utterance_ivectors -> transform_ivectors only
        # sets self.plda when a PLDA model file exists, so self.plda could
        # still be None at the transform_ivector call below — confirm callers
        # guarantee a trained model.
        self.collect_utterance_ivectors()
    logger.info("Collecting speaker ivectors...")
    speaker_ivector_ark_path = os.path.join(
        self.working_directory, "current_speaker_ivectors.ark"
    )
    num_utts_path = self.working_directory.joinpath("current_num_utts.ark")
    # Compute the per-speaker means first if they have not been written yet.
    if not os.path.exists(speaker_ivector_ark_path):
        self.compute_speaker_ivectors()
    with self.session() as session, tqdm(
        total=self.num_speakers, disable=config.QUIET
    ) as pbar:
        ivector_archive = IvectorArchive(
            speaker_ivector_ark_path, num_utterances_file_name=num_utts_path
        )
        ivectors = []
        speaker_ids = []
        num_utts = []
        for speaker_id, ivector, utts in ivector_archive:
            speaker_ids.append(speaker_id)
            num_utts.append(utts)
            ivector_normalize_length(ivector)
            ivectors.append(DoubleVector(ivector))
        # Center the whole set on its mean before re-normalizing each vector.
        ivector_subtract_mean(ivectors)
        update_mapping = []
        for i in range(len(speaker_ids)):
            ivector = ivectors[i]
            ivector_normalize_length(ivector)
            update_mapping.append(
                {
                    "id": speaker_ids[i],
                    "ivector": ivector.numpy(),
                    # num_utts weights the PLDA transform for this speaker.
                    "plda_vector": self.plda.transform_ivector(ivector, num_utts[i]).numpy(),
                }
            )
            pbar.update(1)
        bulk_update(session, Speaker, update_mapping)
        session.flush()
        session.execute(
            sqlalchemy.text(
                "CREATE INDEX IF NOT EXISTS speaker_ivector_index ON speaker USING ivfflat (ivector vector_cosine_ops);"
            )
        )
        session.execute(
            sqlalchemy.text(
                "CREATE INDEX IF NOT EXISTS speaker_plda_vector_index ON speaker USING ivfflat (plda_vector vector_cosine_ops);"
            )
        )
        session.commit()