@ -1,118 +1,174 @@
import os , re , click
import os
import re
import click
from typing import List , Self
from ffx . track_type import TrackType
from ffx . track_disposition import TrackDisposition
from ffx . file_properties import FileProperties
from ffx . track_descriptor import TrackDescriptor
from ffx . helper import dictDiff , DIFF_ADDED_KEY , DIFF_CHANGED_KEY , DIFF_REMOVED_KEY
class MediaDescriptor () :
class MediaDescriptor :
""" This class represents the structural content of a media file including streams and metadata """
CONTEXT_KEY = ' context '
CONTEXT_KEY = " context "
TAGS_KEY = ' tags '
TRACKS_KEY = ' tracks '
TAGS_KEY = " tags "
TRACKS_KEY = " tracks "
TRACK_DESCRIPTOR_LIST_KEY = ' track_descriptors '
CLEAR_TAGS_FLAG_KEY = ' clear_tags '
TRACK_DESCRIPTOR_LIST_KEY = " track_descriptors "
CLEAR_TAGS_FLAG_KEY = " clear_tags "
FFPROBE_DISPOSITION_KEY = ' disposition '
FFPROBE_TAGS_KEY = ' tags '
FFPROBE_CODEC_TYPE_KEY = ' codec_type '
FFPROBE_DISPOSITION_KEY = " disposition "
FFPROBE_TAGS_KEY = " tags "
FFPROBE_CODEC_TYPE_KEY = " codec_type "
JELLYFIN_ORDER_FLAG_KEY = ' jellyfin_order '
JELLYFIN_ORDER_FLAG_KEY = " jellyfin_order "
EXCLUDED_MEDIA_TAGS = [
' creation_time '
]
EXCLUDED_MEDIA_TAGS = [ " creation_time " ]
SEASON_EPISODE_STREAM_LANGUAGE_MATCH = ' [sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z] {3} ) '
SUBTITLE_FILE_EXTENSION = ' vtt '
def __init__ ( self , * * kwargs ) :
if MediaDescriptor . TAGS_KEY in kwargs . keys ( ) :
if type ( kwargs [ MediaDescriptor . TAGS_KEY ] ) is not dict :
raise TypeError ( f " MediaDescriptor.__init__(): Argument { MediaDescriptor . TAGS_KEY } is required to be of type dict " )
raise TypeError (
f " MediaDescriptor.__init__(): Argument {
MediaDescriptor . TAGS_KEY } is required to be of type dict "
)
self . __mediaTags = kwargs [ MediaDescriptor . TAGS_KEY ]
else :
self . __mediaTags = { }
if MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY in kwargs . keys ( ) :
if type ( kwargs [ MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY ] ) is not list : # Use List typehint for TrackDescriptor as well if it works
raise TypeError ( f " MediaDescriptor.__init__(): Argument { MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY } is required to be of type list " )
if (
type ( kwargs [ MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY ] ) is not list
) : # Use List typehint for TrackDescriptor as well if it works
raise TypeError (
f " MediaDescriptor.__init__(): Argument {
MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY } is required to be of type list "
)
for d in kwargs [ MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY ] :
if type ( d ) is not TrackDescriptor :
raise TypeError ( f " TrackDesciptor.__init__(): All elements of argument list { MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY } are required to be of type TrackDescriptor " )
raise TypeError (
f " TrackDesciptor.__init__(): All elements of argument list {
MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY } are required to be of type TrackDescriptor "
)
self . __trackDescriptors = kwargs [ MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY ]
else :
self . __trackDescriptors = [ ]
if MediaDescriptor . CLEAR_TAGS_FLAG_KEY in kwargs . keys ( ) :
if type ( kwargs [ MediaDescriptor . CLEAR_TAGS_FLAG_KEY ] ) is not bool :
raise TypeError ( f " MediaDescriptor.__init__(): Argument { MediaDescriptor . CLEAR_TAGS_FLAG_KEY } is required to be of type bool " )
raise TypeError (
f " MediaDescriptor.__init__(): Argument {
MediaDescriptor . CLEAR_TAGS_FLAG_KEY } is required to be of type bool "
)
self . __clearTags = kwargs [ MediaDescriptor . CLEAR_TAGS_FLAG_KEY ]
else :
self . __clearTags = False
if MediaDescriptor . JELLYFIN_ORDER_FLAG_KEY in kwargs . keys ( ) :
if type ( kwargs [ MediaDescriptor . JELLYFIN_ORDER_FLAG_KEY ] ) is not bool :
raise TypeError ( f " MediaDescriptor.__init__(): Argument { MediaDescriptor . JELLYFIN_ORDER_FLAG_KEY } is required to be of type bool " )
raise TypeError (
f " MediaDescriptor.__init__(): Argument {
MediaDescriptor . JELLYFIN_ORDER_FLAG_KEY } is required to be of type bool "
)
self . __jellyfinOrder = kwargs [ MediaDescriptor . JELLYFIN_ORDER_FLAG_KEY ]
else :
self . __jellyfinOrder = False
def getDefaultVideoTrack ( self ) :
videoDefaultTracks = [ v for v in self . getVideoTracks ( ) if TrackDisposition . DEFAULT in v . getDispositionSet ( ) ]
videoDefaultTracks = [
v
for v in self . getVideoTracks ( )
if TrackDisposition . DEFAULT in v . getDispositionSet ( )
]
if len ( videoDefaultTracks ) > 1 :
raise ValueError ( ' MediaDescriptor.getDefaultVideoTrack(): More than one default video track is not supported ' )
raise ValueError (
" MediaDescriptor.getDefaultVideoTrack(): More than one default video track is not supported "
)
return videoDefaultTracks [ 0 ] if videoDefaultTracks else None
def getForcedVideoTrack ( self ) :
videoForcedTracks = [ v for v in self . getVideoTracks ( ) if TrackDisposition . FORCED in v . getDispositionSet ( ) ]
videoForcedTracks = [
v
for v in self . getVideoTracks ( )
if TrackDisposition . FORCED in v . getDispositionSet ( )
]
if len ( videoForcedTracks ) > 1 :
raise ValueError ( ' MediaDescriptor.getForcedVideoTrack(): More than one forced video track is not supported ' )
raise ValueError (
" MediaDescriptor.getForcedVideoTrack(): More than one forced video track is not supported "
)
return videoForcedTracks [ 0 ] if videoForcedTracks else None
def getDefaultAudioTrack ( self ) :
audioDefaultTracks = [ a for a in self . getAudioTracks ( ) if TrackDisposition . DEFAULT in a . getDispositionSet ( ) ]
audioDefaultTracks = [
a
for a in self . getAudioTracks ( )
if TrackDisposition . DEFAULT in a . getDispositionSet ( )
]
if len ( audioDefaultTracks ) > 1 :
raise ValueError ( ' MediaDescriptor.getDefaultAudioTrack(): More than one default audio track is not supported ' )
raise ValueError (
" MediaDescriptor.getDefaultAudioTrack(): More than one default audio track is not supported "
)
return audioDefaultTracks [ 0 ] if audioDefaultTracks else None
def getForcedAudioTrack ( self ) :
audioForcedTracks = [ a for a in self . getAudioTracks ( ) if TrackDisposition . FORCED in a . getDispositionSet ( ) ]
audioForcedTracks = [
a
for a in self . getAudioTracks ( )
if TrackDisposition . FORCED in a . getDispositionSet ( )
]
if len ( audioForcedTracks ) > 1 :
raise ValueError ( ' MediaDescriptor.getForcedAudioTrack(): More than one forced audio track is not supported ' )
raise ValueError (
" MediaDescriptor.getForcedAudioTrack(): More than one forced audio track is not supported "
)
return audioForcedTracks [ 0 ] if audioForcedTracks else None
def getDefaultSubtitleTrack ( self ) :
subtitleDefaultTracks = [ s for s in self . getSubtitleTracks ( ) if TrackDisposition . DEFAULT in s . getDispositionSet ( ) ]
subtitleDefaultTracks = [
s
for s in self . getSubtitleTracks ( )
if TrackDisposition . DEFAULT in s . getDispositionSet ( )
]
if len ( subtitleDefaultTracks ) > 1 :
raise ValueError ( ' MediaDescriptor.getDefaultSubtitleTrack(): More than one default subtitle track is not supported ' )
raise ValueError (
" MediaDescriptor.getDefaultSubtitleTrack(): More than one default subtitle track is not supported "
)
return subtitleDefaultTracks [ 0 ] if subtitleDefaultTracks else None
def getForcedSubtitleTrack ( self ) :
subtitleForcedTracks = [ s for s in self . getSubtitleTracks ( ) if TrackDisposition . FORCED in s . getDispositionSet ( ) ]
subtitleForcedTracks = [
s
for s in self . getSubtitleTracks ( )
if TrackDisposition . FORCED in s . getDispositionSet ( )
]
if len ( subtitleForcedTracks ) > 1 :
raise ValueError ( ' MediaDescriptor.getForcedSubtitleTrack(): More than one forced subtitle track is not supported ' )
raise ValueError (
" MediaDescriptor.getForcedSubtitleTrack(): More than one forced subtitle track is not supported "
)
return subtitleForcedTracks [ 0 ] if subtitleForcedTracks else None
def setDefaultSubTrack ( self , trackType : TrackType , subIndex : int ) :
def setDefaultSubTrack ( self , trackType : TrackType , subIndex : int ) :
for t in self . getAllTrackDescriptors ( ) :
if t . getType ( ) == trackType :
t . setDispositionFlag ( TrackDisposition . DEFAULT , t . getSubIndex ( ) == int ( subIndex ) )
t . setDispositionFlag (
TrackDisposition . DEFAULT , t . getSubIndex ( ) == int ( subIndex )
)
def setForcedSubTrack ( self , trackType : TrackType , subIndex : int ) :
def setForcedSubTrack ( self , trackType : TrackType , subIndex : int ) :
for t in self . getAllTrackDescriptors ( ) :
if t . getType ( ) == trackType :
t . setDispositionFlag ( TrackDisposition . FORCED , t . getSubIndex ( ) == int ( subIndex ) )
t . setDispositionFlag (
TrackDisposition . FORCED , t . getSubIndex ( ) == int ( subIndex )
)
def getReorderedTrackDescriptors ( self ) :
@ -129,32 +185,43 @@ class MediaDescriptor():
if self . __jellyfinOrder :
if not videoDefaultTrack is None :
videoTracks . append ( videoTracks . pop ( videoTracks . index ( videoDefaultTrack ) ) )
videoTracks . append (
videoTracks . pop ( videoTracks . index ( videoDefaultTrack ) )
)
if not audioDefaultTrack is None :
audioTracks . append ( audioTracks . pop ( audioTracks . index ( audioDefaultTrack ) ) )
audioTracks . append (
audioTracks . pop ( audioTracks . index ( audioDefaultTrack ) )
)
if not subtitleDefaultTrack is None :
subtitleTracks . append ( subtitleTracks . pop ( subtitleTracks . index ( subtitleDefaultTrack ) ) )
subtitleTracks . append (
subtitleTracks . pop ( subtitleTracks . index ( subtitleDefaultTrack ) )
)
reorderedTrackDescriptors = videoTracks + audioTracks + subtitleTracks
orderedSourceTrackSequence = [ t . getSourceIndex ( ) for t in reorderedTrackDescriptors ]
orderedSourceTrackSequence = [
t . getSourceIndex ( ) for t in reorderedTrackDescriptors
]
if len ( set ( orderedSourceTrackSequence ) ) < len ( orderedSourceTrackSequence ) :
raise ValueError ( f " Multiple streams originating from the same source stream not supported " )
raise ValueError (
f " Multiple streams originating from the same source stream not supported "
)
return reorderedTrackDescriptors
@classmethod
def fromFfprobe ( cls , formatData , streamData ) :
kwargs = { }
if MediaDescriptor . FFPROBE_TAGS_KEY in formatData . keys ( ) :
kwargs [ MediaDescriptor . TAGS_KEY ] = formatData [ MediaDescriptor . FFPROBE_TAGS_KEY ]
kwargs [ MediaDescriptor . TAGS_KEY ] = formatData [
MediaDescriptor . FFPROBE_TAGS_KEY
]
kwargs [ MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY ] = [ ]
# TODO: Evtl obsolet
# TODO: Evtl obsolet
subIndexCounters = { }
for streamObj in streamData :
@ -167,38 +234,46 @@ class MediaDescriptor():
if trackType not in subIndexCounters . keys ( ) :
subIndexCounters [ trackType ] = 0
kwargs [ MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY ] . append ( TrackDescriptor . fromFfprobe ( streamObj ,
subIndex = subIndexCounters [ trackType ] ) )
kwargs [ MediaDescriptor . TRACK_DESCRIPTOR_LIST_KEY ] . append (
TrackDescriptor . fromFfprobe (
streamObj , subIndex = subIndexCounters [ trackType ]
)
)
subIndexCounters [ trackType ] + = 1
return cls ( * * kwargs )
def getTags ( self ) :
return self . __mediaTags
def sortSubIndices ( self , descriptors : List [ TrackDescriptor ] ) - > List [ TrackDescriptor ] :
def sortSubIndices (
self , descriptors : List [ TrackDescriptor ]
) - > List [ TrackDescriptor ] :
subIndex = 0
for t in descriptors :
t . setSubIndex ( subIndex )
subIndex + = 1
return descriptors
def getAllTrackDescriptors ( self ) - > List [ TrackDescriptor ] :
return self . getVideoTracks ( ) + self . getAudioTracks ( ) + self . getSubtitleTracks ( )
def getVideoTracks ( self ) - > List [ TrackDescriptor ] :
return [ v for v in self . __trackDescriptors . copy ( ) if v . getType ( ) == TrackType . VIDEO ]
return [
v for v in self . __trackDescriptors . copy ( ) if v . getType ( ) == TrackType . VIDEO
]
def getAudioTracks ( self ) - > List [ TrackDescriptor ] :
return [ a for a in self . __trackDescriptors . copy ( ) if a . getType ( ) == TrackType . AUDIO ]
return [
a for a in self . __trackDescriptors . copy ( ) if a . getType ( ) == TrackType . AUDIO
]
def getSubtitleTracks ( self ) - > List [ TrackDescriptor ] :
return [ s for s in self . __trackDescriptors . copy ( ) if s . getType ( ) == TrackType . SUBTITLE ]
return [
s
for s in self . __trackDescriptors . copy ( )
if s . getType ( ) == TrackType . SUBTITLE
]
def getJellyfin ( self ) :
return self . __jellyfinOrder
@ -209,17 +284,18 @@ class MediaDescriptor():
def getClearTags ( self ) :
return self . __clearTags
def compare ( self , vsMediaDescriptor : Self ) :
def compare ( self , vsMediaDescriptor : Self ) :
if not isinstance ( vsMediaDescriptor , self . __class__ ) :
raise click . ClickException ( f " MediaDescriptor.compare(): Argument is required to be of type { self . __class__ } " )
raise click . ClickException (
f " MediaDescriptor.compare(): Argument is required to be of type {
self . __class__ } "
)
vsTags = vsMediaDescriptor . getTags ( )
tags = self . getTags ( )
# HINT: Some tags differ per file, for example creation_time, so these are removed before diff
# HINT: Some tags differ per file, for example creation_time, so these are removed before diff
for emt in MediaDescriptor . EXCLUDED_MEDIA_TAGS :
if emt in tags . keys ( ) :
del tags [ emt ]
@ -233,9 +309,8 @@ class MediaDescriptor():
if tagsDiff :
compareResult [ MediaDescriptor . TAGS_KEY ] = tagsDiff
# Target track configuration (from DB)
# tracks = self.getAllTrackDescriptors()
# tracks = self.getAllTrackDescriptors()
tracks = self . getReorderedTrackDescriptors ( )
numTracks = len ( tracks )
@ -245,7 +320,6 @@ class MediaDescriptor():
maxNumOfTracks = max ( numVsTracks , numTracks )
trackCompareResult = { }
for tp in range ( maxNumOfTracks ) :
@ -269,7 +343,9 @@ class MediaDescriptor():
if tp > ( numTracks - 1 ) :
if DIFF_REMOVED_KEY not in trackCompareResult . keys ( ) :
trackCompareResult [ DIFF_REMOVED_KEY ] = { }
trackCompareResult [ DIFF_REMOVED_KEY ] [ vsTracks [ vsTrackIndex ] . getIndex ( ) ] = vsTracks [ vsTrackIndex ]
trackCompareResult [ DIFF_REMOVED_KEY ] [
vsTracks [ vsTrackIndex ] . getIndex ( )
] = vsTracks [ vsTrackIndex ]
continue
# assumption is made here that the track order will not change for all files of a sequence
@ -278,62 +354,120 @@ class MediaDescriptor():
if trackDiff :
if DIFF_CHANGED_KEY not in trackCompareResult . keys ( ) :
trackCompareResult [ DIFF_CHANGED_KEY ] = { }
trackCompareResult [ DIFF_CHANGED_KEY ] [ vsTracks [ vsTrackIndex ] . getIndex ( ) ] = trackDiff
trackCompareResult [ DIFF_CHANGED_KEY ] [
vsTracks [ vsTrackIndex ] . getIndex ( )
] = trackDiff
if trackCompareResult :
compareResult [ MediaDescriptor . TRACKS_KEY ] = trackCompareResult
return compareResult
def getImportFileTokens ( self , use_sub_index : bool = True ) :
reorderedTrackDescriptors = self . getReorderedTrackDescriptors ( )
importFileTokens = [ ]
for rtd in reorderedTrackDescriptors :
def getInputMappingTokens ( self , use_sub_index : bool = True ) :
importedFilePath = rtd . getExternalSourceFilePath ( )
if not importedFilePath is None :
importFileTokens + = [
" -i " ,
importedFilePath ,
]
return importFileTokens
def getInputMappingTokens ( self , use_sub_index : bool = True ) :
reorderedTrackDescriptors = self . getReorderedTrackDescriptors ( )
inputMappingTokens = [ ]
filePointer = 1
for rtd in reorderedTrackDescriptors :
importedFilePath = rtd . getExternalSourceFilePath ( )
trackType = rtd . getType ( )
if use_sub_index :
inputMappingTokens + = [ ' -map ' , f " 0: { trackType . indicator ( ) } : { rtd . getSubIndex ( ) } " ]
if importedFilePath is None :
inputMappingTokens + = [
" -map " ,
f " 0: { trackType . indicator ( ) } : { rtd . getSubIndex ( ) } " ,
]
else :
inputMappingTokens + = [
" -map " ,
f " { filePointer } : { trackType . indicator ( ) } :0 " ,
]
filePointer + = 1
else :
inputMappingTokens + = [ ' -map ' , f " 0: { rtd . getIndex ( ) } " ]
inputMappingTokens + = [ " -map " , f " 0: { rtd . getIndex ( ) } " ]
return inputMappingTokens
# matchingFileSubtitleDescriptors = sorted([d for d in availableFileSubtitleDescriptors if d['season'] == season and d['episode'] == episode], key=lambda d: d['stream']) if availableFileSubtitleDescriptors else []
def searchSubtitleFiles ( searchDirectory , prefix ) :
sesl_match = re . compile ( FileProperties . SEASON_EPISODE_STREAM_LANGUAGE_MATCH )
sesl_match = re . compile ( MediaDescriptor . SEASON_EPISODE_STREAM_LANGUAGE_MATCH )
availableFileSubtitleDescriptors = [ ]
for subtitleFilename in os . listdir ( searchDirectory ) :
if subtitleFilename . startswith ( prefix ) and subtitleFilename . endswith ( ' . ' + FileProperties . SUBTITLE_FILE_EXTENSION ) :
if subtitleFilename . startswith ( prefix ) and subtitleFilename . endswith (
" . " + MediaDescriptor . SUBTITLE_FILE_EXTENSION
) :
sesl_result = sesl_match . search ( subtitleFilename )
if sesl_result is not None :
subtitleFilePath = os . path . join ( searchDirectory , subtitleFilename )
if os . path . isfile ( subtitleFilePath ) :
subtitleFileDescriptor = { }
subtitleFileDescriptor [ ' path ' ] = subtitleFilePath
subtitleFileDescriptor [ ' season ' ] = int ( sesl_result . group ( 1 ) )
subtitleFileDescriptor [ ' episode ' ] = int ( sesl_result . group ( 2 ) )
subtitleFileDescriptor [ ' stream ' ] = int ( sesl_result . group ( 3 ) )
subtitleFileDescriptor [ ' language ' ] = sesl_result . group ( 4 )
subtitleFileDescriptor [ " path " ] = subtitleFilePath
subtitleFileDescriptor [ " season " ] = int ( sesl_result . group ( 1 ) )
subtitleFileDescriptor [ " episode " ] = int ( sesl_result . group ( 2 ) )
subtitleFileDescriptor [ " stream " ] = int ( sesl_result . group ( 3 ) )
subtitleFileDescriptor [ " language " ] = sesl_result . group ( 4 )
availableFileSubtitleDescriptors . append ( subtitleFileDescriptor )
click . echo ( f " Found { len ( availableFileSubtitleDescriptors ) } subtitles in files \n " )
click . echo (
f " Found { len ( availableFileSubtitleDescriptors ) } subtitles in files \n "
)
return availableFileSubtitleDescriptors
def importSubtitles ( self , searchDirectory , prefix ) :
availableFileSubtitleDescriptors = self . searchSubtitleFiles ( searchDirectory , prefix )
if len ( availableFileSubtitleDescriptors ) != len ( self . getSubtitleTracks ( ) ) :
raise click . ClickException ( f " MediaDescriptor.importSubtitles(): Number if subtitle files not matching number of subtitle tracks " )
def importSubtitles (
self , searchDirectory , prefix , season : int = - 1 , episode : int = - 1
) :
availableFileSubtitleDescriptors = self . searchSubtitleFiles (
searchDirectory , prefix
)
subtitleTracks = self . getSubtitleTracks ( )
# if len(availableFileSubtitleDescriptors) != len(subtitleTracks):
# raise click.ClickException(f"MediaDescriptor.importSubtitles(): Number if subtitle files not matching number of subtitle tracks")
matchingFileSubtitleDescriptors = (
sorted (
[
d
for d in availableFileSubtitleDescriptors
if d [ " season " ] == int ( season ) and d [ " episode " ] == int ( episode )
] ,
key = lambda d : d [ " stream " ] ,
)
if availableFileSubtitleDescriptors
else [ ]
)
for mfsd in matchingFileSubtitleDescriptors :
matchingSubtitleTrackDescriptor = [
s for s in subtitleTracks if s . getIndex ( ) == mfsd [ " stream " ]
]
if matchingSubtitleTrackDescriptor :
matchingSubtitleTrackDescriptor [ 0 ] . setExternalSourceFilePath (
mfsd [ " path " ]
)