Files
ffx/src/ffx/media_descriptor.py
2026-06-15 12:43:34 +02:00

783 lines
31 KiB
Python

import os, re, click
from typing import List, Self
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
from ffx.track_disposition import TrackDisposition
from ffx.track_codec import TrackCodec
from ffx.track_descriptor import TrackDescriptor
from ffx.logging_utils import get_ffx_logger
class MediaDescriptor:
"""This class represents the structural content of a media file including streams and metadata"""
CONTEXT_KEY = "context"
TAGS_KEY = "tags"
TRACKS_KEY = "tracks"
TRACK_DESCRIPTOR_LIST_KEY = "track_descriptors"
ATTACHMENT_DESCRIPTOR_LIST_KEY = "attachment_descriptors"
CLEAR_TAGS_FLAG_KEY = "clear_tags"
FFPROBE_DISPOSITION_KEY = "disposition"
FFPROBE_TAGS_KEY = "tags"
FFPROBE_CODEC_TYPE_KEY = "codec_type"
#407 remove as well
EXCLUDED_MEDIA_TAGS = ["creation_time"]
SEASON_EPISODE_STREAM_LANGUAGE_DISPOSITIONS_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]{3})(?:_([A-Z]{3}))*'
STREAM_LANGUAGE_DISPOSITIONS_MATCH = '([0-9]+)_([a-z]{3})(?:_([A-Z]{3}))*'
SUBTITLE_FILE_EXTENSION = 'vtt'
def __init__(self, **kwargs):
if MediaDescriptor.CONTEXT_KEY in kwargs.keys():
if type(kwargs[MediaDescriptor.CONTEXT_KEY]) is not dict:
raise TypeError(
f"MediaDescriptor.__init__(): Argument {MediaDescriptor.CONTEXT_KEY} is required to be of type dict"
)
self.__context = kwargs[MediaDescriptor.CONTEXT_KEY]
self.__logger = self.__context['logger']
else:
self.__context = {}
self.__logger = get_ffx_logger()
if MediaDescriptor.TAGS_KEY in kwargs.keys():
if type(kwargs[MediaDescriptor.TAGS_KEY]) is not dict:
raise TypeError(
f"MediaDescriptor.__init__(): Argument {MediaDescriptor.TAGS_KEY} is required to be of type dict"
)
self.__mediaTags = kwargs[MediaDescriptor.TAGS_KEY]
else:
self.__mediaTags = {}
if MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY in kwargs.keys():
if (
type(kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY]) is not list
): # Use List typehint for TrackDescriptor as well if it works
raise TypeError(
f"MediaDescriptor.__init__(): Argument {MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY} is required to be of type list"
)
for d in kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY]:
if type(d) is not TrackDescriptor:
raise TypeError(
f"TrackDesciptor.__init__(): All elements of argument list {MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY} are required to be of type TrackDescriptor"
)
self.__trackDescriptors: List[TrackDescriptor] = kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY]
else:
self.__trackDescriptors: List[TrackDescriptor] = []
def setTrackLanguage(self, language: str, index: int, trackType: TrackType = None):
trackLanguage = IsoLanguage.findThreeLetter(language)
if trackLanguage == IsoLanguage.UNDEFINED:
self.__logger.warning('MediaDescriptor.setTrackLanguage(): Parameter language does not contain a registered '
+ f"ISO 639 3-letter language code, skipping to set language for"
+ str('' if trackType is None else trackType.label()) + f"track {index}")
trackList = self.getTrackDescriptors(trackType=trackType)
if index < 0 or index > len(trackList) - 1:
self.__logger.warning(f"MediaDescriptor.setTrackLanguage(): Parameter index ({index}) is "
+ f"out of range of {'' if trackType is None else trackType.label()}track list")
td: TrackDescriptor = trackList[index]
td.setLanguage(trackLanguage)
return
def setTrackTitle(self, title: str, index: int, trackType: TrackType = None):
trackList = self.getTrackDescriptors(trackType=trackType)
if index < 0 or index > len(trackList) - 1:
self.__logger.error(f"MediaDescriptor.setTrackTitle(): Parameter index ({index}) is "
+ f"out of range of {'' if trackType is None else trackType.label()}track list")
raise click.Abort()
td: TrackDescriptor = trackList[index]
td.setTitle(title)
def setDefaultSubTrack(self, trackType: TrackType, subIndex: int):
# for t in self.getAllTrackDescriptors():
for t in self.getTrackDescriptors():
if t.getType() == trackType:
t.setDispositionFlag(
TrackDisposition.DEFAULT, t.getSubIndex() == int(subIndex)
)
def setForcedSubTrack(self, trackType: TrackType, subIndex: int):
# for t in self.getAllTrackDescriptors():
for t in self.getTrackDescriptors():
if t.getType() == trackType:
t.setDispositionFlag(
TrackDisposition.FORCED, t.getSubIndex() == int(subIndex)
)
def checkConfiguration(self):
videoTracks = self.getVideoTracks()
audioTracks = self.getAudioTracks()
subtitleTracks = self.getSubtitleTracks()
if len([v for v in videoTracks if v.getDispositionFlag(TrackDisposition.DEFAULT)]) > 1:
raise ValueError('More than one default video track')
if len([a for a in audioTracks if a.getDispositionFlag(TrackDisposition.DEFAULT)]) > 1:
raise ValueError('More than one default audio track')
if len([s for s in subtitleTracks if s.getDispositionFlag(TrackDisposition.DEFAULT)]) > 1:
raise ValueError('More than one default subtitle track')
if len([v for v in videoTracks if v.getDispositionFlag(TrackDisposition.FORCED)]) > 1:
raise ValueError('More than one forced video track')
if len([a for a in audioTracks if a.getDispositionFlag(TrackDisposition.FORCED)]) > 1:
raise ValueError('More than one forced audio track')
if len([s for s in subtitleTracks if s.getDispositionFlag(TrackDisposition.FORCED)]) > 1:
raise ValueError('More than one forced subtitle track')
trackDescriptors = videoTracks + audioTracks + subtitleTracks
sourceIndices = [
t.getSourceIndex() for t in trackDescriptors
]
if len(set(sourceIndices)) < len(trackDescriptors):
raise ValueError('Multiple streams originating from the same source stream')
def applyOverrides(self, overrides: dict):
if 'languages' in overrides.keys():
for trackIndex in overrides['languages'].keys():
self.setTrackLanguage(overrides['languages'][trackIndex], trackIndex)
if 'titles' in overrides.keys():
for trackIndex in overrides['titles'].keys():
self.setTrackTitle(overrides['titles'][trackIndex], trackIndex)
if 'forced_video' in overrides.keys():
sti = int(overrides['forced_video'])
self.setForcedSubTrack(TrackType.VIDEO, sti)
self.setDefaultSubTrack(TrackType.VIDEO, sti)
elif 'default_video' in overrides.keys():
sti = int(overrides['default_video'])
self.setDefaultSubTrack(TrackType.VIDEO, sti)
if 'forced_audio' in overrides.keys():
sti = int(overrides['forced_audio'])
self.setForcedSubTrack(TrackType.AUDIO, sti)
self.setDefaultSubTrack(TrackType.AUDIO, sti)
elif 'default_audio' in overrides.keys():
sti = int(overrides['default_audio'])
self.setDefaultSubTrack(TrackType.AUDIO, sti)
if 'forced_subtitle' in overrides.keys():
sti = int(overrides['forced_subtitle'])
self.setForcedSubTrack(TrackType.SUBTITLE, sti)
self.setDefaultSubTrack(TrackType.SUBTITLE, sti)
elif 'default_subtitle' in overrides.keys():
sti = int(overrides['default_subtitle'])
self.setDefaultSubTrack(TrackType.SUBTITLE, sti)
if 'stream_order' in overrides.keys():
self.rearrangeTrackDescriptors(overrides['stream_order'])
def applySourceIndices(self, sourceMediaDescriptor: Self):
# sourceTrackDescriptors = sourceMediaDescriptor.getAllTrackDescriptors()
sourceTrackDescriptors = sourceMediaDescriptor.getTrackDescriptors()
numTrackDescriptors = len(self.__trackDescriptors)
if len(sourceTrackDescriptors) != numTrackDescriptors:
raise ValueError('MediaDescriptor.applySourceIndices (): Number of track descriptors does not match')
for trackIndex in range(numTrackDescriptors):
self.__trackDescriptors[trackIndex].setSourceIndex(sourceTrackDescriptors[trackIndex].getSourceIndex())
def rearrangeTrackDescriptors(self, newOrder: List[int]):
if len(newOrder) != len(self.__trackDescriptors):
raise ValueError('Length of list with reordered indices does not match number of track descriptors')
reorderedTrackDescriptors = []
for oldIndex in newOrder:
reorderedTrackDescriptors.append(self.__trackDescriptors[oldIndex])
self.__trackDescriptors = reorderedTrackDescriptors
self.reindexSubIndices()
self.reindexIndices()
@classmethod
def fromFfprobe(cls, context, formatData, streamData):
kwargs = {}
kwargs[MediaDescriptor.CONTEXT_KEY] = context
if MediaDescriptor.FFPROBE_TAGS_KEY in formatData.keys():
kwargs[MediaDescriptor.TAGS_KEY] = formatData[
MediaDescriptor.FFPROBE_TAGS_KEY
]
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = []
# TODO: Evtl obsolet
subIndexCounters = {}
for streamObj in streamData:
ffprobeCodecType = streamObj[MediaDescriptor.FFPROBE_CODEC_TYPE_KEY]
trackType = TrackType.fromLabel(ffprobeCodecType)
if trackType != TrackType.UNKNOWN:
if trackType not in subIndexCounters.keys():
subIndexCounters[trackType] = 0
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY].append(
TrackDescriptor.fromFfprobe(
streamObj, subIndex=subIndexCounters[trackType]
)
)
subIndexCounters[trackType] += 1
return cls(**kwargs)
def getTags(self):
return self.__mediaTags
def sortSubIndices(
self, descriptors: List[TrackDescriptor]
) -> List[TrackDescriptor]:
subIndex = 0
for d in descriptors:
d.setSubIndex(subIndex)
subIndex += 1
return descriptors
def reindexSubIndices(self, trackDescriptors: list = []):
tdList = trackDescriptors if trackDescriptors else self.__trackDescriptors
subIndexCounter = {}
for td in tdList:
trackType = td.getType()
if trackType not in subIndexCounter.keys():
subIndexCounter[trackType] = 0
td.setSubIndex(subIndexCounter[trackType])
subIndexCounter[trackType] += 1
def sortIndices(
self, descriptors: List[TrackDescriptor]
) -> List[TrackDescriptor]:
index = 0
for d in descriptors:
d.setIndex(index)
index += 1
return descriptors
def reindexIndices(self, trackDescriptors: list = []):
tdList = trackDescriptors if trackDescriptors else self.__trackDescriptors
for trackIndex in range(len(tdList)):
tdList[trackIndex].setIndex(trackIndex)
# def getAllTrackDescriptors(self):
# """Returns all track descriptors sorted by type: video, audio then subtitles"""
# return self.getVideoTracks() + self.getAudioTracks() + self.getSubtitleTracks()
def getTrackDescriptors(self,
trackType: TrackType = None) -> List[TrackDescriptor]:
if trackType is None:
return self.__trackDescriptors
descriptorList = []
for td in self.__trackDescriptors:
if td.getType() == trackType:
descriptorList.append(td)
return descriptorList
def getVideoTracks(self) -> List[TrackDescriptor]:
return [v for v in self.__trackDescriptors if v.getType() == TrackType.VIDEO]
def getAudioTracks(self) -> List[TrackDescriptor]:
return [a for a in self.__trackDescriptors if a.getType() == TrackType.AUDIO]
def getSubtitleTracks(self) -> List[TrackDescriptor]:
return [
s
for s in self.__trackDescriptors
if s.getType() == TrackType.SUBTITLE
]
def getAttachmentTracks(self) -> List[TrackDescriptor]:
return [
s
for s in self.__trackDescriptors
if s.getType() == TrackType.ATTACHMENT
]
def hasStyledAssSubtitlesWithFontAttachments(self) -> bool:
return (
any(
trackDescriptor.getCodec() == TrackCodec.ASS
for trackDescriptor in self.getSubtitleTracks()
)
and any(
trackDescriptor.getAttachmentFormat() == AttachmentFormat.TTF
for trackDescriptor in self.getAttachmentTracks()
)
)
def withoutAttachmentTracks(
self,
attachmentFormat: AttachmentFormat | None = None,
context: dict | None = None,
):
filteredTrackDescriptors = []
for trackDescriptor in self.__trackDescriptors:
if trackDescriptor.getType() == TrackType.ATTACHMENT and (
attachmentFormat is None
or trackDescriptor.getAttachmentFormat() == attachmentFormat
):
continue
filteredTrackDescriptors.append(
trackDescriptor.clone(
context=context if context is not None else self.__context
)
)
kwargs = {
MediaDescriptor.TAGS_KEY: dict(self.__mediaTags),
MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY: filteredTrackDescriptors,
}
if context is not None:
kwargs[MediaDescriptor.CONTEXT_KEY] = context
elif self.__context:
kwargs[MediaDescriptor.CONTEXT_KEY] = self.__context
filteredMediaDescriptor = MediaDescriptor(**kwargs)
filteredMediaDescriptor.reindexSubIndices()
return filteredMediaDescriptor
def withoutAttachmentsForComparison(self):
return self.withoutAttachmentTracks(context=self.__context)
def withSourceAttachmentTracks(
self,
sourceMediaDescriptor: Self,
attachmentFormat: AttachmentFormat | None = None,
context: dict | None = None,
):
trackDescriptors = []
for trackDescriptor in self.__trackDescriptors:
if trackDescriptor.getType() == TrackType.ATTACHMENT and (
attachmentFormat is None
or trackDescriptor.getAttachmentFormat() == attachmentFormat
):
continue
trackDescriptors.append(
trackDescriptor.clone(
context=context if context is not None else self.__context
)
)
for sourceTrackDescriptor in sourceMediaDescriptor.getAttachmentTracks():
if (
attachmentFormat is not None
and sourceTrackDescriptor.getAttachmentFormat() != attachmentFormat
):
continue
attachmentClone = sourceTrackDescriptor.clone(
context=context if context is not None else self.__context
)
attachmentClone.setIndex(len(trackDescriptors))
trackDescriptors.append(attachmentClone)
kwargs = {
MediaDescriptor.TAGS_KEY: dict(self.__mediaTags),
MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY: trackDescriptors,
}
if context is not None:
kwargs[MediaDescriptor.CONTEXT_KEY] = context
elif self.__context:
kwargs[MediaDescriptor.CONTEXT_KEY] = self.__context
mergedMediaDescriptor = MediaDescriptor(**kwargs)
mergedMediaDescriptor.reindexSubIndices()
return mergedMediaDescriptor
def getImportFileTokens(self, use_sub_index: bool = True):
"""Generate ffmpeg import options for external stream files"""
importFileTokens = []
td: TrackDescriptor
for td in self.__trackDescriptors:
importedFilePath = td.getExternalSourceFilePath()
if importedFilePath:
substitutionMessage = (
f"Substituting subtitle stream #{td.getIndex()} "
+ f"({td.getType().label()}:{td.getSubIndex()}) "
+ f"with import from file {td.getExternalSourceFilePath()}"
)
click.echo(substitutionMessage)
self.__logger.debug(substitutionMessage)
importFileTokens += [
"-i",
importedFilePath,
]
return importFileTokens
def getInputMappingTokens(self,
use_sub_index: bool = True,
only_video: bool = False,
sourceMediaDescriptor: Self = None):
"""Tracks must be reordered for source index order"""
inputMappingTokens = []
sortedTrackDescriptors = sorted(self.__trackDescriptors, key=lambda d: d.getIndex())
sourceTrackDescriptorsByIndex = {
td.getIndex(): td
for td in (
sourceMediaDescriptor.getTrackDescriptors()
if sourceMediaDescriptor is not None
else sortedTrackDescriptors
)
}
# raise click.ClickException(' '.join([f"\nindex={td.getIndex()} subIndex={td.getSubIndex()} srcIndex={td.getSourceIndex()} type={td.getType().label()}" for td in self.__trackDescriptors]))
filePointer = 1
for trackIndex in range(len(sortedTrackDescriptors)):
td: TrackDescriptor = sortedTrackDescriptors[trackIndex]
#HINT: Attached thumbnails are not supported by .webm container format
if td.getCodec() != TrackCodec.PNG:
sourceTrackDescriptor = sourceTrackDescriptorsByIndex.get(td.getSourceIndex())
if sourceTrackDescriptor is None:
raise ValueError(f"No source track descriptor found for source index {td.getSourceIndex()}")
stdi = sourceTrackDescriptor.getIndex()
stdsi = sourceTrackDescriptor.getSubIndex()
trackType = td.getType()
trackCodec = td.getCodec()
if (trackType != TrackType.ATTACHMENT
and (trackType == TrackType.VIDEO or not only_video)):
importedFilePath = td.getExternalSourceFilePath()
if use_sub_index:
if importedFilePath:
inputMappingTokens += [
"-map",
f"{filePointer}:{trackType.indicator()}:0",
]
filePointer += 1
else:
if not trackCodec in [TrackCodec.PGS, TrackCodec.VOBSUB]:
inputMappingTokens += [
"-map",
f"0:{trackType.indicator()}:{stdsi}",
]
else:
if not trackCodec in [TrackCodec.PGS, TrackCodec.VOBSUB]:
inputMappingTokens += ["-map", f"0:{stdi}"]
if sourceMediaDescriptor:
fontDescriptors = [ftd for ftd in sourceMediaDescriptor.getAttachmentTracks()
if ftd.getAttachmentFormat() == AttachmentFormat.TTF]
else:
fontDescriptors = [ftd for ftd in self.__trackDescriptors
if ftd.getType() == TrackType.ATTACHMENT
and ftd.getAttachmentFormat() == AttachmentFormat.TTF]
for ad in sorted(fontDescriptors, key=lambda d: d.getIndex()):
inputMappingTokens += ["-map", f"0:{ad.getIndex()}"]
return inputMappingTokens
def searchSubtitleFiles(
self,
searchDirectory,
prefix,
extension=SUBTITLE_FILE_EXTENSION,
strict=False,
):
normalizedExtension = str(extension).strip().lower()
if normalizedExtension.startswith('.'):
normalizedExtension = normalizedExtension[1:]
escapedPrefix = re.escape(prefix)
sesld_match = re.compile(
f"{escapedPrefix}_{MediaDescriptor.SEASON_EPISODE_STREAM_LANGUAGE_DISPOSITIONS_MATCH}"
)
sld_match = re.compile(
f"{escapedPrefix}_{MediaDescriptor.STREAM_LANGUAGE_DISPOSITIONS_MATCH}"
)
subtitleFileDescriptors = []
subtitleFilenames = []
for subtitleFilename in sorted(os.listdir(searchDirectory)):
subtitleFilePath = os.path.join(searchDirectory, subtitleFilename)
subtitleFilenameStem, subtitleFilenameExtension = os.path.splitext(
subtitleFilename
)
if (
os.path.isfile(subtitleFilePath)
and subtitleFilenameStem.startswith(prefix + '_')
and subtitleFilenameExtension.lower() == '.' + normalizedExtension
):
subtitleFilenames.append(subtitleFilename)
expectedSubtitleTrackIndices = {
subtitleTrack.getIndex()
for subtitleTrack in self.getSubtitleTracks()
}
if strict and len(subtitleFilenames) > len(expectedSubtitleTrackIndices):
raise ValueError(
f"Found {len(subtitleFilenames)} matching .{normalizedExtension} files "
+ f"for {len(expectedSubtitleTrackIndices)} subtitle tracks."
)
for subtitleFilename in subtitleFilenames:
subtitleFilenameStem = os.path.splitext(subtitleFilename)[0]
sesld_result = (
None
if strict
else sesld_match.fullmatch(subtitleFilenameStem)
)
sld_result = (
None
if sesld_result is not None
else sld_match.fullmatch(subtitleFilenameStem)
)
if strict and sesld_result is None and sld_result is None:
raise ValueError(
f"Subtitle filename does not match the expected pattern: "
+ subtitleFilename
)
if sesld_result is not None:
subtitleFilePath = os.path.join(searchDirectory, subtitleFilename)
subtitleFileDescriptor = {}
subtitleFileDescriptor["path"] = subtitleFilePath
subtitleFileDescriptor["season"] = int(sesld_result.group(1))
subtitleFileDescriptor["episode"] = int(sesld_result.group(2))
subtitleFileDescriptor["index"] = int(sesld_result.group(3))
subtitleFileDescriptor["language"] = sesld_result.group(4)
dispSet = set()
dispCaptGroups = sesld_result.groups()
numCaptGroups = len(dispCaptGroups)
if numCaptGroups > 4:
for groupIndex in range(numCaptGroups - 4):
disp = TrackDisposition.fromIndicator(
dispCaptGroups[groupIndex + 4]
)
if disp is not None:
dispSet.add(disp)
subtitleFileDescriptor["disposition_set"] = dispSet
subtitleFileDescriptors.append(subtitleFileDescriptor)
if sld_result is not None:
subtitleFilePath = os.path.join(searchDirectory, subtitleFilename)
subtitleFileDescriptor = {}
subtitleFileDescriptor["path"] = subtitleFilePath
subtitleFileDescriptor["index"] = int(sld_result.group(1))
subtitleFileDescriptor["language"] = sld_result.group(2)
dispSet = set()
dispCaptGroups = sld_result.groups()
numCaptGroups = len(dispCaptGroups)
if numCaptGroups > 2:
for groupIndex in range(numCaptGroups - 2):
disp = TrackDisposition.fromIndicator(
dispCaptGroups[groupIndex + 2]
)
if disp is not None:
dispSet.add(disp)
subtitleFileDescriptor["disposition_set"] = dispSet
subtitleFileDescriptors.append(subtitleFileDescriptor)
if strict:
discoveredTrackIndices = [
descriptor['index'] for descriptor in subtitleFileDescriptors
]
duplicateTrackIndices = sorted(
{
trackIndex
for trackIndex in discoveredTrackIndices
if discoveredTrackIndices.count(trackIndex) > 1
}
)
if duplicateTrackIndices:
duplicateDescription = ', '.join(
f"#{index}" for index in duplicateTrackIndices
)
raise ValueError(
"Multiple external subtitle files refer to subtitle track(s) "
+ duplicateDescription
+ "."
)
unexpectedTrackIndices = sorted(
set(discoveredTrackIndices) - expectedSubtitleTrackIndices
)
if unexpectedTrackIndices:
unexpectedDescription = ', '.join(
f"#{index}" for index in unexpectedTrackIndices
)
expectedDescription = ', '.join(
f"#{index}" for index in sorted(expectedSubtitleTrackIndices)
) or 'none'
raise ValueError(
"External subtitle track index pattern does not match the media "
+ f"subtitle tracks: found {unexpectedDescription}; "
+ f"expected a subset of {expectedDescription}."
)
self.__logger.debug(f"searchSubtitleFiles(): Available subtitle files {subtitleFileDescriptors}")
return subtitleFileDescriptors
def importSubtitles(
self,
searchDirectory,
prefix,
season: int = -1,
episode: int = -1,
preserve_dispositions: bool = False,
extension: str = SUBTITLE_FILE_EXTENSION,
strict: bool = False,
):
# click.echo(f"Season: {season} Episode: {episode}")
self.__logger.debug(f"importSubtitles(): Season: {season} Episode: {episode}")
availableFileSubtitleDescriptors = self.searchSubtitleFiles(
searchDirectory,
prefix,
extension=extension,
strict=strict,
)
self.__logger.debug(f"importSubtitles(): availableFileSubtitleDescriptors: {availableFileSubtitleDescriptors}")
subtitleTracks = self.getSubtitleTracks()
self.__logger.debug(f"importSubtitles(): subtitleTracks: {[s.getIndex() for s in subtitleTracks]}")
matchingSubtitleFileDescriptors = (
sorted(
[
d
for d in availableFileSubtitleDescriptors
if (strict
or (season == -1 and episode == -1)
or (
d.get("season") == int(season)
and d.get("episode") == int(episode)
))
],
key=lambda d: d["index"],
)
if availableFileSubtitleDescriptors
else []
)
self.__logger.debug(f"importSubtitles(): matchingSubtitleFileDescriptors: {matchingSubtitleFileDescriptors}")
importedTrackIndices = []
for msfd in matchingSubtitleFileDescriptors:
matchingSubtitleTrackDescriptor = [s for s in subtitleTracks if s.getIndex() == msfd["index"]]
if matchingSubtitleTrackDescriptor:
# click.echo(f"Found matching subtitle file {msfd["path"]}\n")
self.__logger.debug(f"importSubtitles(): Found matching subtitle file {msfd['path']}")
matchingTrack = matchingSubtitleTrackDescriptor[0]
matchingTrack.setExternalSourceFilePath(msfd["path"])
# Prefer metadata coming from the external single-track source when
# it is provided explicitly by the filename contract.
matchingTrack.getTags()["language"] = msfd["language"]
if msfd["disposition_set"] and not preserve_dispositions:
matchingTrack.setDispositionSet(msfd["disposition_set"])
importedTrackIndices.append(matchingTrack.getIndex())
expectedTrackIndices = sorted(
subtitleTrack.getIndex() for subtitleTrack in subtitleTracks
)
importedTrackIndices = sorted(set(importedTrackIndices))
return {
"candidate_count": len(availableFileSubtitleDescriptors),
"imported_track_indices": importedTrackIndices,
"missing_track_indices": sorted(
set(expectedTrackIndices) - set(importedTrackIndices)
),
}
def getConfiguration(self, label: str = ''):
yield f"--- {label if label else 'MediaDescriptor '+str(id(self))} {' '.join([str(k)+'='+str(v) for k,v in self.__mediaTags.items()])}"
# for td in self.getAllTrackDescriptors():
for td in self.getTrackDescriptors():
yield (f"{td.getIndex()}:{td.getType().indicator()}:{td.getSubIndex()} "
+ '|'.join([d.indicator() for d in td.getDispositionSet()])
+ ' ' + ' '.join([str(k)+'='+str(v) for k,v in td.getTags().items()]))
def clone(self, context: dict | None = None):
kwargs = {
MediaDescriptor.TAGS_KEY: dict(self.__mediaTags),
MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY: [
trackDescriptor.clone(context=context if context is not None else self.__context)
for trackDescriptor in self.__trackDescriptors
],
}
if context is not None:
kwargs[MediaDescriptor.CONTEXT_KEY] = context
elif self.__context:
kwargs[MediaDescriptor.CONTEXT_KEY] = self.__context
return MediaDescriptor(**kwargs)