You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ffx/bin/ffx/media_descriptor.py

480 lines
17 KiB
Python

import os
import re
import click
from typing import List, Self
from ffx.track_type import TrackType
from ffx.track_disposition import TrackDisposition
from ffx.track_descriptor import TrackDescriptor
from ffx.helper import dictDiff, DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
class MediaDescriptor:
"""This class represents the structural content of a media file including streams and metadata"""
CONTEXT_KEY = "context"
TAGS_KEY = "tags"
TRACKS_KEY = "tracks"
TRACK_DESCRIPTOR_LIST_KEY = "track_descriptors"
CLEAR_TAGS_FLAG_KEY = "clear_tags"
FFPROBE_DISPOSITION_KEY = "disposition"
FFPROBE_TAGS_KEY = "tags"
FFPROBE_CODEC_TYPE_KEY = "codec_type"
JELLYFIN_ORDER_FLAG_KEY = "jellyfin_order"
EXCLUDED_MEDIA_TAGS = ["creation_time"]
SEASON_EPISODE_STREAM_LANGUAGE_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]{3})'
SUBTITLE_FILE_EXTENSION = 'vtt'
def __init__(self, **kwargs):
if MediaDescriptor.TAGS_KEY in kwargs.keys():
if type(kwargs[MediaDescriptor.TAGS_KEY]) is not dict:
raise TypeError(
f"MediaDescriptor.__init__(): Argument {MediaDescriptor.TAGS_KEY} is required to be of type dict"
)
self.__mediaTags = kwargs[MediaDescriptor.TAGS_KEY]
else:
self.__mediaTags = {}
if MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY in kwargs.keys():
if (
type(kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY]) is not list
): # Use List typehint for TrackDescriptor as well if it works
raise TypeError(
f"MediaDescriptor.__init__(): Argument {MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY} is required to be of type list"
)
for d in kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY]:
if type(d) is not TrackDescriptor:
raise TypeError(
f"TrackDesciptor.__init__(): All elements of argument list {MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY} are required to be of type TrackDescriptor"
)
self.__trackDescriptors = kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY]
else:
self.__trackDescriptors = []
if MediaDescriptor.CLEAR_TAGS_FLAG_KEY in kwargs.keys():
if type(kwargs[MediaDescriptor.CLEAR_TAGS_FLAG_KEY]) is not bool:
raise TypeError(
f"MediaDescriptor.__init__(): Argument {MediaDescriptor.CLEAR_TAGS_FLAG_KEY} is required to be of type bool"
)
self.__clearTags = kwargs[MediaDescriptor.CLEAR_TAGS_FLAG_KEY]
else:
self.__clearTags = False
if MediaDescriptor.JELLYFIN_ORDER_FLAG_KEY in kwargs.keys():
if type(kwargs[MediaDescriptor.JELLYFIN_ORDER_FLAG_KEY]) is not bool:
raise TypeError(
f"MediaDescriptor.__init__(): Argument {MediaDescriptor.JELLYFIN_ORDER_FLAG_KEY} is required to be of type bool"
)
self.__jellyfinOrder = kwargs[MediaDescriptor.JELLYFIN_ORDER_FLAG_KEY]
else:
self.__jellyfinOrder = False
def getDefaultVideoTrack(self):
videoDefaultTracks = [
v
for v in self.getVideoTracks()
if TrackDisposition.DEFAULT in v.getDispositionSet()
]
if len(videoDefaultTracks) > 1:
raise ValueError(
"MediaDescriptor.getDefaultVideoTrack(): More than one default video track is not supported"
)
return videoDefaultTracks[0] if videoDefaultTracks else None
def getForcedVideoTrack(self):
videoForcedTracks = [
v
for v in self.getVideoTracks()
if TrackDisposition.FORCED in v.getDispositionSet()
]
if len(videoForcedTracks) > 1:
raise ValueError(
"MediaDescriptor.getForcedVideoTrack(): More than one forced video track is not supported"
)
return videoForcedTracks[0] if videoForcedTracks else None
def getDefaultAudioTrack(self):
audioDefaultTracks = [
a
for a in self.getAudioTracks()
if TrackDisposition.DEFAULT in a.getDispositionSet()
]
if len(audioDefaultTracks) > 1:
raise ValueError(
"MediaDescriptor.getDefaultAudioTrack(): More than one default audio track is not supported"
)
return audioDefaultTracks[0] if audioDefaultTracks else None
def getForcedAudioTrack(self):
audioForcedTracks = [
a
for a in self.getAudioTracks()
if TrackDisposition.FORCED in a.getDispositionSet()
]
if len(audioForcedTracks) > 1:
raise ValueError(
"MediaDescriptor.getForcedAudioTrack(): More than one forced audio track is not supported"
)
return audioForcedTracks[0] if audioForcedTracks else None
def getDefaultSubtitleTrack(self):
subtitleDefaultTracks = [
s
for s in self.getSubtitleTracks()
if TrackDisposition.DEFAULT in s.getDispositionSet()
]
if len(subtitleDefaultTracks) > 1:
raise ValueError(
"MediaDescriptor.getDefaultSubtitleTrack(): More than one default subtitle track is not supported"
)
return subtitleDefaultTracks[0] if subtitleDefaultTracks else None
def getForcedSubtitleTrack(self):
subtitleForcedTracks = [
s
for s in self.getSubtitleTracks()
if TrackDisposition.FORCED in s.getDispositionSet()
]
if len(subtitleForcedTracks) > 1:
raise ValueError(
"MediaDescriptor.getForcedSubtitleTrack(): More than one forced subtitle track is not supported"
)
return subtitleForcedTracks[0] if subtitleForcedTracks else None
def setDefaultSubTrack(self, trackType: TrackType, subIndex: int):
for t in self.getAllTrackDescriptors():
if t.getType() == trackType:
t.setDispositionFlag(
TrackDisposition.DEFAULT, t.getSubIndex() == int(subIndex)
)
def setForcedSubTrack(self, trackType: TrackType, subIndex: int):
for t in self.getAllTrackDescriptors():
if t.getType() == trackType:
t.setDispositionFlag(
TrackDisposition.FORCED, t.getSubIndex() == int(subIndex)
)
def checkDefaultAndForcedDispositions(self):
try:
self.getDefaultVideoTrack()
self.getForcedVideoTrack()
self.getDefaultAudioTrack()
self.getForcedAudioTrack()
self.getDefaultSubtitleTrack()
self.getForcedSubtitleTrack()
return True
except ValueError:
return False
def getReorderedTrackDescriptors(self):
videoTracks = self.sortSubIndices(self.getVideoTracks())
audioTracks = self.sortSubIndices(self.getAudioTracks())
subtitleTracks = self.sortSubIndices(self.getSubtitleTracks())
videoDefaultTrack = self.getDefaultVideoTrack()
self.getForcedVideoTrack()
audioDefaultTrack = self.getDefaultAudioTrack()
self.getForcedAudioTrack()
subtitleDefaultTrack = self.getDefaultSubtitleTrack()
self.getForcedSubtitleTrack()
if self.__jellyfinOrder:
if not videoDefaultTrack is None:
videoTracks.append(
videoTracks.pop(videoTracks.index(videoDefaultTrack))
)
if not audioDefaultTrack is None:
audioTracks.append(
audioTracks.pop(audioTracks.index(audioDefaultTrack))
)
if not subtitleDefaultTrack is None:
subtitleTracks.append(
subtitleTracks.pop(subtitleTracks.index(subtitleDefaultTrack))
)
reorderedTrackDescriptors = videoTracks + audioTracks + subtitleTracks
orderedSourceTrackSequence = [
t.getSourceIndex() for t in reorderedTrackDescriptors
]
if len(set(orderedSourceTrackSequence)) < len(orderedSourceTrackSequence):
raise ValueError(
f"Multiple streams originating from the same source stream not supported"
)
return reorderedTrackDescriptors
@classmethod
def fromFfprobe(cls, formatData, streamData):
kwargs = {}
if MediaDescriptor.FFPROBE_TAGS_KEY in formatData.keys():
kwargs[MediaDescriptor.TAGS_KEY] = formatData[
MediaDescriptor.FFPROBE_TAGS_KEY
]
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = []
# TODO: Evtl obsolet
subIndexCounters = {}
for streamObj in streamData:
ffprobeCodecType = streamObj[MediaDescriptor.FFPROBE_CODEC_TYPE_KEY]
trackType = TrackType.fromLabel(ffprobeCodecType)
if trackType != TrackType.UNKNOWN:
if trackType not in subIndexCounters.keys():
subIndexCounters[trackType] = 0
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY].append(
TrackDescriptor.fromFfprobe(
streamObj, subIndex=subIndexCounters[trackType]
)
)
subIndexCounters[trackType] += 1
return cls(**kwargs)
def getTags(self):
return self.__mediaTags
def sortSubIndices(
self, descriptors: List[TrackDescriptor]
) -> List[TrackDescriptor]:
subIndex = 0
for t in descriptors:
t.setSubIndex(subIndex)
subIndex += 1
return descriptors
def getAllTrackDescriptors(self) -> List[TrackDescriptor]:
return self.getVideoTracks() + self.getAudioTracks() + self.getSubtitleTracks()
def getVideoTracks(self) -> List[TrackDescriptor]:
return [
v for v in self.__trackDescriptors.copy() if v.getType() == TrackType.VIDEO
]
def getAudioTracks(self) -> List[TrackDescriptor]:
return [
a for a in self.__trackDescriptors.copy() if a.getType() == TrackType.AUDIO
]
def getSubtitleTracks(self) -> List[TrackDescriptor]:
return [
s
for s in self.__trackDescriptors.copy()
if s.getType() == TrackType.SUBTITLE
]
def getJellyfin(self):
return self.__jellyfinOrder
def setJellyfinOrder(self, state):
self.__jellyfinOrder = state
def getClearTags(self):
return self.__clearTags
def compare(self, vsMediaDescriptor: Self):
if not isinstance(vsMediaDescriptor, self.__class__):
raise click.ClickException(
f"MediaDescriptor.compare(): Argument is required to be of type {self.__class__}"
)
vsTags = vsMediaDescriptor.getTags()
tags = self.getTags()
# HINT: Some tags differ per file, for example creation_time, so these are removed before diff
for emt in MediaDescriptor.EXCLUDED_MEDIA_TAGS:
if emt in tags.keys():
del tags[emt]
if emt in vsTags.keys():
del vsTags[emt]
tagsDiff = dictDiff(vsTags, tags)
compareResult = {}
if tagsDiff:
compareResult[MediaDescriptor.TAGS_KEY] = tagsDiff
# Target track configuration (from DB)
# tracks = self.getAllTrackDescriptors()
tracks = self.getReorderedTrackDescriptors()
numTracks = len(tracks)
# Current track configuration (of file)
vsTracks = vsMediaDescriptor.getAllTrackDescriptors()
numVsTracks = len(vsTracks)
maxNumOfTracks = max(numVsTracks, numTracks)
trackCompareResult = {}
for tp in range(maxNumOfTracks):
# inspect/update funktionier nur so
if self.__jellyfinOrder:
vsTrackIndex = tracks[tp].getSourceIndex()
else:
vsTrackIndex = tp
# vsTrackIndex = tracks[tp].getSourceIndex()
# Will trigger if tracks are missing in file
if tp > (numVsTracks - 1):
if DIFF_ADDED_KEY not in trackCompareResult.keys():
trackCompareResult[DIFF_ADDED_KEY] = set()
trackCompareResult[DIFF_ADDED_KEY].add(tracks[tp].getIndex())
continue
# Will trigger if tracks are missing in DB definition
# New tracks will be added per update via this way
if tp > (numTracks - 1):
if DIFF_REMOVED_KEY not in trackCompareResult.keys():
trackCompareResult[DIFF_REMOVED_KEY] = {}
trackCompareResult[DIFF_REMOVED_KEY][
vsTracks[vsTrackIndex].getIndex()
] = vsTracks[vsTrackIndex]
continue
# assumption is made here that the track order will not change for all files of a sequence
trackDiff = tracks[tp].compare(vsTracks[vsTrackIndex])
if trackDiff:
if DIFF_CHANGED_KEY not in trackCompareResult.keys():
trackCompareResult[DIFF_CHANGED_KEY] = {}
trackCompareResult[DIFF_CHANGED_KEY][
vsTracks[vsTrackIndex].getIndex()
] = trackDiff
if trackCompareResult:
compareResult[MediaDescriptor.TRACKS_KEY] = trackCompareResult
return compareResult
def getImportFileTokens(self, use_sub_index: bool = True):
reorderedTrackDescriptors = self.getReorderedTrackDescriptors()
importFileTokens = []
for rtd in reorderedTrackDescriptors:
importedFilePath = rtd.getExternalSourceFilePath()
if not importedFilePath is None:
importFileTokens += [
"-i",
importedFilePath,
]
return importFileTokens
def getInputMappingTokens(self, use_sub_index: bool = True):
reorderedTrackDescriptors = self.getReorderedTrackDescriptors()
inputMappingTokens = []
filePointer = 1
for rtd in reorderedTrackDescriptors:
importedFilePath = rtd.getExternalSourceFilePath()
trackType = rtd.getType()
if use_sub_index:
if importedFilePath is None:
inputMappingTokens += [
"-map",
f"0:{trackType.indicator()}:{rtd.getSubIndex()}",
]
else:
inputMappingTokens += [
"-map",
f"{filePointer}:{trackType.indicator()}:0",
]
filePointer += 1
else:
inputMappingTokens += ["-map", f"0:{rtd.getIndex()}"]
return inputMappingTokens
def searchSubtitleFiles(searchDirectory, prefix):
sesl_match = re.compile(MediaDescriptor.SEASON_EPISODE_STREAM_LANGUAGE_MATCH)
availableFileSubtitleDescriptors = []
for subtitleFilename in os.listdir(searchDirectory):
if subtitleFilename.startswith(prefix) and subtitleFilename.endswith(
"." + MediaDescriptor.SUBTITLE_FILE_EXTENSION
):
sesl_result = sesl_match.search(subtitleFilename)
if sesl_result is not None:
subtitleFilePath = os.path.join(searchDirectory, subtitleFilename)
if os.path.isfile(subtitleFilePath):
subtitleFileDescriptor = {}
subtitleFileDescriptor["path"] = subtitleFilePath
subtitleFileDescriptor["season"] = int(sesl_result.group(1))
subtitleFileDescriptor["episode"] = int(sesl_result.group(2))
subtitleFileDescriptor["stream"] = int(sesl_result.group(3))
subtitleFileDescriptor["language"] = sesl_result.group(4)
availableFileSubtitleDescriptors.append(subtitleFileDescriptor)
click.echo(
f"Found {len(availableFileSubtitleDescriptors)} subtitles in files\n"
)
return availableFileSubtitleDescriptors
def importSubtitles(
self, searchDirectory, prefix, season: int = -1, episode: int = -1
):
availableFileSubtitleDescriptors = self.searchSubtitleFiles(
searchDirectory, prefix
)
subtitleTracks = self.getSubtitleTracks()
# if len(availableFileSubtitleDescriptors) != len(subtitleTracks):
# raise click.ClickException(f"MediaDescriptor.importSubtitles(): Number if subtitle files not matching number of subtitle tracks")
matchingFileSubtitleDescriptors = (
sorted(
[
d
for d in availableFileSubtitleDescriptors
if d["season"] == int(season) and d["episode"] == int(episode)
],
key=lambda d: d["stream"],
)
if availableFileSubtitleDescriptors
else []
)
for mfsd in matchingFileSubtitleDescriptors:
matchingSubtitleTrackDescriptor = [
s for s in subtitleTracks if s.getIndex() == mfsd["stream"]
]
if matchingSubtitleTrackDescriptor:
matchingSubtitleTrackDescriptor[0].setExternalSourceFilePath(
mfsd["path"]
)