Files
ffx/bin/ffx/media_descriptor.py
2024-11-10 14:33:59 +01:00

538 lines
21 KiB
Python

import os, re, click, logging
from typing import List, Self
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
from ffx.track_disposition import TrackDisposition
from ffx.track_descriptor import TrackDescriptor
from ffx.helper import dictDiff, DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
class MediaDescriptor:
    """This class represents the structural content of a media file including streams and metadata"""

    # Keyword-argument names read by __init__()
    CONTEXT_KEY = "context"
    TAGS_KEY = "tags"
    # Key under which compare() stores the per-track differences in its result
    TRACKS_KEY = "tracks"
    TRACK_DESCRIPTOR_LIST_KEY = "track_descriptors"
    # NOTE(review): not referenced in the visible part of this class - confirm usage elsewhere
    CLEAR_TAGS_FLAG_KEY = "clear_tags"

    # Keys looked up in ffprobe output (fromFfprobe() reads the tags and codec type keys;
    # the disposition key is not referenced in the visible part of this class)
    FFPROBE_DISPOSITION_KEY = "disposition"
    FFPROBE_TAGS_KEY = "tags"
    FFPROBE_CODEC_TYPE_KEY = "codec_type"

    # JELLYFIN_ORDER_FLAG_KEY = "jellyfin_order"

    # Media tags that compare() leaves out of the tag diff
    EXCLUDED_MEDIA_TAGS = ["creation_time"]

    # Pattern used by searchSubtitleFiles(); the four groups are read there as
    # season, episode, index and language
    SEASON_EPISODE_STREAM_LANGUAGE_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]{3})'
    # File extension (without dot) a file must have to be picked up by searchSubtitleFiles()
    SUBTITLE_FILE_EXTENSION = 'vtt'
def __init__(self, **kwargs):
    """Create a media descriptor from optional keyword arguments.

    Recognized keyword arguments (all optional):
        context: dict with shared runtime state. Its 'logger' entry, when
            present, is used for all log output of this object.
        tags: dict of media-level tags.
        track_descriptors: list whose elements are TrackDescriptor objects.

    Raises:
        TypeError: If one of the arguments is passed with the wrong type.
    """
    context = kwargs.get(MediaDescriptor.CONTEXT_KEY, {})
    if not isinstance(context, dict):
        raise TypeError(
            f"MediaDescriptor.__init__(): Argument {MediaDescriptor.CONTEXT_KEY} is required to be of type dict"
        )
    self.__context = context

    logger = context.get('logger')
    if logger is None:
        # Logger.addHandler() returns None, so the logger object has to be
        # fetched first and the handler attached in a separate statement.
        logger = logging.getLogger('FFX')
        if not logger.handlers:
            # Attach the silent handler only once, not per instance.
            logger.addHandler(logging.NullHandler())
    self.__logger = logger

    mediaTags = kwargs.get(MediaDescriptor.TAGS_KEY, {})
    if not isinstance(mediaTags, dict):
        raise TypeError(
            f"MediaDescriptor.__init__(): Argument {MediaDescriptor.TAGS_KEY} is required to be of type dict"
        )
    self.__mediaTags = mediaTags

    trackDescriptors = kwargs.get(MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY, [])
    if not isinstance(trackDescriptors, list):
        raise TypeError(
            f"MediaDescriptor.__init__(): Argument {MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY} is required to be of type list"
        )
    if not all(isinstance(d, TrackDescriptor) for d in trackDescriptors):
        raise TypeError(
            f"MediaDescriptor.__init__(): All elements of argument list {MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY} are required to be of type TrackDescriptor"
        )
    # The passed list object itself is stored (no copy is made).
    self.__trackDescriptors = trackDescriptors

    # TODO: to be removed
    self.__jellyfinOrder = False
def setTrackLanguage(self, language: str, index: int, trackType: TrackType = None):
    """Set the language of one track, selected by position.

    The language string is resolved through IsoLanguage.findThreeLetter().
    If it does not resolve, or if index is outside the selected track list,
    a warning is logged and no track is modified.

    Args:
        language: Language code handed to IsoLanguage.findThreeLetter().
        index: Position in the list returned by getTrackDescriptors(trackType=trackType).
        trackType: Restricts the position lookup to tracks of this type;
            None addresses the list of all tracks.
    """
    # Prefix for log messages, e.g. "<label> track 2" vs. "track 2"
    typeLabel = '' if trackType is None else f"{trackType.label()} "

    trackLanguage = IsoLanguage.findThreeLetter(language)
    if trackLanguage == IsoLanguage.UNDEFINED:
        self.__logger.warning('MediaDescriptor.setTrackLanguage(): Parameter language does not contain a registered '
                              + "ISO 639 3-letter language code, skipping to set language for "
                              + f"{typeLabel}track {index}")
        # The message announces a skip, so do not store the undefined language.
        return

    trackList = self.getTrackDescriptors(trackType=trackType)
    if index < 0 or index > len(trackList) - 1:
        self.__logger.warning(f"MediaDescriptor.setTrackLanguage(): Parameter index ({index}) is "
                              + f"out of range of {typeLabel}track list")
        # Without this return the lookup below would raise IndexError
        # (or silently address a track from the end for negative values).
        return

    td: TrackDescriptor = trackList[index]
    td.setLanguage(trackLanguage)
def setTrackTitle(self, title: str, index: int, trackType: TrackType = None):
    """Set the title of one track, selected by position.

    Args:
        title: Title passed to the track descriptor's setTitle().
        index: Position in the list returned by getTrackDescriptors(trackType=trackType).
        trackType: Restricts the position lookup to tracks of this type;
            None addresses the list of all tracks.

    Raises:
        click.Abort: If index is outside the selected track list (an error is logged first).
    """
    candidates = self.getTrackDescriptors(trackType=trackType)
    lastValidIndex = len(candidates) - 1

    if index > lastValidIndex or index < 0:
        self.__logger.error(f"MediaDescriptor.setTrackTitle(): Parameter index ({index}) is "
                            + f"out of range of {'' if trackType is None else trackType.label()}track list")
        raise click.Abort()

    selected: TrackDescriptor = candidates[index]
    selected.setTitle(title)
def setDefaultSubTrack(self, trackType: TrackType, subIndex: int):
    """Mark exactly the track with the given sub index as default among tracks of one type.

    Every track of trackType gets its DEFAULT disposition flag written: True
    when its sub index equals subIndex, False otherwise. Tracks of other
    types are left untouched.
    """
    sameTypeTracks = (
        td for td in self.getAllTrackDescriptors() if td.getType() == trackType
    )
    for td in sameTypeTracks:
        isSelected = td.getSubIndex() == int(subIndex)
        td.setDispositionFlag(TrackDisposition.DEFAULT, isSelected)
def setForcedSubTrack(self, trackType: TrackType, subIndex: int):
    """Mark exactly the track with the given sub index as forced among tracks of one type.

    Every track of trackType gets its FORCED disposition flag written: True
    when its sub index equals subIndex, False otherwise. Tracks of other
    types are left untouched.
    """
    sameTypeTracks = (
        td for td in self.getAllTrackDescriptors() if td.getType() == trackType
    )
    for td in sameTypeTracks:
        isSelected = td.getSubIndex() == int(subIndex)
        td.setDispositionFlag(TrackDisposition.FORCED, isSelected)
def checkConfiguration(self):
    """Validate the track configuration of this descriptor.

    Raises:
        ValueError: If more than one video, audio or subtitle track carries
            the DEFAULT flag or the FORCED flag, or if two tracks report the
            same source index.
    """
    videoTracks = self.getVideoTracks()
    audioTracks = self.getAudioTracks()
    subtitleTracks = self.getSubtitleTracks()

    # Checked top to bottom; the first violated rule determines the error.
    uniquenessRules = (
        (videoTracks, TrackDisposition.DEFAULT, 'More than one default video track'),
        (audioTracks, TrackDisposition.DEFAULT, 'More than one default audio track'),
        (subtitleTracks, TrackDisposition.DEFAULT, 'More than one default subtitle track'),
        (videoTracks, TrackDisposition.FORCED, 'More than one forced video track'),
        (audioTracks, TrackDisposition.FORCED, 'More than one forced audio track'),
        (subtitleTracks, TrackDisposition.FORCED, 'More than one forced subtitle track'),
    )
    for trackGroup, disposition, message in uniquenessRules:
        flaggedCount = sum(1 for t in trackGroup if t.getDispositionFlag(disposition))
        if flaggedCount > 1:
            raise ValueError(message)

    allTracks = videoTracks + audioTracks + subtitleTracks
    distinctSourceIndices = {t.getSourceIndex() for t in allTracks}
    if len(distinctSourceIndices) < len(allTracks):
        raise ValueError('Multiple streams originating from the same source stream')
def applyOverrides(self, overrides: dict):
    """Apply a dictionary of overrides to this descriptor.

    Recognized keys of overrides:
        'languages': mapping of track index to language, applied via setTrackLanguage().
        'titles': mapping of track index to title, applied via setTrackTitle().
        'forced_video' / 'forced_audio' / 'forced_subtitle': sub index of the
            track that becomes both forced and default for that type.
        'default_video' / 'default_audio' / 'default_subtitle': sub index of the
            track that becomes default; only read when the matching 'forced_*'
            key is absent.
        'stream_order': new order handed to rearrangeTrackDescriptors().
    """
    if 'languages' in overrides:
        for trackIndex, language in overrides['languages'].items():
            self.setTrackLanguage(language, trackIndex)

    if 'titles' in overrides:
        for trackIndex, title in overrides['titles'].items():
            self.setTrackTitle(title, trackIndex)

    # A forced track is made the default track as well; a plain default
    # override is only considered when no forced override exists.
    if 'forced_video' in overrides:
        videoSubIndex = int(overrides['forced_video'])
        self.setForcedSubTrack(TrackType.VIDEO, videoSubIndex)
        self.setDefaultSubTrack(TrackType.VIDEO, videoSubIndex)
    elif 'default_video' in overrides:
        self.setDefaultSubTrack(TrackType.VIDEO, int(overrides['default_video']))

    if 'forced_audio' in overrides:
        audioSubIndex = int(overrides['forced_audio'])
        self.setForcedSubTrack(TrackType.AUDIO, audioSubIndex)
        self.setDefaultSubTrack(TrackType.AUDIO, audioSubIndex)
    elif 'default_audio' in overrides:
        self.setDefaultSubTrack(TrackType.AUDIO, int(overrides['default_audio']))

    if 'forced_subtitle' in overrides:
        subtitleSubIndex = int(overrides['forced_subtitle'])
        self.setForcedSubTrack(TrackType.SUBTITLE, subtitleSubIndex)
        self.setDefaultSubTrack(TrackType.SUBTITLE, subtitleSubIndex)
    elif 'default_subtitle' in overrides:
        self.setDefaultSubTrack(TrackType.SUBTITLE, int(overrides['default_subtitle']))

    if 'stream_order' in overrides:
        self.rearrangeTrackDescriptors(overrides['stream_order'])
def applySourceIndices(self, sourceMediaDescriptor: Self):
sourceTrackDescriptors = sourceMediaDescriptor.getAllTrackDescriptors()
numTrackDescriptors = len(self.__trackDescriptors)
if len(sourceTrackDescriptors) != numTrackDescriptors:
raise ValueError('MediaDescriptor.applySourceIndices (): Number of track descriptors does not match')
for trackIndex in range(numTrackDescriptors):
self.__trackDescriptors[trackIndex].setSourceIndex(sourceTrackDescriptors[trackIndex].getSourceIndex())
def rearrangeTrackDescriptors(self, newOrder: List[int]):
    """Reorder the internal track list and renumber the tracks afterwards.

    Args:
        newOrder: For every new position, the current position of the track
            that should be placed there. Must have as many entries as there
            are track descriptors.

    Raises:
        ValueError: If the length of newOrder differs from the number of tracks.
        IndexError: If an entry of newOrder is not a valid current position.
    """
    if len(newOrder) != len(self.__trackDescriptors):
        raise ValueError('Length of list with reordered indices does not match number of track descriptors')

    # Must be a list: the accumulator is appended to / assigned as the new track list.
    self.__trackDescriptors = [self.__trackDescriptors[oldIndex] for oldIndex in newOrder]

    # Indices and per-type sub indices have to reflect the new positions.
    self.reindexSubIndices()
    self.reindexIndices()
@classmethod
def fromFfprobe(cls, context, formatData, streamData):
    """Build a descriptor from ffprobe format and stream data.

    Args:
        context: Passed on to the constructor as the context argument.
        formatData: Mapping; its "tags" entry, when present, becomes the media tags.
        streamData: Iterable of stream mappings, each providing a "codec_type" entry.

    Returns:
        A new instance of cls holding one TrackDescriptor per stream whose
        codec type maps to a known TrackType; other streams are skipped.
    """
    kwargs = {MediaDescriptor.CONTEXT_KEY: context}

    if MediaDescriptor.FFPROBE_TAGS_KEY in formatData:
        kwargs[MediaDescriptor.TAGS_KEY] = formatData[MediaDescriptor.FFPROBE_TAGS_KEY]

    trackDescriptors = []
    kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = trackDescriptors

    # TODO: possibly obsolete
    # Next free sub index per track type (sub indices count per type from 0)
    nextSubIndex = {}

    for streamObj in streamData:
        codecType = streamObj[MediaDescriptor.FFPROBE_CODEC_TYPE_KEY]
        trackType = TrackType.fromLabel(codecType)

        if trackType != TrackType.UNKNOWN:
            subIndex = nextSubIndex.get(trackType, 0)
            trackDescriptors.append(
                TrackDescriptor.fromFfprobe(streamObj, subIndex=subIndex)
            )
            nextSubIndex[trackType] = subIndex + 1

    return cls(**kwargs)
def getTags(self):
    """Return the media-level tag dictionary (the stored object itself, not a copy)."""
    mediaTags = self.__mediaTags
    return mediaTags
def sortSubIndices(
    self, descriptors: List[TrackDescriptor]
) -> List[TrackDescriptor]:
    """Number the given descriptors' sub indices consecutively from 0 in list order.

    The descriptors are modified in place; the same list is returned.
    """
    for position, descriptor in enumerate(descriptors):
        descriptor.setSubIndex(position)
    return descriptors
def reindexSubIndices(self, trackDescriptors: list = []):
tdList = trackDescriptors if trackDescriptors else self.__trackDescriptors
subIndexCounter = {}
for td in tdList:
trackType = td.getType()
if trackType not in subIndexCounter.keys():
subIndexCounter[trackType] = 0
td.setSubIndex(subIndexCounter[trackType])
subIndexCounter[trackType] += 1
def sortIndices(
    self, descriptors: List[TrackDescriptor]
) -> List[TrackDescriptor]:
    """Number the given descriptors' indices consecutively from 0 in list order.

    The descriptors are modified in place; the same list is returned.
    """
    for position, descriptor in enumerate(descriptors):
        descriptor.setIndex(position)
    return descriptors
def reindexIndices(self, trackDescriptors: list = []):
tdList = trackDescriptors if trackDescriptors else self.__trackDescriptors
for trackIndex in range(len(tdList)):
tdList[trackIndex].setIndex(trackIndex)
def getAllTrackDescriptors(self):
    """Return all track descriptors grouped by type: video first, then audio, then subtitles."""
    videoTracks = self.getVideoTracks()
    audioTracks = self.getAudioTracks()
    subtitleTracks = self.getSubtitleTracks()
    return videoTracks + audioTracks + subtitleTracks
def getTrackDescriptors(self,
                        trackType: TrackType = None) -> List[TrackDescriptor]:
    """Return the track descriptors, optionally restricted to one track type.

    Without a trackType the internal list object itself is returned (in its
    stored order); with a trackType a new list of the matching descriptors
    is returned.
    """
    if trackType is not None:
        return [td for td in self.__trackDescriptors if td.getType() == trackType]
    return self.__trackDescriptors
def getVideoTracks(self) -> List[TrackDescriptor]:
    """Return a new list with the video track descriptors in stored order."""
    return self.getTrackDescriptors(trackType=TrackType.VIDEO)
def getAudioTracks(self) -> List[TrackDescriptor]:
    """Return a new list with the audio track descriptors in stored order."""
    return self.getTrackDescriptors(trackType=TrackType.AUDIO)
def getSubtitleTracks(self) -> List[TrackDescriptor]:
    """Return a new list with the subtitle track descriptors in stored order."""
    return self.getTrackDescriptors(trackType=TrackType.SUBTITLE)
def compare(self, vsMediaDescriptor: Self):
    """Compare this descriptor (target) against another descriptor (current state).

    Args:
        vsMediaDescriptor: Descriptor to compare against; must be an instance
            of this object's class.

    Returns:
        A dict that is empty when no differences were found. Otherwise it may contain
          - TAGS_KEY: the non-empty result of dictDiff(vsTags, tags), computed
            without the tags listed in EXCLUDED_MEDIA_TAGS;
          - TRACKS_KEY: a dict with up to three entries:
              DIFF_ADDED_KEY   -> set of indices of tracks only present in this descriptor,
              DIFF_REMOVED_KEY -> dict mapping index to track descriptor for tracks
                                  only present in vsMediaDescriptor,
              DIFF_CHANGED_KEY -> dict mapping the index of the compared-against
                                  track to the non-empty result of the track's compare().

    Raises:
        click.Abort: If vsMediaDescriptor has the wrong type (an error is logged first).
    """
    if not isinstance(vsMediaDescriptor, self.__class__):
        self.__logger.error(f"MediaDescriptor.compare(): Argument is required to be of type {self.__class__}")
        raise click.Abort()

    # HINT: Some tags differ per file, for example creation_time, so these are
    # left out of the diff. Filtered copies are used because getTags() returns
    # the stored dicts; deleting from them would alter both descriptors.
    excludedTags = MediaDescriptor.EXCLUDED_MEDIA_TAGS
    tags = {k: v for k, v in self.getTags().items() if k not in excludedTags}
    vsTags = {k: v for k, v in vsMediaDescriptor.getTags().items() if k not in excludedTags}

    compareResult = {}

    tagsDiff = dictDiff(vsTags, tags)
    if tagsDiff:
        compareResult[MediaDescriptor.TAGS_KEY] = tagsDiff

    # Target track configuration (from DB)
    tracks = self.getAllTrackDescriptors()  # filter
    numTracks = len(tracks)

    # Current track configuration (of file)
    vsTracks = vsMediaDescriptor.getAllTrackDescriptors()
    numVsTracks = len(vsTracks)

    trackCompareResult = {}

    for tp in range(max(numVsTracks, numTracks)):

        # Will trigger if tracks are missing in file
        if tp >= numVsTracks:
            trackCompareResult.setdefault(DIFF_ADDED_KEY, set()).add(tracks[tp].getIndex())
            continue

        # Will trigger if tracks are missing in DB definition
        # New tracks will be added per update via this way
        if tp >= numTracks:
            # No target track exists at this position, so the counterpart can
            # only be addressed by position.
            vsTrack = vsTracks[tp]
            trackCompareResult.setdefault(DIFF_REMOVED_KEY, {})[vsTrack.getIndex()] = vsTrack
            continue

        # inspect/update only works this way
        # Resolved after the bounds checks so tracks[tp] is known to exist.
        vsTrackIndex = tracks[tp].getSourceIndex() if self.__jellyfinOrder else tp
        vsTrack = vsTracks[vsTrackIndex]

        # assumption is made here that the track order will not change for all files of a sequence
        trackDiff = tracks[tp].compare(vsTrack)
        if trackDiff:
            trackCompareResult.setdefault(DIFF_CHANGED_KEY, {})[vsTrack.getIndex()] = trackDiff

    if trackCompareResult:
        compareResult[MediaDescriptor.TRACKS_KEY] = trackCompareResult

    return compareResult
def getImportFileTokens(self, use_sub_index: bool = True):
    """Return "-i <path>" token pairs for every track that has an external source file.

    Args:
        use_sub_index: Accepted but not evaluated by this method.

    Returns:
        Flat list of strings: "-i" followed by the file path, once per track
        with a non-empty external source file path, in stored track order.
    """
    tokens = []
    for td in self.__trackDescriptors:
        externalPath = td.getExternalSourceFilePath()
        if not externalPath:
            continue
        tokens.extend(("-i", externalPath))
    return tokens
def getInputMappingTokens(self, use_sub_index: bool = True, only_video: bool = False):
    """Return the "-map" tokens for the tracks of this descriptor.

    Tracks must be reordered for source index order.

    Args:
        use_sub_index: When True, tracks are addressed as
            "<input>:<type indicator>:<sub index>" and tracks with an external
            source file are mapped from their own input (numbered from 1 in
            track order). When False, tracks are addressed as "0:<index>".
        only_video: When True, only video tracks produce tokens.

    Returns:
        Flat list of strings: "-map" followed by the stream specifier per
        mapped track. Tracks mapped from input 0 are skipped when their codec
        is TrackDescriptor.CODEC_PGS.
    """
    mappingTokens = []
    # Input 0 is the media file itself; external files count up from 1.
    nextExternalInput = 1

    for td in self.__trackDescriptors:
        # The track's source index is used as a position in the track list;
        # the index / sub index of the descriptor found there address the stream.
        sourceTd = self.__trackDescriptors[td.getSourceIndex()]
        sourceTrackIndex = sourceTd.getIndex()
        sourceTrackSubIndex = sourceTd.getSubIndex()

        trackType = td.getType()
        if only_video and trackType != TrackType.VIDEO:
            continue

        externalPath = td.getExternalSourceFilePath()

        if use_sub_index and externalPath:
            mappingTokens += [
                "-map",
                f"{nextExternalInput}:{trackType.indicator()}:0",
            ]
            nextExternalInput += 1
            continue

        if td.getCodec() == TrackDescriptor.CODEC_PGS:
            continue

        if use_sub_index:
            mappingTokens += [
                "-map",
                f"0:{trackType.indicator()}:{sourceTrackSubIndex}",
            ]
        else:
            mappingTokens += ["-map", f"0:{sourceTrackIndex}"]

    return mappingTokens
def searchSubtitleFiles(self, searchDirectory, prefix):
    """Find subtitle files in a directory and describe them.

    A directory entry is picked up when its name starts with prefix, ends
    with "." + SUBTITLE_FILE_EXTENSION, contains a match of
    SEASON_EPISODE_STREAM_LANGUAGE_MATCH and is a regular file.

    Returns:
        List of dicts with the keys "path", "season", "episode", "index"
        (all three numbers as int) and "language", in os.listdir() order.
    """
    namePattern = re.compile(MediaDescriptor.SEASON_EPISODE_STREAM_LANGUAGE_MATCH)
    requiredSuffix = "." + MediaDescriptor.SUBTITLE_FILE_EXTENSION

    subtitleFileDescriptors = []

    for filename in os.listdir(searchDirectory):
        if not (filename.startswith(prefix) and filename.endswith(requiredSuffix)):
            continue

        nameMatch = namePattern.search(filename)
        if nameMatch is None:
            continue

        filePath = os.path.join(searchDirectory, filename)
        if not os.path.isfile(filePath):
            continue

        season, episode, index, language = nameMatch.groups()
        subtitleFileDescriptors.append({
            "path": filePath,
            "season": int(season),
            "episode": int(episode),
            "index": int(index),
            "language": language,
        })

    self.__logger.debug(f"searchSubtitleFiles(): Available subtitle files {subtitleFileDescriptors}")

    return subtitleFileDescriptors
def importSubtitles(self, searchDirectory, prefix, season: int = -1, episode: int = -1):
    """Attach external subtitle files to the matching subtitle tracks.

    Files are looked up with searchSubtitleFiles(); those whose season and
    episode equal the given values are processed in ascending order of their
    "index". For each such file, the first subtitle track whose getIndex()
    equals the file's "index" gets the file path set as its external source
    file path. Files without a matching track are ignored.
    """
    self.__logger.debug(f"importSubtitles(): Season: {season} Episode: {episode}")

    availableFiles = self.searchSubtitleFiles(searchDirectory, prefix)
    self.__logger.debug(f"importSubtitles(): availableFileSubtitleDescriptors: {availableFiles}")

    subtitleTracks = self.getSubtitleTracks()
    self.__logger.debug(f"importSubtitles(): subtitleTracks: {[s.getIndex() for s in subtitleTracks]}")

    matchingFiles = []
    if availableFiles:
        episodeFiles = [
            fd
            for fd in availableFiles
            if fd["season"] == int(season) and fd["episode"] == int(episode)
        ]
        matchingFiles = sorted(episodeFiles, key=lambda fd: fd["index"])

    self.__logger.debug(f"importSubtitles(): matchingSubtitleFileDescriptors: {matchingFiles}")

    for fileDescriptor in matchingFiles:
        trackCandidates = [s for s in subtitleTracks if s.getIndex() == fileDescriptor["index"]]
        if not trackCandidates:
            continue
        self.__logger.debug(f"importSubtitles(): Found matching subtitle file {fileDescriptor['path']}")
        trackCandidates[0].setExternalSourceFilePath(fileDescriptor["path"])
def getConfiguration(self, label: str = ''):
    """Yield a line-by-line text summary of this descriptor.

    The first line is "--- <label> <tags>", where the label falls back to
    'MediaDescriptor ' plus the object's id() and the tags are rendered as
    space-separated key=value pairs. One line per track (in
    getAllTrackDescriptors() order) follows:
    "<index>:<type indicator>:<sub index> <dispositions joined by |> <tags>".
    """
    heading = label if label else 'MediaDescriptor '+str(id(self))
    mediaTagText = ' '.join(
        [str(key)+'='+str(value) for key, value in self.__mediaTags.items()]
    )
    yield f"--- {heading} {mediaTagText}"

    for td in self.getAllTrackDescriptors():
        position = f"{td.getIndex()}:{td.getType().indicator()}:{td.getSubIndex()} "
        dispositions = '|'.join([d.indicator() for d in td.getDispositionSet()])
        trackTagText = ' '.join(
            [str(key)+'='+str(value) for key, value in td.getTags().items()]
        )
        yield position + dispositions + ' ' + trackTagText