Source code for montreal_forced_aligner.diarization.multiprocessing

"""Multiprocessing functionality for speaker diarization"""
from __future__ import annotations

import logging
import multiprocessing as mp
import os
import queue
import subprocess
import sys
import time
import typing
from pathlib import Path

import dataclassy
import hdbscan
import kneed
import librosa
import numpy as np
import sqlalchemy
from scipy.spatial import distance
from sklearn import cluster, manifold, metrics, neighbors, preprocessing
from sqlalchemy.orm import Session, joinedload

from montreal_forced_aligner.abc import KaldiFunction
from montreal_forced_aligner.config import GLOBAL_CONFIG, IVECTOR_DIMENSION, XVECTOR_DIMENSION
from montreal_forced_aligner.corpus.features import (
    PldaModel,
    classify_plda,
    compute_classification_stats,
    pairwise_plda_distance_matrix,
)
from montreal_forced_aligner.data import (
    ClusterType,
    DistanceMetric,
    ManifoldAlgorithm,
    MfaArguments,
)
from montreal_forced_aligner.db import File, Job, SoundFile, Speaker, Utterance
from montreal_forced_aligner.utils import Stopped, read_feats, thirdparty_binary

try:
    import warnings

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        torch_logger = logging.getLogger("speechbrain.utils.torch_audio_backend")
        torch_logger.setLevel(logging.ERROR)
        torch_logger = logging.getLogger("speechbrain.utils.train_logger")
        torch_logger.setLevel(logging.ERROR)
        import torch
        from speechbrain.pretrained import EncoderClassifier, SpeakerRecognition
    FOUND_SPEECHBRAIN = True
except (ImportError, OSError):
    FOUND_SPEECHBRAIN = False
    EncoderClassifier = None
    SpeakerRecognition = None

__all__ = [
    "PldaClassificationArguments",
    "PldaClassificationFunction",
    "ComputeEerArguments",
    "ComputeEerFunction",
    "SpeechbrainArguments",
    "SpeechbrainClassificationFunction",
    "SpeechbrainEmbeddingFunction",
    "cluster_matrix",
    "visualize_clusters",
]

logger = logging.getLogger("mfa")


# noinspection PyUnresolvedReferences
[docs] @dataclassy.dataclass(slots=True) class PldaClassificationArguments(MfaArguments): """Arguments for :class:`~montreal_forced_aligner.diarization.multiprocessing.PldaClassificationFunction`""" plda: PldaModel train_ivector_path: Path num_utts_path: Path use_xvector: bool
# noinspection PyUnresolvedReferences
[docs] @dataclassy.dataclass(slots=True) class ComputeEerArguments(MfaArguments): """Arguments for :class:`~montreal_forced_aligner.diarization.multiprocessing.ComputeEerFunction`""" plda: PldaModel metric: DistanceMetric use_xvector: bool limit_within_speaker: int limit_per_speaker: int
# noinspection PyUnresolvedReferences
[docs] @dataclassy.dataclass(slots=True) class SpeechbrainArguments(MfaArguments): """Arguments for :class:`~montreal_forced_aligner.diarization.multiprocessing.SpeechbrainClassificationFunction`""" cuda: bool cluster: bool
def visualize_clusters( ivectors: np.ndarray, manifold_algorithm: ManifoldAlgorithm, metric_type: DistanceMetric, n_neighbors: int = 10, plda: typing.Optional[PldaModel] = None, quick=False, ): logger.debug(f"Generating 2D representation of ivectors with {manifold_algorithm.name}...") begin = time.time() to_fit = ivectors metric = metric_type.name tsne_angle = 0.5 tsne_iterations = 1000 mds_iterations = 300 if quick: tsne_angle = 0.8 tsne_iterations = 500 mds_iterations = 150 if metric_type is DistanceMetric.plda: logger.info("Generating precomputed distance matrix...") to_fit = metrics.pairwise_distances( ivectors, ivectors, metric=plda.distance, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs ) np.fill_diagonal(to_fit, 0) metric = "precomputed" if manifold_algorithm is ManifoldAlgorithm.mds: if metric_type is DistanceMetric.cosine: to_fit = preprocessing.normalize(ivectors, norm="l2") metric = "euclidean" points = manifold.MDS( dissimilarity=metric, random_state=0, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, max_iter=mds_iterations, metric=False, normalized_stress=True, ).fit_transform(to_fit) elif manifold_algorithm is ManifoldAlgorithm.tsne: points = manifold.TSNE( metric=metric, random_state=0, perplexity=n_neighbors, init="pca" if metric != "precomputed" else "random", n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, angle=tsne_angle, n_iter=tsne_iterations, ).fit_transform(to_fit) elif manifold_algorithm is ManifoldAlgorithm.spectral: points = manifold.SpectralEmbedding( affinity="nearest_neighbors", random_state=0, n_neighbors=n_neighbors, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, ).fit_transform(to_fit) elif manifold_algorithm is ManifoldAlgorithm.isomap: points = manifold.Isomap( metric=metric, n_neighbors=n_neighbors, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs ).fit_transform(to_fit) else: raise NotImplementedError logger.debug(f"Generating 2D representation took {time.time() - begin:.3f} seconds") return points def calculate_distance_threshold( metric: typing.Union[str, callable], to_fit: np.ndarray, min_samples: int = 5, working_directory: str = None, score_metric_params=None, no_visuals: bool = False, ) -> float: """ Calculate a threshold for the given ivectors using a relative threshold Parameters ---------- metric: str or callable Metric to evaluate to_fit: numpy.ndarray Ivectors or distance matrix relative_distance_threshold: float Relative threshold from 0 to 1 Returns ------- float Absolute distance threshold """ logger.debug(f"Calculating distance threshold from {min_samples} nearest neighbors...") nbrs = neighbors.NearestNeighbors( n_neighbors=min_samples, metric=metric, metric_params=score_metric_params, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, ).fit(to_fit) distances, indices = nbrs.kneighbors(to_fit) distances = distances[:, min_samples - 1] distances = np.sort(distances, axis=0) kneedle = kneed.KneeLocator(np.arange(distances.shape[0]), distances, curve="concave", S=5) index = kneedle.elbow threshold = distances[index] min_distance = np.min(distances) max_distance = np.max(distances) logger.debug( f"Distance threshold was set to {threshold} (range = {min_distance:.4f} - {max_distance:.4f})" ) if GLOBAL_CONFIG.current_profile.debug and not no_visuals: import seaborn as sns from matplotlib import pyplot as plt sns.set() plt.plot(distances) plt.xlabel("Index") plt.ylabel("Distance to NN") plt.axvline(index, c="k") plt.text( index, max_distance, "threshold", horizontalalignment="right", verticalalignment="top" ) if working_directory is not None: plot_path = os.path.join(working_directory, "nearest_neighbor_distances.png") close_string = f"Closing k-distance plot, it has been saved to {plot_path}." plt.savefig(plot_path, transparent=True) else: close_string = "Closing k-distance plot." if GLOBAL_CONFIG.current_profile.verbose: plt.show(block=False) plt.pause(10) logger.debug(close_string) plt.close() return float(threshold)
[docs] def cluster_matrix( ivectors: np.ndarray, cluster_type: ClusterType, metric: DistanceMetric = DistanceMetric.euclidean, strict=True, no_visuals=False, working_directory=None, **kwargs, ) -> np.ndarray: """ Wrapper function for sklearn's clustering methods Parameters ---------- ivectors: numpy.ndarray Ivectors to cluster cluster_type: :class:`~montreal_forced_aligner.data.ClusterType` Clustering algorithm metric: :class:`~montreal_forced_aligner.data.DistanceMetric` Distance metric to use in clustering strict: bool Flag for whether to raise exceptions when only one cluster is found kwargs Extra keyword arguments to pass to sklearn cluster classes Returns ------- numpy.ndarray Cluster labels for each utterance """ from montreal_forced_aligner.config import GLOBAL_CONFIG logger.debug(f"Running {cluster_type}...") if sys.platform == "win32" and cluster_type is ClusterType.kmeans: os.environ["OMP_NUM_THREADS"] = "1" os.environ["OPENBLAS_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" else: os.environ["OMP_NUM_THREADS"] = f"{GLOBAL_CONFIG.current_profile.num_jobs}" os.environ["OPENBLAS_NUM_THREADS"] = f"{GLOBAL_CONFIG.current_profile.num_jobs}" os.environ["MKL_NUM_THREADS"] = f"{GLOBAL_CONFIG.current_profile.num_jobs}" distance_threshold = kwargs.pop("distance_threshold", None) plda: PldaModel = kwargs.pop("plda", None) min_cluster_size = kwargs.pop("min_cluster_size", 15) score_metric = metric.value if score_metric == "plda": score_metric = plda to_fit = ivectors score_metric_params = None if score_metric == "plda" and cluster_type is not ClusterType.affinity: logger.debug("Generating precomputed distance matrix...") begin = time.time() to_fit = to_fit.astype("float64") psi = plda.psi.astype("float64") to_fit = pairwise_plda_distance_matrix(to_fit, psi) logger.debug(f"Precomputed distance matrix took {time.time() - begin:.3f} seconds") score_metric = "precomputed" if cluster_type is ClusterType.affinity: affinity = metric if metric is DistanceMetric.cosine: to_fit = preprocessing.normalize(to_fit, norm="l2") score_metric = "euclidean" affinity = "euclidean" elif metric is DistanceMetric.plda: logger.debug("Generating precomputed distance matrix...") to_fit = metrics.pairwise_distances( to_fit, to_fit, metric=plda.log_likelihood, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, ) score_metric = "precomputed" affinity = "precomputed" c_labels = cluster.AffinityPropagation( affinity=affinity, copy=False, random_state=GLOBAL_CONFIG.current_profile.seed, verbose=GLOBAL_CONFIG.current_profile.verbose, **kwargs, ).fit_predict(to_fit) elif cluster_type is ClusterType.agglomerative: if metric is DistanceMetric.cosine: to_fit = preprocessing.normalize(to_fit, norm="l2") score_metric = "euclidean" if not kwargs["n_clusters"]: if distance_threshold is not None: eps = distance_threshold else: eps = calculate_distance_threshold( score_metric, to_fit, min_cluster_size, working_directory, score_metric_params=score_metric_params, no_visuals=no_visuals, ) kwargs["distance_threshold"] = eps c_labels = cluster.AgglomerativeClustering(metric=score_metric, **kwargs).fit_predict( to_fit ) elif cluster_type is ClusterType.spectral: affinity = "nearest_neighbors" if metric is DistanceMetric.cosine: to_fit = preprocessing.normalize(to_fit, norm="l2") score_metric = "euclidean" elif metric is DistanceMetric.plda: logger.info("Generating precomputed distance matrix...") affinity = "precomputed_nearest_neighbors" to_fit = metrics.pairwise_distances( to_fit, to_fit, metric=score_metric, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs ) np.fill_diagonal(to_fit, 0) score_metric = "precomputed" c_labels = cluster.SpectralClustering( affinity=affinity, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, random_state=GLOBAL_CONFIG.current_profile.seed, verbose=GLOBAL_CONFIG.current_profile.verbose, **kwargs, ).fit_predict(to_fit) elif cluster_type is ClusterType.dbscan: if distance_threshold is not None: eps = distance_threshold else: eps = calculate_distance_threshold( score_metric, to_fit, min_cluster_size, working_directory, score_metric_params=score_metric_params, no_visuals=no_visuals, ) c_labels = cluster.DBSCAN( min_samples=min_cluster_size, metric=score_metric, eps=eps, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, **kwargs, ).fit_predict(to_fit) elif cluster_type is ClusterType.meanshift: if score_metric == "cosine": to_fit = preprocessing.normalize(to_fit, norm="l2") score_metric = "euclidean" c_labels = cluster.MeanShift( n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, **kwargs ).fit_predict(to_fit) elif cluster_type is ClusterType.hdbscan: if score_metric == "cosine": to_fit = preprocessing.normalize(to_fit, norm="l2") score_metric = "euclidean" min_samples = max(5, int(min_cluster_size / 4)) if distance_threshold is not None: eps = distance_threshold else: eps = calculate_distance_threshold( score_metric, to_fit, min_cluster_size, working_directory, score_metric_params=score_metric_params, no_visuals=no_visuals, ) if score_metric == "precomputed" or metric is DistanceMetric.plda: algorithm = "best" else: algorithm = "boruvka_balltree" c_labels = hdbscan.HDBSCAN( min_samples=min_samples, min_cluster_size=min_cluster_size, cluster_selection_epsilon=eps, metric=score_metric, algorithm=algorithm, core_dist_n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, **kwargs, ).fit_predict(to_fit) elif cluster_type is ClusterType.optics: if distance_threshold is not None: eps = distance_threshold else: eps = calculate_distance_threshold( score_metric, to_fit, min_cluster_size, working_directory, score_metric_params=score_metric_params, no_visuals=no_visuals, ) c_labels = cluster.OPTICS( min_samples=min_cluster_size, max_eps=eps, metric=score_metric, n_jobs=GLOBAL_CONFIG.current_profile.num_jobs, **kwargs, ).fit_predict(to_fit) elif cluster_type is ClusterType.kmeans: if score_metric == "cosine": to_fit = preprocessing.normalize(to_fit, norm="l2") score_metric = "euclidean" c_labels = cluster.MiniBatchKMeans( verbose=GLOBAL_CONFIG.current_profile.verbose, n_init="auto", **kwargs ).fit_predict(to_fit) else: raise NotImplementedError(f"The cluster type '{cluster_type}' is not supported.") num_clusters = np.unique(c_labels).shape[0] logger.debug(f"Found {num_clusters} clusters") try: if score_metric == "plda": score_metric = plda.distance elif score_metric == "precomputed": if cluster_type is ClusterType.affinity: to_fit = np.max(to_fit) - to_fit np.fill_diagonal(to_fit, 0) score = metrics.silhouette_score(to_fit, c_labels, metric=score_metric) logger.debug(f"Silhouette score (-1-1): {score}") except ValueError: if num_clusters == 1: logger.warning( "Only found one cluster, please adjust cluster parameters to generate more clusters." ) if strict: raise os.environ["OMP_NUM_THREADS"] = f"{GLOBAL_CONFIG.current_profile.blas_num_threads}" os.environ["OPENBLAS_NUM_THREADS"] = f"{GLOBAL_CONFIG.current_profile.blas_num_threads}" os.environ["MKL_NUM_THREADS"] = f"{GLOBAL_CONFIG.current_profile.blas_num_threads}" return c_labels
[docs] class PldaClassificationFunction(KaldiFunction): """ Multiprocessing function to compute voice activity detection See Also -------- :meth:`.AcousticCorpusMixin.compute_vad` Main function that calls this function in parallel :meth:`.AcousticCorpusMixin.compute_vad_arguments` Job method for generating arguments for this function :kaldi_src:`compute-vad` Relevant Kaldi binary Parameters ---------- args: :class:`~montreal_forced_aligner.corpus.features.VadArguments` Arguments for the function """ def __init__(self, args: PldaClassificationArguments): super().__init__(args) self.plda = args.plda self.train_ivector_path = args.train_ivector_path self.num_utts_path = args.num_utts_path self.use_xvector = args.use_xvector def _run(self) -> typing.Generator[typing.Tuple[int, int, int]]: """Run the function""" utterance_counts = {} with open(self.num_utts_path) as f: for line in f: speaker, utt_count = line.strip().split() utt_count = int(utt_count) utterance_counts[int(speaker)] = utt_count input_proc = subprocess.Popen( [ thirdparty_binary("ivector-subtract-global-mean"), f"ark:{self.train_ivector_path}", "ark,t:-", ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, env=os.environ, ) speaker_ids = [] speaker_counts = [] if self.use_xvector: dim = XVECTOR_DIMENSION else: dim = IVECTOR_DIMENSION speaker_ivectors = np.empty((len(utterance_counts), dim)) for speaker_id, ivector in read_feats(input_proc, raw_id=True): speaker_id = int(speaker_id) if speaker_id not in utterance_counts: continue speaker_ivectors[len(speaker_ids), :] = ivector speaker_ids.append(speaker_id) speaker_counts.append(utterance_counts[speaker_id]) speaker_counts = np.array(speaker_counts) speaker_ivectors = speaker_ivectors.astype("float64") self.plda.psi = self.plda.psi.astype("float64") speaker_ivectors = self.plda.process_ivectors(speaker_ivectors, counts=speaker_counts) classification_args = compute_classification_stats( speaker_ivectors, self.plda.psi, counts=speaker_counts ) lines = [] for line in input_proc.stdout: lines.append(line) input_proc.wait() with Session(self.db_engine()) as session: job: Job = ( session.query(Job) .options(joinedload(Job.corpus, innerjoin=True)) .filter(Job.id == self.job_name) .first() ) utterances = ( session.query(Utterance.id, Utterance.plda_vector) .filter(Utterance.plda_vector != None) # noqa .filter(Utterance.job_id == job.id) .order_by(Utterance.kaldi_id) ) for u_id, u_ivector in utterances: ind, score = classify_plda(u_ivector.astype("float64"), *classification_args) speaker = speaker_ids[ind] yield u_id, speaker, score
[docs] class ComputeEerFunction(KaldiFunction): """ Multiprocessing function to compute voice activity detection See Also -------- :meth:`.AcousticCorpusMixin.compute_vad` Main function that calls this function in parallel :meth:`.AcousticCorpusMixin.compute_vad_arguments` Job method for generating arguments for this function :kaldi_src:`compute-vad` Relevant Kaldi binary Parameters ---------- args: :class:`~montreal_forced_aligner.corpus.features.VadArguments` Arguments for the function """ def __init__(self, args: ComputeEerArguments): super().__init__(args) self.plda = args.plda self.metric = args.metric self.use_xvector = args.use_xvector self.limit_within_speaker = args.limit_within_speaker self.limit_per_speaker = args.limit_per_speaker # noinspection PyTypeChecker def _run(self) -> typing.Generator[typing.Tuple[int, int, int]]: """Run the function""" if self.use_xvector: columns = [Utterance.id, Utterance.speaker_id, Utterance.xvector] filter = Utterance.xvector != None # noqa else: columns = [Utterance.id, Utterance.speaker_id, Utterance.plda_vector] filter = Utterance.plda_vector != None # noqa with Session(self.db_engine()) as session: speakers = ( session.query(Speaker.id) .join(Speaker.utterances) .filter(Utterance.job_id == self.job_name) .order_by(Speaker.id) .distinct(Speaker.id) ) for (s_id,) in speakers: match_scores = [] mismatch_scores = [] random_within_speaker = ( session.query(*columns) .filter(filter, Utterance.speaker_id == s_id) .order_by(sqlalchemy.func.random()) .limit(self.limit_within_speaker) ) for u_id, s_id, u_ivector in random_within_speaker: comp_query = ( session.query(columns[2]) .filter(filter, Utterance.speaker_id == s_id, Utterance.id != u_id) .order_by(sqlalchemy.func.random()) .limit(self.limit_within_speaker) ) for (u2_ivector,) in comp_query: if self.metric is DistanceMetric.plda: score = self.plda.distance(u_ivector, u2_ivector) elif self.metric is DistanceMetric.cosine: score = distance.cosine(u_ivector, u2_ivector) else: score = distance.euclidean(u_ivector, u2_ivector) match_scores.append(score) other_speakers = session.query(Speaker.id).filter(Speaker.id != s_id) for (o_s_id,) in other_speakers: random_out_speaker = ( session.query(columns[2]) .filter(filter, Utterance.speaker_id == s_id) .order_by(sqlalchemy.func.random()) .limit(self.limit_per_speaker) ) for (u_ivector,) in random_out_speaker: comp_query = ( session.query(columns[2]) .filter(filter, Utterance.speaker_id == o_s_id) .order_by(sqlalchemy.func.random()) .limit(self.limit_per_speaker) ) for (u2_ivector,) in comp_query: if self.metric is DistanceMetric.plda: score = self.plda.distance(u_ivector, u2_ivector) elif self.metric is DistanceMetric.cosine: score = distance.cosine(u_ivector, u2_ivector) else: score = distance.euclidean(u_ivector, u2_ivector) mismatch_scores.append(score) yield match_scores, mismatch_scores
[docs] class SpeechbrainClassificationFunction(KaldiFunction): """ Multiprocessing function to classify speakers based on a speechbrain model Parameters ---------- args: :class:`~montreal_forced_aligner.diarization.multiprocessing.SpeechbrainArguments` Arguments for the function """ def __init__(self, args: SpeechbrainArguments): super().__init__(args) self.cuda = args.cuda self.cluster = args.cluster def _run(self) -> typing.Generator[typing.Tuple[int, int, int]]: """Run the function""" run_opts = None if self.cuda: run_opts = {"device": "cuda"} model = EncoderClassifier.from_hparams( source="speechbrain/spkrec-ecapa-voxceleb", savedir=os.path.join( GLOBAL_CONFIG.current_profile.temporary_directory, "models", "SpeakerRecognition", ), run_opts=run_opts, ) device = torch.device("cuda" if self.cuda else "cpu") with Session(self.db_engine()) as session: job: Job = ( session.query(Job) .options(joinedload(Job.corpus, innerjoin=True)) .filter(Job.id == self.job_name) .first() ) utterances = session.query(Utterance.id, Utterance.xvector).filter( Utterance.xvector != None, Utterance.job_id == job.id # noqa ) for u_id, ivector in utterances: ivector = torch.tensor(ivector, device=device).unsqueeze(0).unsqueeze(0) out_prob = model.mods.classifier(ivector).squeeze(1) score, index = torch.max(out_prob, dim=-1) text_lab = model.hparams.label_encoder.decode_torch(index) new_speaker = text_lab[0] del out_prob del index yield u_id, new_speaker, float(score.cpu().numpy()) del text_lab del new_speaker del score if self.cuda: torch.cuda.empty_cache() del model if self.cuda: torch.cuda.empty_cache()
[docs] class SpeechbrainEmbeddingFunction(KaldiFunction): """ Multiprocessing function to generating xvector embeddings from a speechbrain model Parameters ---------- args: :class:`~montreal_forced_aligner.diarization.multiprocessing.SpeechbrainArguments` Arguments for the function """ def __init__(self, args: SpeechbrainArguments): super().__init__(args) self.cuda = args.cuda self.cluster = args.cluster def _run(self) -> typing.Generator[typing.Tuple[int, int, int]]: """Run the function""" run_opts = None if self.cuda: run_opts = {"device": "cuda"} if self.cluster: model_class = SpeakerRecognition else: model_class = EncoderClassifier model = model_class.from_hparams( source="speechbrain/spkrec-ecapa-voxceleb", savedir=os.path.join( GLOBAL_CONFIG.current_profile.temporary_directory, "models", "SpeakerRecognition", ), run_opts=run_opts, ) return_q = mp.Queue(2) finished_adding = Stopped() stopped = Stopped() loader = UtteranceFileLoader( self.job_name, self.db_string, return_q, stopped, finished_adding ) loader.start() exception = None device = torch.device("cuda" if self.cuda else "cpu") while True: try: result = return_q.get(timeout=1) except queue.Empty: if finished_adding.stop_check(): break continue if stopped.stop_check(): continue if isinstance(result, Exception): stopped.stop() continue u_id, y = result emb = ( model.encode_batch( torch.tensor(y[np.newaxis, :], device=device), normalize=self.cluster ) .cpu() .numpy() .squeeze(axis=1) ) yield u_id, emb[0] del emb if self.cuda: torch.cuda.empty_cache() loader.join() if exception: raise Exception
class UtteranceFileLoader(mp.Process): """ Helper process for loading utterance waveforms in parallel with embedding extraction Parameters ---------- job_name: int Job identifier db_string: str Connection string for database return_q: multiprocessing.Queue Queue to put waveforms stopped: :class:`~montreal_forced_aligner.utils.Stopped` Check for whether the process to exit gracefully finished_adding: :class:`~montreal_forced_aligner.utils.Stopped` Check for whether the worker has processed all utterances """ def __init__( self, job_name: int, db_string: str, return_q: mp.Queue, stopped: Stopped, finished_adding: Stopped, ): super().__init__() self.job_name = job_name self.db_string = db_string self.return_q = return_q self.stopped = stopped self.finished_adding = finished_adding def run(self) -> None: """ Run the waveform loading job """ db_engine = sqlalchemy.create_engine( self.db_string, poolclass=sqlalchemy.NullPool, pool_reset_on_return=None, isolation_level="AUTOCOMMIT", logging_name=f"{type(self).__name__}_engine", ).execution_options(logging_token=f"{type(self).__name__}_engine") with Session(db_engine) as session: try: utterances = ( session.query( Utterance.id, Utterance.begin, Utterance.duration, SoundFile.sound_file_path, ) .join(Utterance.file) .join(File.sound_file) .filter(Utterance.job_id == self.job_name) ) for u_id, begin, duration, sound_file_path in utterances: if self.stopped.stop_check(): break y, _ = librosa.load( sound_file_path, sr=16000, mono=False, offset=begin, duration=duration, ) self.return_q.put((u_id, y)) except Exception as e: self.return_q.put(e) finally: self.finished_adding.stop()