click-textual
Maveno 1 year ago
parent 5e017a8373
commit eaee3b34da

@ -4,6 +4,10 @@ import os, sys, subprocess, json, click, time, re
from ffx.ffx_app import FfxApp
from ffx.media_descriptor import MediaDescriptor
from ffx.file_properties import FileProperties
VERSION='0.1.0'
DEFAULT_VIDEO_ENCODER = 'vp9'
@ -114,38 +118,39 @@ def generateOutputTokens(filepath, format, ext):
def generateAudioEncodingTokens(context, index, layout):
"""Generates ffmpeg options for one output audio stream including channel remapping, codec and bitrate"""
if layout == STREAM_LAYOUT_6_1:
return [f"-c:a:{index}",
'libopus',
f"-filter:a:{index}",
'channelmap=channel_layout=6.1',
f"-b:a:{index}",
context['bitrates']['dts']]
elif layout == STREAM_LAYOUT_5_1:
return [f"-c:a:{index}",
'libopus',
f"-filter:a:{index}",
"channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1",
f"-b:a:{index}",
context['bitrates']['ac3']]
elif layout == STREAM_LAYOUT_STEREO:
return [f"-c:a:{index}",
'libopus',
f"-b:a:{index}",
context['bitrates']['stereo']]
elif layout == STREAM_LAYOUT_6CH:
return [f"-c:a:{index}",
'libopus',
f"-filter:a:{index}",
"channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1",
f"-b:a:{index}",
context['bitrates']['ac3']]
else:
return []
pass
#
# if layout == STREAM_LAYOUT_6_1:
# return [f"-c:a:{index}",
# 'libopus',
# f"-filter:a:{index}",
# 'channelmap=channel_layout=6.1',
# f"-b:a:{index}",
# context['bitrates']['dts']]
#
# elif layout == STREAM_LAYOUT_5_1:
# return [f"-c:a:{index}",
# 'libopus',
# f"-filter:a:{index}",
# "channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1",
# f"-b:a:{index}",
# context['bitrates']['ac3']]
#
# elif layout == STREAM_LAYOUT_STEREO:
# return [f"-c:a:{index}",
# 'libopus',
# f"-b:a:{index}",
# context['bitrates']['stereo']]
#
# elif layout == STREAM_LAYOUT_6CH:
# return [f"-c:a:{index}",
# 'libopus',
# f"-filter:a:{index}",
# "channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1",
# f"-b:a:{index}",
# context['bitrates']['ac3']]
# else:
# return []
def generateClearTokens(streams):
@ -233,61 +238,74 @@ def help():
click.echo(f"Usage: ffx [input file] [output file] [vp9|av1] [q=[nn[,nn,...]]] [p=nn] [a=nnn[k]] [ac3=nnn[k]] [dts=nnn[k]] [crop]")
@click.argument('filename', nargs=1)
@ffx.command()
def streams(filename):
def inspect(filename):
try:
sd = getStreamDescriptor(filename)
except Exception as ex:
raise click.ClickException(f"This file does not contain any audiovisual data: {ex}")
for d in sd:
click.echo(f"{d['codec_name']}{' (' + str(d['channels']) + ')' if d['codec_type'] == 'audio' else ''}")
@ffx.command()
@click.pass_context
@click.argument('paths', nargs=-1)
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option('-sd', '--subtitle-directory', type=str, default='', help='Load subtitles from here')
@click.option('-sp', '--subtitle-prefix', type=str, default='', help='Subtitle filename prefix')
@click.option("-o", "--output-directory", type=str, default='')
@click.option("--dry-run", is_flag=True, default=False)
fp = FileProperties(filename)
md = fp.getMediaDescriptor()
def unmux(ctx,
label,
paths,
subtitle_directory,
subtitle_prefix,
output_directory,
dry_run):
print(md.getTags())
existingSourcePaths = [p for p in paths if os.path.isfile(p)]
click.echo(f"\nUnmuxing {len(existingSourcePaths)} files")
for sourcePath in existingSourcePaths:
for at in md.getAudioTracks():
print(f"Audio: {at.getLanguage()} {'|'.join([f"{k}={v}" for (k,v) in at.getTags().items()])}")
sd = getStreamDescriptor(sourcePath)
for st in md.getSubtitleTracks():
print(f"Subtitle: {st.getLanguage()} {'|'.join([[f"{k}={v}" for (k,v) in st.getTags().items()]])}")
print(f"\nFile {sourcePath}\n")
except Exception as ex:
raise click.ClickException(f"This file does not contain any audiovisual data: {ex}")
for v in sd['video']:
# for d in sd:
# click.echo(f"{d['codec_name']}{' (' + str(d['channels']) + ')' if d['codec_type'] == 'audio' else ''}")
if v['codec_name'] == 'h264':
commandSequence = ['ffmpeg', '-i', sourcePath, '-map', '0:v:0', '-c', 'copy', '-f', 'h264']
executeProcess()
for a in sd['audio']:
print(f"A: {a}\n")
for s in sd['subtitle']:
print(f"S: {s}\n")
# @ffx.command()
# @click.pass_context
#
# @click.argument('paths', nargs=-1)
# @click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
#
# @click.option('-sd', '--subtitle-directory', type=str, default='', help='Load subtitles from here')
# @click.option('-sp', '--subtitle-prefix', type=str, default='', help='Subtitle filename prefix')
#
# @click.option("-o", "--output-directory", type=str, default='')
#
# @click.option("--dry-run", is_flag=True, default=False)
#
#
# def unmux(ctx,
# label,
# paths,
# subtitle_directory,
# subtitle_prefix,
# output_directory,
# dry_run):
#
# existingSourcePaths = [p for p in paths if os.path.isfile(p)]
# click.echo(f"\nUnmuxing {len(existingSourcePaths)} files")
#
# for sourcePath in existingSourcePaths:
#
# sd = getStreamDescriptor(sourcePath)
#
# print(f"\nFile {sourcePath}\n")
#
# for v in sd['video']:
#
# if v['codec_name'] == 'h264':
#
# commandSequence = ['ffmpeg', '-i', sourcePath, '-map', '0:v:0', '-c', 'copy', '-f', 'h264']
# executeProcess()
#
# for a in sd['audio']:
# print(f"A: {a}\n")
# for s in sd['subtitle']:
# print(f"S: {s}\n")

@ -2,10 +2,10 @@ import os, re, click, json
from .media_descriptor import MediaDescriptor
from .track_type import TrackType
from .audio_layout import AudioLayout
#from .track_type import TrackType
#from .audio_layout import AudioLayout
from .track_disposition import TrackDisposition
#from .track_disposition import TrackDisposition
from .process import executeProcess
@ -18,7 +18,7 @@ class FileProperties():
EPISODE_INDICATOR_MATCH = '[eE]([0-9]+)'
def ___init__(self, sourcePath, ):
def __init__(self, sourcePath):
# Separate basedir, basename and extension for current source file
self.__sourcePath = sourcePath
@ -55,29 +55,29 @@ class FileProperties():
file_index += 1
matchingFileSubtitleDescriptors = sorted([d for d in availableFileSubtitleDescriptors if d['season'] == season and d['episode'] == episode], key=lambda d: d['stream']) if availableFileSubtitleDescriptors else []
print(f"season={season} episode={episode} file={file_index}")
# Assemble target filename tokens
targetFilenameTokens = []
targetFilenameExtension = DEFAULT_FILE_EXTENSION
if label:
targetFilenameTokens = [label]
if season > -1 and episode > -1:
targetFilenameTokens += [f"S{season:0{season_digits}d}E{episode:0{episode_digits}d}"]
elif episode > -1:
targetFilenameTokens += [f"E{episode:0{episode_digits}d}"]
else:
targetFilenameTokens += [f"{file_index:0{index_digits}d}"]
else:
targetFilenameTokens = [sourceFileBasename]
#
# matchingFileSubtitleDescriptors = sorted([d for d in availableFileSubtitleDescriptors if d['season'] == season and d['episode'] == episode], key=lambda d: d['stream']) if availableFileSubtitleDescriptors else []
#
# print(f"season={season} episode={episode} file={file_index}")
#
#
# # Assemble target filename tokens
# targetFilenameTokens = []
# targetFilenameExtension = DEFAULT_FILE_EXTENSION
#
# if label:
# targetFilenameTokens = [label]
#
# if season > -1 and episode > -1:
# targetFilenameTokens += [f"S{season:0{season_digits}d}E{episode:0{episode_digits}d}"]
# elif episode > -1:
# targetFilenameTokens += [f"E{episode:0{episode_digits}d}"]
# else:
# targetFilenameTokens += [f"{file_index:0{index_digits}d}"]
#
# else:
# targetFilenameTokens = [sourceFileBasename]
#
def getFormatData(self):
@ -185,37 +185,32 @@ class FileProperties():
return json.loads(ffprobeOutput)['streams']
def getTrackDescriptor(self, streamObj):
"""Convert the stream describing json object into a track descriptor"""
trackType = streamObj['codec_type']
descriptor = {}
if trackType in [t.label() for t in TrackType]:
descriptor['type'] = trackType
descriptor = {}
descriptor['disposition_list'] = [t for d in (k for (k,v) in streamObj['disposition'].items() if v) if (t := TrackDisposition.find(d)) if t is not None]
descriptor['tags'] = streamObj['tags'] if 'tags' in streamObj.keys() else {}
if trackType == TrackType.AUDIO.label():
descriptor['layout'] = AudioLayout.identify(streamObj)
return descriptor
# def getTrackDescriptor(self, streamObj):
# """Convert the stream describing json object into a track descriptor"""
#
# trackType = streamObj['codec_type']
#
# descriptor = {}
#
# if trackType in [t.label() for t in TrackType]:
#
# descriptor['type'] = trackType
#
# descriptor = {}
# descriptor['disposition_list'] = [t for d in (k for (k,v) in streamObj['disposition'].items() if v) if (t := TrackDisposition.find(d)) if t is not None]
#
# descriptor['tags'] = streamObj['tags'] if 'tags' in streamObj.keys() else {}
#
# if trackType == TrackType.AUDIO.label():
# descriptor['layout'] = AudioLayout.identify(streamObj)
#
# return descriptor
def getMediaDescriptor(self):
formatData = self.getFormatData()
streamData = self.getStreamData()
md = MediaDescriptor(tags=formatData['tags'] if 'tags' in formatData.keys() else {})
for streamObj in streamData:
return MediaDescriptor.fromFfprobe(self.getFormatData(), self.getStreamData())
md.appendTrack(streamObj)
# formatData = self.getFormatData()
# streamData = self.getStreamData()
return md

@ -73,6 +73,8 @@ class IsoLanguage(Enum):
VIETNAMESE = {"name": "Vietnamese", "iso639_1": "vi", "iso639_2": "vie"}
WELSH = {"name": "Welsh", "iso639_1": "cy", "iso639_2": "wel"}
UNDEFINED = {"name": "undefined", "iso639_1": "xx", "iso639_2": "und"}
@staticmethod
def find(label : str):
@ -81,14 +83,14 @@ class IsoLanguage(Enum):
if closestMatches:
foundLangs = [l for l in IsoLanguage if l.value['name'] == closestMatches[0]]
return foundLangs[0] if foundLangs else None
return foundLangs[0] if foundLangs else IsoLanguage.UNDEFINED
else:
return None
return IsoLanguage.UNDEFINED
@staticmethod
def findThreeLetter(theeLetter : str):
foundLangs = [l for l in IsoLanguage if l.value['iso639_2'] == str(theeLetter)]
return foundLangs[0] if foundLangs else None
return foundLangs[0] if foundLangs else IsoLanguage.UNDEFINED
# def get(lang : str):

@ -1,29 +1,43 @@
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
class MediaDescriptor():
"""This class represents the structural content of a media file including streams and metadata"""
def __init__(self, **kwargs):
self.__mediaTags = kwargs['tags'] if 'tags' in kwargs.keys() else {}
self.__trackDescriptors = kwargs['trackDescriptors'] if 'trackDescriptors' in kwargs.keys() else {}
self.__clearTags = kwargs['clearTags'] if 'clearTags' in kwargs.keys() else False
@classmethod
def fromFfprobe(cls, formatData, streamData):
descriptors = {}
def __init__(self, tags = {}, clear_tags = False, tracks = []):
for streamObj in streamData:
# self.__metaTags = mediaDescriptor['tags'] if 'tags' in mediaDescriptor.keys() else {}
self.__tags = tags
trackType = TrackType.fromLabel(streamObj['codec_type'])
self.__tracks = {}
if trackType != TrackType.UNKNOWN:
# self.__videoTracks = mediaDescriptor[TrackType.VIDEO.label()] if TrackType.VIDEO.label() in mediaDescriptor.keys() else []
# self.__audioTracks = mediaDescriptor[TrackType.AUDIO.label()] if TrackType.AUDIO.label() in mediaDescriptor.keys() else []
# self.__subtitleTracks = mediaDescriptor[TrackType.SUBTITLE.label()] if TrackType.SUBTITLE.label() in mediaDescriptor.keys() else []
if trackType.label() not in descriptors.keys():
descriptors[trackType.label()] = []
self.__clearTags = clear_tags
descriptors[trackType.label()].append(TrackDescriptor.fromFfprobe(streamObj))
for t in tracks:
self.appendTrack(t)
return cls(tags=formatData['tags'] if 'tags' in formatData.keys() else {},
trackDescriptors = descriptors)
def appendTrack(self, trackDescriptor):
def getTags(self):
return self.__mediaTags
ttype = trackDescriptor['type'].label()
if ttype not in self.__tracks.keys():
self.__tracks[ttype] = []
def getAudioTracks(self):
return self.__trackDescriptors[TrackType.AUDIO.label()] if TrackType.AUDIO.label() in self.__trackDescriptors.keys() else []
self.__tracks[ttype] = trackDescriptor
def getSubtitleTracks(self):
return self.__trackDescriptors[TrackType.SUBTITLE.label()] if TrackType.SUBTITLE.label() in self.__trackDescriptors.keys() else []

@ -54,3 +54,16 @@ class Pattern(Base):
md.appendTrack(t.getDescriptor())
return md
def getId(self):
return int(self.id)
def getPattern(self):
return str(self.pattern)
def getShowId(self):
return int(self.show_id)
def getTags(self):
return {str(k.value):str(v.value) for (k,v) in self.media_tags}

@ -35,8 +35,8 @@ class Track(Base):
pattern = relationship('Pattern', back_populates='tracks')
language = Column(String) # IsoLanguage threeLetter
title = Column(String)
# language = Column(String) # IsoLanguage threeLetter
# title = Column(String)
track_tags = relationship('TrackTag', back_populates='track', cascade="all, delete")
@ -51,9 +51,9 @@ class Track(Base):
if trackType is not None:
self.track_type = int(trackType.value)
language = kwargs.pop('language', None)
if language is not None:
self.language = str(language.threeLetter())
# language = kwargs.pop('language', None)
# if language is not None:
# self.language = str(language.threeLetter())
dispositionList = kwargs.pop('disposition_flags', None)
if dispositionList is not None:
@ -62,6 +62,87 @@ class Track(Base):
super().__init__(**kwargs)
@classmethod
def fromStreamObj(cls, streamObj, subIndex, patternId):
"""{
'index': 4,
'codec_name': 'hdmv_pgs_subtitle',
'codec_long_name': 'HDMV Presentation Graphic Stream subtitles',
'codec_type': 'subtitle',
'codec_tag_string': '[0][0][0][0]',
'codec_tag': '0x0000',
'r_frame_rate': '0/0',
'avg_frame_rate': '0/0',
'time_base': '1/1000',
'start_pts': 0,
'start_time': '0.000000',
'duration_ts': 1421035,
'duration': '1421.035000',
'disposition': {
'default': 1,
'dub': 0,
'original': 0,
'comment': 0,
'lyrics': 0,
'karaoke': 0,
'forced': 0,
'hearing_impaired': 0,
'visual_impaired': 0,
'clean_effects': 0,
'attached_pic': 0,
'timed_thumbnails': 0,
'non_diegetic': 0,
'captions': 0,
'descriptions': 0,
'metadata': 0,
'dependent': 0,
'still_image': 0
},
'tags': {
'language': 'ger',
'title': 'German Full'
}
}
# v1.x
id = Column(Integer, primary_key=True, autoincrement = True)
# P=pattern_id+sub_index+track_type
track_type = Column(Integer) # TrackType
sub_index = Column(Integer)
# v1.x
pattern_id = Column(Integer, ForeignKey('patterns.id', ondelete='CASCADE'))
pattern = relationship('Pattern', back_populates='tracks')
language = Column(String) # IsoLanguage threeLetter
title = Column(String)
track_tags = relationship('TrackTag', back_populates='track', cascade='all, delete')
disposition_flags = Column(Integer)
"""
trackType = streamObj['codec_type']
if trackType in [t.label() for t in TrackType]:
return cls(pattern_id = patternId,
sub_index = int(subIndex),
track_type = trackType,
disposition_flags = sum([2**t.index() for (k,v) in streamObj['disposition'].items() if v and (t := TrackDisposition.find(k)) is not None]))
else:
return None
# def getDescriptor(self):
#
# descriptor = {}
@ -95,10 +176,12 @@ class Track(Base):
return int(self.sub_index)
def getLanguage(self):
return IsoLanguage.findThreeLetter(self.language)
tags = {t.key:t.value for t in self.track_tags}
return IsoLanguage.findThreeLetter(tags['language']) if 'language' in tags.keys() else IsoLanguage.UNKNOWN
def getTitle(self):
return str(self.title)
tags = {t.key:t.value for t in self.track_tags}
return tags['title'] if 'title' in tags.keys() else ''
def getDispositionList(self):
return TrackDisposition.toList(self.disposition_flags)

@ -1,11 +1,98 @@
from .iso_language import IsoLanguage
from .track_type import TrackType
from .audio_layout import AudioLayout
from .track_disposition import TrackDisposition
class StreamDescriptor():
pass
class TrackDescriptor():
FFPROBE_DISPOSITION_KEY = 'disposition'
FFPROBE_TAGS_KEY = 'tags'
def getTrack(self):
pass
def __init__(self, **kwargs):
# self.__index = int(kwargs['index']) if 'index' in kwargs.keys() else -1
# self.__subIndex = int(kwargs['sub_index']) if 'sub_index' in kwargs.keys() else -1
self.__trackType = kwargs['trackType'] if 'trackType' in kwargs.keys() else TrackType.UNKNOWN
self.__trackTags = kwargs['tags'] if 'tags' in kwargs.keys() else {}
self.__dispositionSet = kwargs['dispositionSet'] if 'dispositionSet' in kwargs.keys() else set()
self.__audioLayout = kwargs['audioLayout'] if self.__trackType == TrackType.AUDIO and 'audioLayout' in kwargs.keys() else AudioLayout.LAYOUT_UNDEFINED
@classmethod
def fromFfprobe(cls, streamObj):
"""Processes ffprobe stream data as array with elements according to the following example
{
"index": 4,
"codec_name": "hdmv_pgs_subtitle",
"codec_long_name": "HDMV Presentation Graphic Stream subtitles",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 1421035,
"duration": "1421.035000",
"disposition": {
"default": 1,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "ger",
"title": "German Full"
}
}
"""
trackType = TrackType.fromLabel(streamObj['codec_type']) if 'codec_type' in streamObj.keys() else TrackType.UNKNOWN
if trackType != TrackType.UNKNOWN:
return cls(trackType = trackType,
dispositionSet = {t for d in (k for (k,v) in streamObj[TrackDescriptor.FFPROBE_DISPOSITION_KEY].items() if v) if (t := TrackDisposition.find(d)) if t is not None} if TrackDescriptor.FFPROBE_DISPOSITION_KEY in streamObj.keys() else set(),
tags = streamObj[TrackDescriptor.FFPROBE_TAGS_KEY] if TrackDescriptor.FFPROBE_TAGS_KEY in streamObj.keys() else {},
audioLayout = AudioLayout.identify(streamObj) if trackType == TrackType.AUDIO.label() else AudioLayout.LAYOUT_UNDEFINED)
else:
return None
def getLanguage(self):
if 'language' in self.__trackTags.keys():
return IsoLanguage.findThreeLetter(self.__trackTags['language'])
else:
return IsoLanguage.UNKNOWN
def getTitle(self):
if 'title' in self.__trackTags.keys():
return str(self.__trackTags['title'])
else:
return ''
def getAudioLayout(self):
return self.__audioLayout
def getTags(self):
return self.__trackTags

@ -49,8 +49,8 @@ class TrackDisposition(Enum):
return dispositionList
@staticmethod
def find(disposition):
matchingDispositions = [d for d in TrackDisposition if d.label() == str(disposition)]
def find(label):
matchingDispositions = [d for d in TrackDisposition if d.label() == str(label)]
if matchingDispositions:
return matchingDispositions[0]
else:

@ -2,18 +2,33 @@ from enum import Enum
class TrackType(Enum):
VIDEO = 1
AUDIO = 2
SUBTITLE = 3
VIDEO = {'label': 'video', 'index': 1}
AUDIO = {'label': 'audio', 'index': 2}
SUBTITLE = {'label': 'subtitle', 'index': 3}
UNKNOWN = {'label': 'unknown', 'index': 0}
def label(self):
"""Returns the stream type as string"""
return str(self.value['label'])
def index(self):
"""Returns the stream type index"""
return int(self.value['index'])
labels = {
TrackType.VIDEO: "video",
TrackType.AUDIO: "audio",
TrackType.SUBTITLE: "subtitle"
}
@staticmethod
def fromLabel(label):
tlist = [t for t in TrackType if t.value['label'] == label]
if tlist:
return tlist[0]
else:
return TrackType.UNKNOWN
return labels.get(self, "undefined")
@staticmethod
def fromIndex(index):
tlist = [t for t in TrackType if t.value['index'] == index]
if tlist:
return tlist[0]
else:
return TrackType.UNKNOWN

Loading…
Cancel
Save