"""Class definitions for corpora"""
from __future__ import annotations
import logging
import os
import time
import typing
from abc import ABCMeta, abstractmethod
import sqlalchemy.engine
import tqdm
from sqlalchemy.orm import Session, joinedload, selectinload, subqueryload
from montreal_forced_aligner.abc import DatabaseMixin, MfaWorker
from montreal_forced_aligner.config import GLOBAL_CONFIG
from montreal_forced_aligner.corpus.classes import FileData, UtteranceData
from montreal_forced_aligner.corpus.multiprocessing import (
ExportKaldiFilesArguments,
ExportKaldiFilesFunction,
NormalizeTextFunction,
dictionary_ids_for_job,
)
from montreal_forced_aligner.data import DatabaseImportData, TextFileType, WordType, WorkflowType
from montreal_forced_aligner.db import (
Corpus,
CorpusWorkflow,
Dialect,
Dictionary,
Dictionary2Job,
File,
Job,
Pronunciation,
SoundFile,
Speaker,
SpeakerOrdering,
TextFile,
Utterance,
Word,
bulk_update,
)
from montreal_forced_aligner.exceptions import CorpusError
from montreal_forced_aligner.helper import output_mapping
from montreal_forced_aligner.utils import Stopped, run_kaldi_function
__all__ = ["CorpusMixin"]
logger = logging.getLogger("mfa")
[docs]
class CorpusMixin(MfaWorker, DatabaseMixin, metaclass=ABCMeta):
"""
Mixin class for processing corpora
Notes
-----
Using characters in files to specify speakers is generally finicky and leads to errors, so I would not
recommend using it. Additionally, consider it deprecated and could be removed in future versions
Parameters
----------
corpus_directory: str
Path to corpus
speaker_characters: int or str, optional
Number of characters in the file name to specify the speaker
ignore_speakers: bool
Flag for whether to discard any parsed speaker information during top-level worker's processing
oov_count_threshold: int
Words in the corpus with counts less than or equal to the threshold will be treated as OOV items, defaults to 0
See Also
--------
:class:`~montreal_forced_aligner.abc.MfaWorker`
For MFA processing parameters
:class:`~montreal_forced_aligner.abc.TemporaryDirectoryMixin`
For temporary directory parameters
Attributes
----------
jobs: list[:class:`~montreal_forced_aligner.corpus.multiprocessing.Job`]
List of jobs for processing the corpus and splitting speakers
stopped: Stopped
Stop check for loading the corpus
decode_error_files: list[str]
List of text files that could not be loaded with utf8
textgrid_read_errors: list[str]
List of TextGrid files that had an error in loading
"""
def __init__(
self,
corpus_directory: str,
speaker_characters: typing.Union[int, str] = 0,
ignore_speakers: bool = False,
oov_count_threshold: int = 0,
**kwargs,
):
if not os.path.exists(corpus_directory):
raise CorpusError(f"The directory '{corpus_directory}' does not exist.")
if not os.path.isdir(corpus_directory):
raise CorpusError(
f"The specified path for the corpus ({corpus_directory}) is not a directory."
)
self._speaker_ids = {}
self.corpus_directory = corpus_directory
self.speaker_characters = speaker_characters
self.ignore_speakers = ignore_speakers
self.oov_count_threshold = oov_count_threshold
self.stopped = Stopped()
self.decode_error_files = []
self.textgrid_read_errors = []
self._num_speakers = None
self._num_utterances = None
self._num_files = None
super().__init__(**kwargs)
os.makedirs(self.corpus_output_directory, exist_ok=True)
self.imported = False
self.text_normalized = False
self._current_speaker_index = 1
self._current_file_index = 1
self._current_utterance_index = 1
self._speaker_ids = {}
self._word_set = []
self._jobs = []
self.ignore_empty_utterances = False
@property
def jobs(self) -> typing.List[Job]:
if not self._jobs:
with self.session() as session:
c: Corpus = session.query(Corpus).first()
jobs = session.query(Job).options(
joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries)
)
if c.current_subset:
jobs = jobs.filter(Job.utterances.any(Utterance.in_subset == True)) # noqa
jobs = jobs.filter(Job.utterances.any(Utterance.ignored == False)) # noqa
self._jobs = jobs.all()
return self._jobs
def dictionary_ids_for_job(self, job_id):
with self.session() as session:
return dictionary_ids_for_job(session, job_id)
[docs]
def inspect_database(self) -> None:
"""Check if a database file exists and create the necessary metadata"""
self.initialize_database()
with self.session() as session:
corpus = session.query(Corpus).first()
if corpus:
self.imported = corpus.imported
self.text_normalized = corpus.text_normalized
else:
session.add(
Corpus(
name=self.data_source_identifier,
path=str(self.corpus_directory),
data_directory=self.corpus_output_directory,
)
)
session.commit()
[docs]
def get_utterances(
self,
id: typing.Optional[int] = None,
file: typing.Optional[typing.Union[str, int]] = None,
speaker: typing.Optional[typing.Union[str, int]] = None,
begin: typing.Optional[float] = None,
end: typing.Optional[float] = None,
session: Session = None,
):
"""
Get a file from search parameters
Parameters
----------
id: int
Integer ID to look up
file: str or int
File name or ID to look up
speaker: str or int
Speaker name or ID to look up
begin: float
Begin timestamp to look up
end: float
Ending timestamp to look up
Returns
-------
:class:`~montreal_forced_aligner.db.Utterance`
Utterance match
"""
if session is None:
session = self.session()
if id is not None:
utterance = session.get(Utterance, id)
if not utterance:
raise Exception(f"Could not find utterance with id of {id}")
return utterance
else:
utterance = session.query(Utterance)
if file is not None:
utterance = utterance.join(Utterance.file)
if isinstance(file, int):
utterance = utterance.filter(File.id == file)
else:
utterance = utterance.filter(File.name == file)
if speaker is not None:
utterance = utterance.join(Utterance.speaker)
if isinstance(speaker, int):
utterance = utterance.filter(Speaker.id == speaker)
else:
utterance = utterance.filter(Speaker.name == speaker)
if begin is not None:
utterance = utterance.filter(Utterance.begin == begin)
if end is not None:
utterance = utterance.filter(Utterance.end == end)
utterance = utterance.all()
return list(utterance)
[docs]
def get_file(
self, id: typing.Optional[int] = None, name=None, session: Session = None
) -> File:
"""
Get a file from search parameters
Parameters
----------
id: int
Integer ID to look up
name: str
File name to look up
Returns
-------
:class:`~montreal_forced_aligner.db.File`
File match
"""
if session is None:
session = self.session()
file = session.query(File).options(
selectinload(File.utterances).joinedload(Utterance.speaker, innerjoin=True),
joinedload(File.sound_file, innerjoin=True),
joinedload(File.text_file, innerjoin=True),
selectinload(File.speakers),
)
if id is not None:
file = file.get(id)
if not file:
raise Exception(f"Could not find utterance with id of {id}")
return file
else:
file = file.filter(File.name == name).first()
if not file:
raise Exception(f"Could not find utterance with name of {name}")
return file
@property
def corpus_meta(self) -> typing.Dict[str, typing.Any]:
"""Corpus metadata"""
return {}
@property
def features_log_directory(self) -> str:
"""Feature log directory"""
return os.path.join(self.split_directory, "log")
@property
def split_directory(self) -> str:
"""Directory used to store information split by job"""
return os.path.join(
self.corpus_output_directory, f"split{GLOBAL_CONFIG.current_profile.num_jobs}"
)
def _write_spk2utt(self) -> None:
"""Write spk2utt scp file for Kaldi"""
data = {}
utt2spk_data = {}
with self.session() as session:
utterances = (
session.query(Utterance.kaldi_id, Utterance.speaker_id)
.join(Utterance.speaker)
.filter(Speaker.name != "MFA_UNKNOWN")
.order_by(Utterance.kaldi_id)
)
for utt_id, speaker_id in utterances:
if speaker_id not in data:
data[speaker_id] = []
data[speaker_id].append(utt_id)
utt2spk_data[utt_id] = speaker_id
output_mapping(utt2spk_data, os.path.join(self.corpus_output_directory, "utt2spk.scp"))
output_mapping(data, os.path.join(self.corpus_output_directory, "spk2utt.scp"))
[docs]
def create_corpus_split(self) -> None:
"""Create split directory and output information from Jobs"""
os.makedirs(os.path.join(self.split_directory, "log"), exist_ok=True)
with self.session() as session, tqdm.tqdm(
total=self.num_utterances, disable=GLOBAL_CONFIG.quiet
) as pbar:
jobs = session.query(Job)
arguments = [
ExportKaldiFilesArguments(
j.id, self.db_string, None, self.split_directory, for_features=False
)
for j in jobs
]
for _ in run_kaldi_function(ExportKaldiFilesFunction, arguments, pbar.update):
pass
@property
def corpus_word_set(self) -> typing.List[str]:
"""Set of words used in the corpus"""
if not self._word_set:
with self.session() as session:
self._word_set = [
x[0]
for x in session.query(Word.word).filter(Word.count > 0).order_by(Word.word)
]
return self._word_set
[docs]
def add_utterance(self, utterance: UtteranceData, session: Session = None) -> Utterance:
"""
Add an utterance to the corpus
Parameters
----------
utterance: :class:`~montreal_forced_aligner.corpus.classes.UtteranceData`
Utterance to add
"""
close = False
if session is None:
session = self.session()
close = True
speaker_obj = session.query(Speaker).filter_by(name=utterance.speaker_name).first()
if not speaker_obj:
dictionary = None
if hasattr(self, "get_dictionary"):
dictionary = (
session.query(Dictionary)
.filter_by(name=self.get_dictionary(utterance.speaker_name).name)
.first()
)
speaker_obj = Speaker(name=utterance.speaker_name, dictionary=dictionary)
session.add(speaker_obj)
self._speaker_ids[utterance.speaker_name] = speaker_obj
else:
self._speaker_ids[utterance.speaker_name] = speaker_obj
file_obj = session.query(File).filter_by(name=utterance.file_name).first()
u = Utterance.from_data(
utterance, file_obj, speaker_obj, frame_shift=getattr(self, "frame_shift", None)
)
u.id = self.get_next_primary_key(Utterance)
session.add(u)
if close:
session.commit()
session.close()
return u
[docs]
def delete_utterance(self, utterance_id: int, session: Session = None) -> None:
"""
Delete an utterance from the corpus
Parameters
----------
utterance_id: int
Utterance to delete
"""
close = False
if session is None:
session = self.session()
close = True
session.query(Utterance).filter(Utterance.id == utterance_id).delete()
session.commit()
if close:
session.close()
[docs]
def speakers(self, session: Session = None) -> sqlalchemy.orm.Query:
"""
Get all speakers in the corpus
Parameters
----------
session: sqlalchemy.orm.Session, optional
Session to use in querying
Returns
-------
sqlalchemy.orm.Query
Speaker query
"""
close = False
if session is None:
session = self.session()
close = True
speakers = session.query(Speaker).options(
selectinload(Speaker.utterances),
selectinload(Speaker.files),
joinedload(Speaker.dictionary),
)
if close:
session.close()
return speakers
[docs]
def files(self, session: Session = None) -> sqlalchemy.orm.Query:
"""
Get all files in the corpus
Parameters
----------
session: sqlalchemy.orm.Session, optional
Session to use in querying
Returns
-------
sqlalchemy.orm.Query
File query
"""
close = False
if session is None:
session = self.session()
close = True
files = session.query(File).options(
selectinload(File.utterances),
selectinload(File.speakers),
joinedload(File.sound_file),
joinedload(File.text_file),
)
if close:
session.close()
return files
[docs]
def utterances(self, session: Session = None) -> sqlalchemy.orm.Query:
"""
Get all utterances in the corpus
Parameters
----------
session: sqlalchemy.orm.Session, optional
Session to use in querying
Returns
-------
:class:`sqlalchemy.orm.Query`
Utterance query
"""
close = False
if session is None:
session = Session(self.db_engine)
close = True
utterances = session.query(Utterance).options(
joinedload(Utterance.file, innerjoin=True),
joinedload(Utterance.speaker, innerjoin=True),
selectinload(Utterance.phone_intervals),
selectinload(Utterance.word_intervals),
)
if close:
session.close()
return utterances
[docs]
def initialize_jobs(self) -> None:
"""
Initialize the corpus's Jobs
"""
with self.session() as session:
if session.query(sqlalchemy.sql.exists().where(Utterance.job_id > 1)).scalar():
logger.info("Jobs already initialized.")
return
logger.info("Initializing multiprocessing jobs...")
if (
self.num_speakers < GLOBAL_CONFIG.current_profile.num_jobs
and not GLOBAL_CONFIG.current_profile.single_speaker
):
logger.warning(
f"Number of jobs was specified as {GLOBAL_CONFIG.current_profile.num_jobs}, "
f"but due to only having {self.num_speakers} speakers, MFA "
f"will only use {self.num_speakers} jobs. Use the --single_speaker flag if you would like to split "
f"utterances across jobs regardless of their speaker."
)
session.query(Job).filter(Job.id > GLOBAL_CONFIG.current_profile.num_jobs).delete()
session.query(Corpus).update(
{Corpus.num_jobs: GLOBAL_CONFIG.current_profile.num_jobs}
)
session.commit()
elif (
GLOBAL_CONFIG.current_profile.single_speaker
and self.num_utterances < GLOBAL_CONFIG.current_profile.num_jobs
):
logger.warning(
f"Number of jobs was specified as {GLOBAL_CONFIG.current_profile.num_jobs}, "
f"but due to only having {self.num_utterances} utterances, MFA "
f"will only use {self.num_utterances} jobs."
)
session.query(Job).filter(Job.id > GLOBAL_CONFIG.current_profile.num_jobs).delete()
session.query(Corpus).update(
{Corpus.num_jobs: GLOBAL_CONFIG.current_profile.num_jobs}
)
session.commit()
jobs = session.query(Job).all()
update_mappings = []
if GLOBAL_CONFIG.current_profile.single_speaker:
utts_per_job = int(self.num_utterances / GLOBAL_CONFIG.current_profile.num_jobs)
if utts_per_job == 0:
utts_per_job = 1
for i, j in enumerate(jobs):
update_mappings.extend(
{"id": u, "job_id": j.id}
for u in range((utts_per_job * i) + 1, (utts_per_job * (i + 1)) + 1)
)
last_ind = update_mappings[-1]["id"] + 1
for u in range(last_ind, self.num_utterances):
update_mappings.append({"id": u, "job_id": jobs[-1].id})
bulk_update(session, Utterance, update_mappings)
else:
utt_counts = {j.id: 0 for j in jobs}
speakers = (
session.query(Speaker.id, sqlalchemy.func.count(Utterance.id))
.outerjoin(Speaker.utterances)
.group_by(Speaker.id)
.order_by(sqlalchemy.func.count(Utterance.id).desc())
)
for s_id, speaker_utt_count in speakers:
if not speaker_utt_count:
continue
job_id = min(utt_counts.keys(), key=lambda x: utt_counts[x])
update_mappings.append({"speaker_id": s_id, "job_id": job_id})
utt_counts[job_id] += speaker_utt_count
bulk_update(session, Utterance, update_mappings, id_field="speaker_id")
session.commit()
if session.query(Dictionary2Job).count() == 0:
dict_job_mappings = []
for job_id, dict_id in (
session.query(Utterance.job_id, Dictionary.id)
.join(Utterance.speaker)
.join(Speaker.dictionary)
.distinct()
):
if not dict_id:
continue
dict_job_mappings.append({"job_id": job_id, "dictionary_id": dict_id})
if dict_job_mappings:
session.execute(Dictionary2Job.insert().values(dict_job_mappings))
session.commit()
def _finalize_load(self, session: Session, import_data: DatabaseImportData):
"""Finalize the import of database objects after parsing"""
with session.begin_nested():
c = session.query(Corpus).first()
job_objs = [
{"id": j, "corpus_id": c.id}
for j in range(1, GLOBAL_CONFIG.current_profile.num_jobs + 1)
]
session.execute(sqlalchemy.insert(Job.__table__), job_objs)
c.num_jobs = GLOBAL_CONFIG.current_profile.num_jobs
if import_data.speaker_objects:
session.execute(sqlalchemy.insert(Speaker.__table__), import_data.speaker_objects)
if import_data.file_objects:
session.execute(sqlalchemy.insert(File.__table__), import_data.file_objects)
if import_data.text_file_objects:
session.execute(
sqlalchemy.insert(TextFile.__table__), import_data.text_file_objects
)
if import_data.sound_file_objects:
session.execute(
sqlalchemy.insert(SoundFile.__table__), import_data.sound_file_objects
)
if import_data.speaker_ordering_objects:
session.execute(
sqlalchemy.insert(SpeakerOrdering),
import_data.speaker_ordering_objects,
)
if import_data.utterance_objects:
session.execute(
sqlalchemy.insert(Utterance.__table__), import_data.utterance_objects
)
session.flush()
self.imported = True
speakers = (
session.query(Speaker.id)
.outerjoin(Speaker.utterances)
.group_by(Speaker.id)
.having(sqlalchemy.func.count(Utterance.id) == 0)
)
self._speaker_ids = {}
speaker_ids = [x[0] for x in speakers]
session.query(Corpus).update(
{
"imported": True,
"has_text_files": len(import_data.text_file_objects) > 0,
"has_sound_files": len(import_data.sound_file_objects) > 0,
}
)
if speaker_ids:
session.query(SpeakerOrdering).filter(
SpeakerOrdering.c.speaker_id.in_(speaker_ids)
).delete()
session.query(Speaker).filter(Speaker.id.in_(speaker_ids)).delete()
self._num_speakers = None
self._num_utterances = None # Recalculate if already cached
self._num_files = None
session.commit()
def normalize_text_arguments(self):
from montreal_forced_aligner.dictionary.mixins import DictionaryMixin
if not isinstance(self, DictionaryMixin):
return None
from montreal_forced_aligner.corpus.multiprocessing import NormalizeTextArguments
with self.session() as session:
jobs = session.query(Job).filter(Job.utterances.any())
return [
NormalizeTextArguments(
j.id,
self.db_string,
None,
self.word_break_markers,
self.punctuation,
self.clitic_markers,
self.compound_markers,
self.brackets,
self.laughter_word,
self.oov_word,
self.bracketed_word,
self.ignore_case,
getattr(self, "use_g2p", False),
)
for j in jobs
]
[docs]
def normalize_text(self) -> None:
"""Normalize the text of the corpus using a dictionary's sanitization functions and word mappings"""
if self.text_normalized:
logger.info("Text already normalized.")
return
args = self.normalize_text_arguments()
if args is None:
return
logger.info("Normalizing text...")
log_directory = os.path.join(self.split_directory, "log")
word_update_mappings = {}
word_insert_mappings = {}
pronunciation_insert_mappings = []
word_indexes = {}
word_mapping_ids = {}
max_mapping_ids = {}
os.makedirs(log_directory, exist_ok=True)
update_mapping = []
word_key = self.get_next_primary_key(Word)
pronunciation_key = self.get_next_primary_key(Pronunciation)
with tqdm.tqdm(
total=self.num_utterances, disable=GLOBAL_CONFIG.quiet
) as pbar, self.session() as session:
dictionaries: typing.Dict[int, Dictionary] = {
d.id: d for d in session.query(Dictionary)
}
has_words = session.query(Dictionary).filter(Dictionary.name == "unknown") is not None
words = session.query(
Word.id, Word.mapping_id, Word.dictionary_id, Word.word
).order_by(Word.mapping_id)
if not has_words or getattr(self, "use_g2p", False):
word_insert_mappings["<eps>"] = {
"id": word_key,
"word": "<eps>",
"word_type": WordType.silence,
"mapping_id": word_key - 1,
"count": 0,
"dictionary_id": 1,
}
word_key += 1
for w_id, m_id, d_id, w in words:
word_indexes[(d_id, w)] = w_id
word_mapping_ids[(d_id, w)] = m_id
max_mapping_ids[d_id] = m_id
for result in run_kaldi_function(NormalizeTextFunction, args, pbar.update):
result, dict_id = result
if dict_id is not None and not getattr(self, "use_g2p", False):
oovs = set(result["oovs"].split())
for w in oovs:
if w in dictionaries[dict_id].special_set:
continue
if (w, dict_id) not in word_insert_mappings:
max_mapping_ids[dict_id] += 1
word_insert_mappings[(w, dict_id)] = {
"id": word_key,
"mapping_id": max_mapping_ids[d_id],
"word": w,
"count": 0,
"dictionary_id": dict_id,
"word_type": WordType.oov,
}
pronunciation_insert_mappings.append(
{
"id": pronunciation_key,
"word_id": word_key,
"pronunciation": getattr(self, "oov_phone", "spn"),
"base_pronunciation_id": pronunciation_key,
}
)
word_key += 1
pronunciation_key += 1
word_insert_mappings[(w, dict_id)]["count"] += 1
for w in result["normalized_text"].split():
if w in oovs:
continue
if (dict_id, w) not in word_update_mappings:
word_update_mappings[(dict_id, w)] = {
"id": word_indexes[(dict_id, w)],
"count": 0,
}
word_update_mappings[(dict_id, w)]["count"] += 1
else:
for word in result["normalized_text"].split():
if word not in word_insert_mappings:
word_insert_mappings[word] = {
"id": word_key,
"word": word,
"word_type": WordType.speech,
"mapping_id": word_key - 1,
"count": 0,
"dictionary_id": 1,
}
pronunciation_insert_mappings.append(
{
"id": pronunciation_key,
"word_id": word_key,
"pronunciation": getattr(self, "oov_phone", "spn"),
"base_pronunciation_id": pronunciation_key,
}
)
word_key += 1
pronunciation_key += 1
word_insert_mappings[word]["count"] += 1
update_mapping.append(result)
bulk_update(session, Utterance, update_mapping)
if word_update_mappings:
if has_words:
session.query(Word).update({"count": 0})
session.commit()
bulk_update(session, Word, list(word_update_mappings.values()))
if word_insert_mappings:
if not has_words:
word_insert_mappings["<unk>"] = {
"id": word_key,
"word": "<unk>",
"word_type": WordType.oov,
"mapping_id": word_key - 1,
"count": 0,
"dictionary_id": 1,
}
session.bulk_insert_mappings(
Word,
list(word_insert_mappings.values()),
return_defaults=False,
render_nulls=True,
)
session.bulk_insert_mappings(
Pronunciation,
pronunciation_insert_mappings,
return_defaults=False,
render_nulls=True,
)
self.text_normalized = True
session.query(Corpus).update({"text_normalized": True})
if self.oov_count_threshold > 0:
session.query(Word).filter(Word.word_type == WordType.speech).filter(
Word.count <= self.oov_count_threshold
).update({Word.word_type: WordType.oov})
session.commit()
[docs]
def add_speaker(self, name: str, session: Session = None):
"""
Add a speaker to the corpus
Parameters
----------
name: str
Name of the speaker
session: sqlalchemy.orm.Session
Database session, if not specified, will use a temporary session
"""
if name in self._speaker_ids:
return
close = False
if session is None:
session = self.session()
close = True
speaker_obj = session.query(Speaker).filter_by(name=name).first()
if not speaker_obj:
dictionary = None
if hasattr(self, "get_dictionary_id"):
dictionary = session.get(Dictionary, self.get_dictionary_id(name))
speaker_obj = Speaker(
id=self.get_next_primary_key(Speaker), name=name, dictionary=dictionary
)
session.add(speaker_obj)
session.flush()
self._speaker_ids[name] = speaker_obj.id
else:
self._speaker_ids[name] = speaker_obj.id
if close:
session.commit()
session.close()
def _create_dummy_dictionary(self):
with self.session() as session:
if session.query(Dictionary).first() is None:
dialect = Dialect(name="unspecified")
d = Dictionary(name="unknown", path="unknown", dialect=dialect)
session.add(dialect)
session.add(d)
session.flush()
session.query(Speaker).update({Speaker.dictionary_id: d.id})
session.commit()
[docs]
def add_file(self, file: FileData, session: Session = None):
"""
Add a file to the corpus
Parameters
----------
file: :class:`~montreal_forced_aligner.corpus.classes.FileData`
File to be added
"""
close = False
if session is None:
session = self.session()
close = True
f = File(
id=self._current_file_index,
name=file.name,
relative_path=file.relative_path,
modified=False,
)
session.add(f)
session.flush()
for i, speaker in enumerate(file.speaker_ordering):
if speaker not in self._speaker_ids:
speaker_obj = Speaker(
id=self._current_speaker_index,
name=speaker,
dictionary_id=getattr(self, "_default_dictionary_id", None),
)
session.add(speaker_obj)
self._speaker_ids[speaker] = self._current_speaker_index
self._current_speaker_index += 1
so = SpeakerOrdering(
file_id=self._current_file_index,
speaker_id=self._speaker_ids[speaker],
index=i,
)
session.add(so)
if file.wav_path is not None:
sf = SoundFile(
file_id=self._current_file_index,
sound_file_path=file.wav_path,
format=file.wav_info.format,
sample_rate=file.wav_info.sample_rate,
duration=file.wav_info.duration,
num_channels=file.wav_info.num_channels,
sox_string=file.wav_info.sox_string,
)
session.add(sf)
if file.text_path is not None:
text_type = file.text_type
if isinstance(text_type, TextFileType):
text_type = file.text_type.value
tf = TextFile(
file_id=self._current_file_index,
text_file_path=file.text_path,
file_type=text_type,
)
session.add(tf)
frame_shift = getattr(self, "frame_shift", None)
if frame_shift is not None:
frame_shift = round(frame_shift / 1000, 4)
for u in file.utterances:
duration = u.end - u.begin
num_frames = None
if frame_shift is not None:
num_frames = int(duration / frame_shift)
utterance = Utterance(
id=self._current_utterance_index,
begin=u.begin,
end=u.end,
duration=duration,
channel=u.channel,
oovs=u.oovs,
normalized_text=u.normalized_text,
normalized_character_text=u.normalized_character_text,
text=u.text,
num_frames=num_frames,
in_subset=False,
ignored=False,
file_id=self._current_file_index,
speaker_id=self._speaker_ids[u.speaker_name],
)
self._current_utterance_index += 1
session.add(utterance)
if close:
session.commit()
session.close()
self._current_file_index += 1
[docs]
def generate_import_objects(self, file: FileData) -> DatabaseImportData:
"""
Add a file to the corpus
Parameters
----------
file: :class:`~montreal_forced_aligner.corpus.classes.FileData`
File to be added
"""
data = DatabaseImportData()
data.file_objects.append(
{
"id": self._current_file_index,
"name": file.name,
"relative_path": file.relative_path,
"modified": False,
}
)
for i, speaker in enumerate(file.speaker_ordering):
if speaker not in self._speaker_ids:
data.speaker_objects.append(
{
"id": self._current_speaker_index,
"name": speaker,
"dictionary_id": getattr(self, "_default_dictionary_id", None),
}
)
self._speaker_ids[speaker] = self._current_speaker_index
self._current_speaker_index += 1
data.speaker_ordering_objects.append(
{
"file_id": self._current_file_index,
"speaker_id": self._speaker_ids[speaker],
"index": i,
}
)
if file.wav_path is not None:
data.sound_file_objects.append(
{
"file_id": self._current_file_index,
"sound_file_path": file.wav_path,
"format": file.wav_info.format,
"sample_rate": file.wav_info.sample_rate,
"duration": file.wav_info.duration,
"num_channels": file.wav_info.num_channels,
"sox_string": file.wav_info.sox_string,
}
)
if file.text_path is not None:
text_type = file.text_type
if isinstance(text_type, TextFileType):
text_type = file.text_type.value
data.text_file_objects.append(
{
"file_id": self._current_file_index,
"text_file_path": file.text_path,
"file_type": text_type,
}
)
frame_shift = getattr(self, "frame_shift", None)
if frame_shift is not None:
frame_shift = round(frame_shift / 1000, 4)
for u in file.utterances:
duration = u.end - u.begin
num_frames = None
if frame_shift is not None:
num_frames = int(duration / frame_shift)
ignored = False
if self.ignore_empty_utterances and not u.text:
ignored = True
data.utterance_objects.append(
{
"id": self._current_utterance_index,
"begin": u.begin,
"end": u.end,
"channel": u.channel,
"oovs": u.oovs,
"normalized_text": u.normalized_text,
"normalized_character_text": u.normalized_character_text,
"text": u.text,
"num_frames": num_frames,
"in_subset": False,
"ignored": ignored,
"file_id": self._current_file_index,
"job_id": 1,
"speaker_id": self._speaker_ids[u.speaker_name],
}
)
self._current_utterance_index += 1
self._current_file_index += 1
return data
@property
def data_source_identifier(self) -> str:
"""Corpus name"""
return os.path.basename(self.corpus_directory)
[docs]
def create_subset(self, subset: int) -> None:
"""
Create a subset of utterances to use for training
Parameters
----------
subset: int
Number of utterances to include in subset
"""
logger.info(f"Creating subset directory with {subset} utterances...")
subset_directory = os.path.join(self.corpus_output_directory, f"subset_{subset}")
log_dir = os.path.join(subset_directory, "log")
os.makedirs(log_dir, exist_ok=True)
num_dictionaries = getattr(self, "num_dictionaries", 1)
with self.session() as session:
begin = time.time()
session.query(Utterance).update({Utterance.in_subset: False})
if num_dictionaries > 1:
subsets_per_dictionary = {}
utts_per_dictionary = {}
subsetted = 0
for dict_id in getattr(self, "dictionary_lookup", {}).values():
num_utts = (
session.query(Utterance)
.join(Utterance.speaker)
.filter(Speaker.dictionary_id == dict_id)
.count()
)
utts_per_dictionary[dict_id] = num_utts
if num_utts < int(subset / num_dictionaries):
subsets_per_dictionary[dict_id] = num_utts
subsetted += 1
remaining_subset = subset - sum(subsets_per_dictionary.values())
remaining_dicts = num_dictionaries - subsetted
remaining_subset_per_dictionary = int(remaining_subset / remaining_dicts)
for dict_id in getattr(self, "dictionary_lookup", {}).values():
num_utts = utts_per_dictionary[dict_id]
if dict_id in subsets_per_dictionary:
subset_per_dictionary = subsets_per_dictionary[dict_id]
else:
subset_per_dictionary = remaining_subset_per_dictionary
logger.debug(f"For {dict_id}, total number of utterances is {num_utts}")
larger_subset_num = int(subset_per_dictionary * 10)
if num_utts > larger_subset_num:
larger_subset_query = (
session.query(Utterance.id)
.join(Utterance.speaker)
.filter(Speaker.dictionary_id == dict_id)
.filter(Utterance.text.like("% %"))
.filter(Utterance.ignored == False) # noqa
.order_by(Utterance.duration)
.limit(larger_subset_num)
)
sq = larger_subset_query.subquery()
subset_utts = (
sqlalchemy.select(sq.c.id)
.order_by(sqlalchemy.func.random())
.limit(subset_per_dictionary)
.scalar_subquery()
)
query = (
sqlalchemy.update(Utterance)
.execution_options(synchronize_session="fetch")
.values(in_subset=True)
.where(Utterance.id.in_(subset_utts))
)
session.execute(query)
logger.debug(f"For {dict_id}, subset is {subset_per_dictionary}")
elif num_utts > subset_per_dictionary:
larger_subset_query = (
session.query(Utterance.id)
.join(Utterance.speaker)
.filter(Speaker.dictionary_id == dict_id)
.filter(Utterance.ignored == False) # noqa
)
sq = larger_subset_query.subquery()
subset_utts = (
sqlalchemy.select(sq.c.id)
.order_by(sqlalchemy.func.random())
.limit(subset_per_dictionary)
.scalar_subquery()
)
query = (
sqlalchemy.update(Utterance)
.execution_options(synchronize_session="fetch")
.values(in_subset=True)
.where(Utterance.id.in_(subset_utts))
)
session.execute(query)
logger.debug(f"For {dict_id}, subset is {subset_per_dictionary}")
else:
larger_subset_query = (
session.query(Utterance.id)
.join(Utterance.speaker)
.filter(Speaker.dictionary_id == dict_id)
.filter(Utterance.ignored == False) # noqa
)
sq = larger_subset_query.subquery()
subset_utts = sqlalchemy.select(sq.c.id).scalar_subquery()
query = (
sqlalchemy.update(Utterance)
.execution_options(synchronize_session="fetch")
.values(in_subset=True)
.where(Utterance.id.in_(subset_utts))
)
session.execute(query)
else:
larger_subset_num = subset * 10
if subset < self.num_utterances:
# Get all shorter utterances that are not one word long
larger_subset_query = (
session.query(Utterance.id)
.filter(Utterance.text.like("% %"))
.filter(Utterance.ignored == False) # noqa
.order_by(Utterance.duration)
.limit(larger_subset_num)
)
sq = larger_subset_query.subquery()
subset_utts = (
sqlalchemy.select(sq.c.id)
.order_by(sqlalchemy.func.random())
.limit(subset)
.scalar_subquery()
)
query = (
sqlalchemy.update(Utterance)
.execution_options(synchronize_session="fetch")
.values(in_subset=True)
.where(Utterance.id.in_(subset_utts))
)
session.execute(query)
else:
session.query(Utterance).update({Utterance.in_subset: True})
session.commit()
logger.debug(f"Setting subset flags took {time.time()-begin} seconds")
with self.session() as session, tqdm.tqdm(
total=subset, disable=GLOBAL_CONFIG.quiet
) as pbar:
jobs = (
session.query(Job)
.options(
joinedload(Job.corpus, innerjoin=True), subqueryload(Job.dictionaries)
)
.filter(Job.utterances.any(Utterance.in_subset == True)) # noqa
)
self._jobs = jobs.all()
arguments = [
ExportKaldiFilesArguments(j.id, self.db_string, None, subset_directory, False)
for j in self._jobs
]
for _ in run_kaldi_function(ExportKaldiFilesFunction, arguments, pbar.update):
pass
@property
def num_files(self) -> int:
"""Number of files in the corpus"""
if self._num_files is None:
with self.session() as session:
self._num_files = session.query(File).count()
return self._num_files
@property
def num_utterances(self) -> int:
"""Number of utterances in the corpus"""
if self._num_utterances is None:
with self.session() as session:
self._num_utterances = session.query(Utterance).count()
return self._num_utterances
@property
def num_speakers(self) -> int:
"""Number of speakers in the corpus"""
if self._num_speakers is None:
with self.session() as session:
self._num_speakers = session.query(sqlalchemy.func.count(Speaker.id)).scalar()
return self._num_speakers
[docs]
def subset_directory(self, subset: typing.Optional[int]) -> str:
"""
Construct a subset directory for the corpus
Parameters
----------
subset: int, optional
Number of utterances to include, if larger than the total number of utterance or not specified, the
split_directory is returned
Returns
-------
str
Path to subset directory
"""
self._jobs = []
with self.session() as session:
c = session.query(Corpus).first()
if subset is None or subset >= self.num_utterances or subset <= 0:
c.current_subset = 0
else:
c.current_subset = subset
session.commit()
if subset is None or subset >= self.num_utterances or subset <= 0:
return self.split_directory
directory = os.path.join(self.corpus_output_directory, f"subset_{subset}")
if not os.path.exists(directory):
self.create_subset(subset)
return directory
[docs]
def get_latest_workflow_run(self, workflow: WorkflowType, session: Session) -> CorpusWorkflow:
"""
Get the latest version of a workflow type
Parameters
----------
workflow: :class:`~montreal_forced_aligner.data.WorkflowType`
Workflow type
session: :class:`sqlalchemy.orm.Session`
Database session
Returns
-------
:class:`~montreal_forced_aligner.db.CorpusWorkflow` or None
Latest run of workflow type
"""
workflow = (
session.query(CorpusWorkflow)
.filter(CorpusWorkflow.workflow_type == workflow)
.order_by(CorpusWorkflow.time_stamp.desc())
.first()
)
return workflow
def _load_corpus(self) -> None:
"""
Load the corpus
"""
self.inspect_database()
logger.info("Setting up corpus information...")
if not self.imported:
logger.debug("Could not load from temp")
logger.info("Loading corpus from source files...")
if GLOBAL_CONFIG.use_mp:
self._load_corpus_from_source_mp()
else:
self._load_corpus_from_source()
else:
logger.debug("Successfully loaded from temporary files")
if not self.num_files:
raise CorpusError(
"There were no files found for this corpus. Please validate the corpus."
)
if not self.num_speakers:
raise CorpusError(
"There were no sound files found of the appropriate format. Please double check the corpus path "
"and/or run the validation utility (mfa validate)."
)
average_utterances = self.num_utterances / self.num_speakers
logger.info(
f"Found {self.num_speakers} speaker{'s' if self.num_speakers > 1 else ''} across {self.num_files} file{'s' if self.num_files > 1 else ''}, "
f"average number of utterances per speaker: {average_utterances}"
)
@property
def base_data_directory(self) -> str:
"""Corpus data directory"""
return self.corpus_output_directory
@property
def data_directory(self) -> str:
"""Corpus data directory"""
return self.split_directory
@abstractmethod
def _load_corpus_from_source_mp(self) -> None:
"""Abstract method for loading a corpus with multiprocessing"""
...
@abstractmethod
def _load_corpus_from_source(self) -> None:
"""Abstract method for loading a corpus without multiprocessing"""
...