33 Commits

Author SHA1 Message Date
Maveno
882d021bb6 RFC imports 2024-10-06 16:24:01 +02:00
Maveno
131cca2c53 tracktags UI mwe 2024-10-06 16:07:23 +02:00
Maveno
a03449a32b add/edit tag to ui 2024-10-06 12:21:27 +02:00
Maveno
7d7e43c6f0 rework descriptors raw 2024-10-06 09:58:37 +02:00
Maveno
1c9f67e47a nightl 2024-10-05 22:41:57 +02:00
Maveno
eaee3b34da nightl 2024-10-04 23:45:25 +02:00
Maveno
5e017a8373 nightl 2024-10-03 23:22:32 +02:00
123d8659e1 inc 2024-10-01 22:21:14 +02:00
82b257d809 inc 2024-10-01 12:27:23 +00:00
Maveno
8ec7f9c2d1 inc 2024-10-01 14:23:43 +02:00
Maveno
7cc6efb9f1 nightly 2024-09-29 23:00:17 +02:00
Maveno
87ccb7e8a6 fix pattern edit 2024-09-29 21:42:42 +02:00
Maveno
3765f25fd8 ff 2024-09-29 16:15:56 +02:00
Maveno
74dfbe30d7 Fix delete tracks 2024-09-29 16:13:14 +02:00
Maveno
84baeb2d87 add selection exceptions 2024-09-29 13:12:37 +02:00
Maveno
9df5973676 inc delete tracks 2024-09-29 13:07:34 +02:00
Maveno
b492ebdab9 inc edit tracks 2024-09-29 12:02:44 +02:00
Maveno
1ae52399b9 nightly 2024-09-28 19:44:12 +02:00
3008d66dfe ninc 2024-09-27 22:10:21 +02:00
9f22c70e89 inc 2024-09-27 16:33:57 +02:00
a5d568ba34 inc streams ui 2024-09-27 15:51:34 +02:00
73c957c9bb streams ui stub 2024-09-27 13:52:48 +02:00
7c899e32bb Rework MVC 2024-09-27 12:51:23 +02:00
e325aaa529 tmdb controller mwe 2024-09-27 11:15:19 +02:00
fdc8f8f602 nightly 2024-09-26 19:10:36 +02:00
7fc025821b Fix delete cascade 2024-09-26 17:50:48 +02:00
Maveno
322321b1ed inc 2024-09-26 15:07:45 +02:00
Maveno
a46a2b421e Delete Show and confirmation 2024-09-26 12:59:07 +02:00
Maveno
0cda6390cd edit show mwe 2024-09-26 11:02:23 +02:00
Maveno
c963c2c675 nightly 2024-09-26 00:18:53 +02:00
Maveno
37786f56b5 mwe full cycle form input 2024-09-25 19:24:27 +02:00
Maveno
24e85d7005 Shows Editor Screen vis 2024-09-24 19:01:58 +02:00
Maveno
9b57007d5e textual mwe 2024-09-22 21:50:16 +02:00
45 changed files with 4048 additions and 361 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
__pycache__
junk/

View File

@@ -0,0 +1,32 @@
import os
from ffx.pattern_controller import PatternController
from ffx.model.show import Base
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker, Mapped, backref
filename = 'Boruto.Naruto.Next.Generations.S01E256.GerEngSub.AAC.1080p.WebDL.x264-Tanuki.mkv'
# Data 'input' variable
context = {}
# Initialize DB
homeDir = os.path.expanduser("~")
ffxVarDir = os.path.join(homeDir, '.local', 'var', 'ffx')
if not os.path.exists(ffxVarDir):
os.makedirs(ffxVarDir)
context['database_url'] = f"sqlite:///{os.path.join(ffxVarDir, 'ffx.db')}"
context['database_engine'] = create_engine(context['database_url'])
context['database_session'] = sessionmaker(bind=context['database_engine'])
Base.metadata.create_all(context['database_engine'])
pc = PatternController(context)
print(pc.matchFilename(filename))

View File

@@ -1,10 +1,10 @@
#! /usr/bin/python3 #! /usr/bin/python3
import os, sys, subprocess, json, click, time import os, sys, subprocess, json, click, time, re
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder from textual.widgets import Header, Footer, Placeholder, Label
VERSION='0.1.0' VERSION='0.1.0'
@@ -35,7 +35,7 @@ MKVMERGE_METADATA_KEYS = ['BPS',
'_STATISTICS_WRITING_DATE_UTC', '_STATISTICS_WRITING_DATE_UTC',
'_STATISTICS_TAGS'] '_STATISTICS_TAGS']
FILE_EXTENSION = ['mkv', 'mp4', 'avi', 'flv', 'webm'] FILE_EXTENSIONS = ['mkv', 'mp4', 'avi', 'flv', 'webm']
COMMAND_TOKENS = ['ffmpeg', '-y', '-i'] COMMAND_TOKENS = ['ffmpeg', '-y', '-i']
@@ -50,6 +50,9 @@ STREAM_LAYOUT_5_1 = '5.1(side)'
STREAM_LAYOUT_STEREO = 'stereo' STREAM_LAYOUT_STEREO = 'stereo'
STREAM_LAYOUT_6CH = '6ch' STREAM_LAYOUT_6CH = '6ch'
SEASON_EPISODE_INDICATOR_MATCH = '([sS][0-9]+)([eE][0-9]+)'
SEASON_INDICATOR_MATCH = '([sS][0-9]+)'
EPISODE_INDICATOR_MATCH = '([eE][0-9]+)'
class DashboardScreen(Screen): class DashboardScreen(Screen):
@@ -65,6 +68,14 @@ class DashboardScreen(Screen):
yield Placeholder("Dashboard Screen") yield Placeholder("Dashboard Screen")
yield Footer() yield Footer()
class WarningScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Label("Warning! This file is not compliant to the defined source schema!")
yield Footer()
class SettingsScreen(Screen): class SettingsScreen(Screen):
def __init__(self): def __init__(self):
@@ -87,29 +98,31 @@ class HelpScreen(Screen):
class ModesApp(App): class ModesApp(App):
BINDINGS = [ BINDINGS = [
("d", "switch_mode('dashboard')", "Dashboard"), ("q", "quit()", "Quit"),
("s", "switch_mode('settings')", "Settings"), # ("d", "switch_mode('dashboard')", "Dashboard"),
("h", "switch_mode('help')", "Help"), # ("s", "switch_mode('settings')", "Settings"),
# ("h", "switch_mode('help')", "Help"),
] ]
MODES = { MODES = {
"warning": WarningScreen,
"dashboard": DashboardScreen, "dashboard": DashboardScreen,
"settings": SettingsScreen, "settings": SettingsScreen,
"help": HelpScreen, "help": HelpScreen,
} }
def __init__(self, context = {}): def __init__(self, context = {}):
super().__init__() super().__init__()
self.context = context self.context = context
def on_mount(self) -> None: def on_mount(self) -> None:
self.switch_mode("dashboard") self.switch_mode("warning")
def getContext(self): def getContext(self):
return self.context return self.context
def executeProcess(commandSequence): def executeProcess(commandSequence):
process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -354,8 +367,10 @@ def streams(filename):
@click.option("-c", "--clear-metadata", is_flag=True, default=False) @click.option("-c", "--clear-metadata", is_flag=True, default=False)
@click.option("-d", "--denoise", is_flag=True, default=False) @click.option("-d", "--denoise", is_flag=True, default=False)
@click.option("-o", "--output-directory", type=str, default='')
def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, ac3_bitrate, dts_bitrate, crop, clear_metadata, default_subtitle, forced_audio, default_audio, denoise):
def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, ac3_bitrate, dts_bitrate, crop, clear_metadata, default_subtitle, forced_audio, default_audio, denoise, output_directory):
"""Batch conversion of audiovideo files in format suitable for web playback, e.g. jellyfin """Batch conversion of audiovideo files in format suitable for web playback, e.g. jellyfin
Files found under PATHS will be converted according to parameters. Files found under PATHS will be converted according to parameters.
@@ -363,56 +378,79 @@ def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, a
Suffices will we appended to filename in case of multiple created files Suffices will we appended to filename in case of multiple created files
or if the filename has not changed.""" or if the filename has not changed."""
#startTime = time.perf_counter() startTime = time.perf_counter()
#sourcePath = paths[0] context = ctx.obj
#targetFilename = paths[1]
#if not os.path.isfile(sourcePath):
# raise click.ClickException(f"There is no file with path {sourcePath}")
#click.echo(f"src: {sourcePath} tgt: {targetFilename}")
#click.echo(f"ve={video_encoder}") click.echo(f"\nVideo encoder: {video_encoder}")
qualityTokens = quality.split(',')
q_list = [q for q in qualityTokens if q.isnumeric()]
click.echo(f"Qualities: {q_list}")
ctx.obj['bitrates'] = {}
ctx.obj['bitrates']['stereo'] = str(stereo_bitrate) if str(stereo_bitrate).endswith('k') else f"{stereo_bitrate}k"
ctx.obj['bitrates']['ac3'] = str(ac3_bitrate) if str(ac3_bitrate).endswith('k') else f"{ac3_bitrate}k"
ctx.obj['bitrates']['dts'] = str(dts_bitrate) if str(dts_bitrate).endswith('k') else f"{dts_bitrate}k"
click.echo(f"Stereo bitrate: {ctx.obj['bitrates']['stereo']}")
click.echo(f"AC3 bitrate: {ctx.obj['bitrates']['ac3']}")
click.echo(f"DTS bitrate: {ctx.obj['bitrates']['dts']}")
ctx.obj['perform_crop'] = (crop != 'none')
if ctx.obj['perform_crop']:
cropTokens = crop.split(',')
if cropTokens and len(cropTokens) == 2:
ctx.obj['crop_start'], ctx.obj['crop_length'] = crop.split(',')
else:
ctx.obj['crop_start'] = DEFAULT_CROP_START
ctx.obj['crop_length'] = DEFAULT_CROP_LENGTH
click.echo(f"crop start={ctx.obj['crop_start']} length={ctx.obj['crop_length']}")
#qualityTokens = quality.split(',') click.echo(f"\nRunning {len(paths) * len(q_list)} jobs")
#q_list = [q for q in qualityTokens if q.isnumeric()]
se_match = re.compile(SEASON_EPISODE_INDICATOR_MATCH)
#click.echo(q_list) s_match = re.compile(SEASON_INDICATOR_MATCH)
e_match = re.compile(EPISODE_INDICATOR_MATCH)
#ctx.obj['bitrates'] = {}
#ctx.obj['bitrates']['stereo'] = str(stereo_bitrate) if str(stereo_bitrate).endswith('k') else f"{stereo_bitrate}k"
#ctx.obj['bitrates']['ac3'] = str(ac3_bitrate) if str(ac3_bitrate).endswith('k') else f"{ac3_bitrate}k"
#ctx.obj['bitrates']['dts'] = str(dts_bitrate) if str(dts_bitrate).endswith('k') else f"{dts_bitrate}k"
#click.echo(f"a={ctx.obj['bitrates']['stereo']}") for sourcePath in paths:
#click.echo(f"ac3={ctx.obj['bitrates']['ac3']}")
#click.echo(f"dts={ctx.obj['bitrates']['dts']}")
#performCrop = (crop != 'none') if not os.path.isfile(sourcePath):
click.echo(f"There is no file with path {sourcePath}, skipping ...")
continue
sourceDirectory = os.path.dirname(sourcePath)
sourceFilename = os.path.basename(sourcePath)
sourcePathTokens = sourceFilename.split('.')
if sourcePathTokens[-1] in FILE_EXTENSIONS:
sourceFileBasename = '.'.join(sourcePathTokens[:-1])
sourceFilenameExtension = sourcePathTokens[-1]
else:
sourceFileBasename = sourceFilename
sourceFilenameExtension = ''
#click.echo(f"dir={sourceDirectory} base={sourceFileBasename} ext={sourceFilenameExtension}")
click.echo(f"\nProcessing file {sourcePath}")
#if performCrop: se_result = se_match.search(sourceFilename)
s_result = s_match.search(sourceFilename)
e_result = e_match.search(sourceFilename)
#cropTokens = crop.split(',')
#if cropTokens and len(cropTokens) == 2:
#cropStart, cropLength = crop.split(',')
#else:
#cropStart = DEFAULT_CROP_START
#cropLength = DEFAULT_CROP_LENGTH
#click.echo(f"crop start={cropStart} length={cropLength}")
#click.echo(f"\nRunning {len(q_list)} jobs")
#streamDescriptor = getStreamDescriptor(sourcePath) #streamDescriptor = getStreamDescriptor(sourcePath)
@@ -497,15 +535,18 @@ def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, a
#executeProcess(commandSequence2) #executeProcess(commandSequence2)
#click.echo('\nDONE\n') #app = ModesApp(ctx.obj)
#app.run()
#endTime = time.perf_counter() #click.confirm('Warning! This file is not compliant to the defined source schema! Do you want to continue?', abort=True)
#click.echo(f"Time elapsed {endTime - startTime}")
click.echo('\nDONE\n')
app = ModesApp(ctx.obj) endTime = time.perf_counter()
app.run() click.echo(f"Time elapsed {endTime - startTime}")
click.echo(f"app result: {app.getContext()}")
# click.echo(f"app result: {app.getContext()}")

32
bin/check.py Normal file
View File

@@ -0,0 +1,32 @@
import os
from ffx.pattern_controller import PatternController
from ffx.model.show import Base
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker, Mapped, backref
filename = 'Boruto.Naruto.Next.Generations.S01E256.GerEngSub.AAC.1080p.WebDL.x264-Tanuki.mkv'
# Data 'input' variable
context = {}
# Initialize DB
homeDir = os.path.expanduser("~")
ffxVarDir = os.path.join(homeDir, '.local', 'var', 'ffx')
if not os.path.exists(ffxVarDir):
os.makedirs(ffxVarDir)
context['database_url'] = f"sqlite:///{os.path.join(ffxVarDir, 'ffx.db')}"
context['database_engine'] = create_engine(context['database_url'])
context['database_session'] = sessionmaker(bind=context['database_engine'])
Base.metadata.create_all(context['database_engine'])
pc = PatternController(context)
print(pc.matchFilename(filename))

View File

@@ -2,9 +2,12 @@
import os, sys, subprocess, json, click, time, re import os, sys, subprocess, json, click, time, re
from textual.app import App, ComposeResult from ffx.ffx_app import FfxApp
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label from ffx.database import databaseContext
from ffx.media_descriptor import MediaDescriptor
from ffx.file_properties import FileProperties
VERSION='0.1.0' VERSION='0.1.0'
@@ -44,11 +47,6 @@ STREAM_TYPE_VIDEO = 'video'
STREAM_TYPE_AUDIO = 'audio' STREAM_TYPE_AUDIO = 'audio'
STREAM_TYPE_SUBTITLE = 'subtitle' STREAM_TYPE_SUBTITLE = 'subtitle'
STREAM_LAYOUT_6_1 = '6.1'
STREAM_LAYOUT_5_1 = '5.1(side)'
STREAM_LAYOUT_STEREO = 'stereo'
STREAM_LAYOUT_6CH = '6ch'
SEASON_EPISODE_INDICATOR_MATCH = '[sS]([0-9]+)[eE]([0-9]+)' SEASON_EPISODE_INDICATOR_MATCH = '[sS]([0-9]+)[eE]([0-9]+)'
EPISODE_INDICATOR_MATCH = '[eE]([0-9]+)' EPISODE_INDICATOR_MATCH = '[eE]([0-9]+)'
SEASON_EPISODE_STREAM_LANGUAGE_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]{3})' SEASON_EPISODE_STREAM_LANGUAGE_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]{3})'
@@ -56,185 +54,6 @@ SEASON_EPISODE_STREAM_LANGUAGE_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]
SUBTITLE_FILE_EXTENSION = 'vtt' SUBTITLE_FILE_EXTENSION = 'vtt'
class DashboardScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
context['dashboard'] = 'dashboard'
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Placeholder("Dashboard Screen")
yield Footer()
class WarningScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Label("Warning! This file is not compliant to the defined source schema!")
yield Footer()
class SettingsScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Settings Screen")
yield Footer()
class HelpScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Help Screen")
yield Footer()
class ModesApp(App):
BINDINGS = [
("q", "quit()", "Quit"),
# ("d", "switch_mode('dashboard')", "Dashboard"),
# ("s", "switch_mode('settings')", "Settings"),
# ("h", "switch_mode('help')", "Help"),
]
MODES = {
"warning": WarningScreen,
"dashboard": DashboardScreen,
"settings": SettingsScreen,
"help": HelpScreen,
}
def __init__(self, context = {}):
super().__init__()
self.context = context
def on_mount(self) -> None:
self.switch_mode("warning")
def getContext(self):
return self.context
def executeProcess(commandSequence):
process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
return output.decode('utf-8'), error.decode('utf-8'), process.returncode
#[{'index': 0, 'codec_name': 'vp9', 'codec_long_name': 'Google VP9', 'profile': 'Profile 0', 'codec_type': 'video', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'width': 1920, 'height': 1080, 'coded_width': 1920, 'coded_height': 1080, 'closed_captions': 0, 'film_grain': 0, 'has_b_frames': 0, 'sample_aspect_ratio': '1:1', 'display_aspect_ratio': '16:9', 'pix_fmt': 'yuv420p', 'level': -99, 'color_range': 'tv', 'chroma_location': 'left', 'field_order': 'progressive', 'refs': 1, 'r_frame_rate': '24000/1001', 'avg_frame_rate': '24000/1001', 'time_base': '1/1000', 'start_pts': 0, 'start_time': '0.000000', 'disposition': {'default': 1, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'BPS': '7974017', 'NUMBER_OF_FRAMES': '34382', 'NUMBER_OF_BYTES': '1429358655', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 libvpx-vp9', 'DURATION': '00:23:54.016000000'}}]
#[{'index': 1, 'codec_name': 'opus', 'codec_long_name': 'Opus (Opus Interactive Audio Codec)', 'codec_type': 'audio', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'sample_fmt': 'fltp', 'sample_rate': '48000', 'channels': 2, 'channel_layout': 'stereo', 'bits_per_sample': 0, 'initial_padding': 312, 'r_frame_rate': '0/0', 'avg_frame_rate': '0/0', 'time_base': '1/1000', 'start_pts': -7, 'start_time': '-0.007000', 'extradata_size': 19, 'disposition': {'default': 1, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'language': 'jpn', 'title': 'Japanisch', 'BPS': '128000', 'NUMBER_OF_FRAMES': '61763', 'NUMBER_OF_BYTES': '22946145', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 libopus', 'DURATION': '00:23:54.141000000'}}]
#[{'index': 2, 'codec_name': 'webvtt', 'codec_long_name': 'WebVTT subtitle', 'codec_type': 'subtitle', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'r_frame_rate': '0/0', 'avg_frame_rate': '0/0', 'time_base': '1/1000', 'start_pts': -7, 'start_time': '-0.007000', 'duration_ts': 1434141, 'duration': '1434.141000', 'disposition': {'default': 1, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'language': 'ger', 'title': 'Deutsch [Full]', 'BPS': '118', 'NUMBER_OF_FRAMES': '300', 'NUMBER_OF_BYTES': '21128', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 webvtt', 'DURATION': '00:23:54.010000000'}}, {'index': 3, 'codec_name': 'webvtt', 'codec_long_name': 'WebVTT subtitle', 'codec_type': 'subtitle', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'r_frame_rate': '0/0', 'avg_frame_rate': '0/0', 'time_base': '1/1000', 'start_pts': -7, 'start_time': '-0.007000', 'duration_ts': 1434141, 'duration': '1434.141000', 'disposition': {'default': 0, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'language': 'eng', 'title': 'Englisch [Full]', 'BPS': '101', 'NUMBER_OF_FRAMES': '276', 'NUMBER_OF_BYTES': '16980', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 webvtt', 'DURATION': '00:23:53.230000000'}}]
def getStreamData(filepath):
"""Returns ffprobe stream data as array with elements according to the following example
{
"index": 4,
"codec_name": "hdmv_pgs_subtitle",
"codec_long_name": "HDMV Presentation Graphic Stream subtitles",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 1421035,
"duration": "1421.035000",
"disposition": {
"default": 1,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "ger",
"title": "German Full"
}
}
"""
ffprobeOutput, ffprobeError, returnCode = executeProcess(["ffprobe",
"-show_streams",
"-of", "json",
filepath])
if 'Invalid data found when processing input' in ffprobeError:
raise Exception(f"File {filepath} does not contain valid stream data")
if returnCode != 0:
raise Exception(f"ffprobe returned with error {returnCode}")
return json.loads(ffprobeOutput)['streams']
def getStreamDescriptor(filename):
streamData = getStreamData(filename)
descriptor = {}
descriptor['video'] = []
descriptor['audio'] = []
descriptor['subtitle'] = []
for subStream in streamData:
if not 'disposition' in subStream.keys():
subStream['disposition'] = {}
if not 'default' in subStream['disposition'].keys():
subStream['disposition']['default'] = 0
if not 'forced' in subStream['disposition'].keys():
subStream['disposition']['forced'] = 0
if not 'tags' in subStream.keys():
subStream['tags'] = {}
if not 'language' in subStream['tags'].keys():
subStream['tags']['language'] = 'undefined'
if not 'title' in subStream['tags'].keys():
subStream['tags']['title'] = 'undefined'
if subStream['codec_type'] == STREAM_TYPE_AUDIO:
if 'channel_layout' in subStream.keys():
subStream['audio_layout'] = subStream['channel_layout']
elif subStream['channels'] == 6:
subStream['audio_layout'] = STREAM_LAYOUT_6CH
else:
subStream['audio_layout'] = 'undefined'
descriptor[subStream['codec_type']].append(subStream)
descriptor[subStream['codec_type']][-1]['sub_index'] = len(descriptor[subStream['codec_type']]) - 1
return descriptor
def getModifiedStreamOrder(length, last): def getModifiedStreamOrder(length, last):
"""This is jellyfin specific as the last stream in the order is set as default""" """This is jellyfin specific as the last stream in the order is set as default"""
@@ -301,38 +120,39 @@ def generateOutputTokens(filepath, format, ext):
def generateAudioEncodingTokens(context, index, layout): def generateAudioEncodingTokens(context, index, layout):
"""Generates ffmpeg options for one output audio stream including channel remapping, codec and bitrate""" """Generates ffmpeg options for one output audio stream including channel remapping, codec and bitrate"""
pass
if layout == STREAM_LAYOUT_6_1: #
return [f"-c:a:{index}", # if layout == STREAM_LAYOUT_6_1:
'libopus', # return [f"-c:a:{index}",
f"-filter:a:{index}", # 'libopus',
'channelmap=channel_layout=6.1', # f"-filter:a:{index}",
f"-b:a:{index}", # 'channelmap=channel_layout=6.1',
context['bitrates']['dts']] # f"-b:a:{index}",
# context['bitrates']['dts']]
elif layout == STREAM_LAYOUT_5_1: #
return [f"-c:a:{index}", # elif layout == STREAM_LAYOUT_5_1:
'libopus', # return [f"-c:a:{index}",
f"-filter:a:{index}", # 'libopus',
"channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1", # f"-filter:a:{index}",
f"-b:a:{index}", # "channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1",
context['bitrates']['ac3']] # f"-b:a:{index}",
# context['bitrates']['ac3']]
elif layout == STREAM_LAYOUT_STEREO: #
return [f"-c:a:{index}", # elif layout == STREAM_LAYOUT_STEREO:
'libopus', # return [f"-c:a:{index}",
f"-b:a:{index}", # 'libopus',
context['bitrates']['stereo']] # f"-b:a:{index}",
# context['bitrates']['stereo']]
elif layout == STREAM_LAYOUT_6CH: #
return [f"-c:a:{index}", # elif layout == STREAM_LAYOUT_6CH:
'libopus', # return [f"-c:a:{index}",
f"-filter:a:{index}", # 'libopus',
"channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1", # f"-filter:a:{index}",
f"-b:a:{index}", # "channelmap=FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1",
context['bitrates']['ac3']] # f"-b:a:{index}",
else: # context['bitrates']['ac3']]
return [] # else:
# return []
def generateClearTokens(streams): def generateClearTokens(streams):
@@ -403,8 +223,9 @@ def searchSubtitleFiles(dir, prefix):
@click.pass_context @click.pass_context
def ffx(ctx): def ffx(ctx):
"""FFX""" """FFX"""
ctx.obj = {} ctx.obj = {}
pass ctx.obj['database'] = databaseContext()
# Define a subcommand # Define a subcommand
@@ -420,16 +241,94 @@ def help():
click.echo(f"Usage: ffx [input file] [output file] [vp9|av1] [q=[nn[,nn,...]]] [p=nn] [a=nnn[k]] [ac3=nnn[k]] [dts=nnn[k]] [crop]") click.echo(f"Usage: ffx [input file] [output file] [vp9|av1] [q=[nn[,nn,...]]] [p=nn] [a=nnn[k]] [ac3=nnn[k]] [dts=nnn[k]] [crop]")
@click.argument('filename', nargs=1)
@ffx.command() @ffx.command()
def streams(filename): @click.pass_context
@click.argument('filename', nargs=1)
def inspect(ctx, filename):
# if 'database' not in ctx.obj.keys():
# ctx.obj['database'] = databaseContext()
try: try:
sd = getStreamDescriptor(filename)
fp = FileProperties(ctx.obj, filename)
md = fp.getMediaDescriptor()
print(md.getTags())
for at in md.getAudioTracks():
print(f"Audio: {at.getLanguage()} {'|'.join([f"{k}={v}" for (k,v) in at.getTags().items()])}")
for st in md.getSubtitleTracks():
print(f"Subtitle: {st.getLanguage()} {'|'.join([[f"{k}={v}" for (k,v) in st.getTags().items()]])}")
except Exception as ex: except Exception as ex:
raise click.ClickException(f"This file does not contain any audiovisual data: {ex}") raise click.ClickException(f"This file does not contain any audiovisual data: {ex}")
for d in sd:
click.echo(f"{d['codec_name']}{' (' + str(d['channels']) + ')' if d['codec_type'] == 'audio' else ''}") # for d in sd:
# click.echo(f"{d['codec_name']}{' (' + str(d['channels']) + ')' if d['codec_type'] == 'audio' else ''}")
# @ffx.command()
# @click.pass_context
#
# @click.argument('paths', nargs=-1)
# @click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
#
# @click.option('-sd', '--subtitle-directory', type=str, default='', help='Load subtitles from here')
# @click.option('-sp', '--subtitle-prefix', type=str, default='', help='Subtitle filename prefix')
#
# @click.option("-o", "--output-directory", type=str, default='')
#
# @click.option("--dry-run", is_flag=True, default=False)
#
#
# def unmux(ctx,
# label,
# paths,
# subtitle_directory,
# subtitle_prefix,
# output_directory,
# dry_run):
#
# existingSourcePaths = [p for p in paths if os.path.isfile(p)]
# click.echo(f"\nUnmuxing {len(existingSourcePaths)} files")
#
# for sourcePath in existingSourcePaths:
#
# sd = getStreamDescriptor(sourcePath)
#
# print(f"\nFile {sourcePath}\n")
#
# for v in sd['video']:
#
# if v['codec_name'] == 'h264':
#
# commandSequence = ['ffmpeg', '-i', sourcePath, '-map', '0:v:0', '-c', 'copy', '-f', 'h264']
# executeProcess()
#
# for a in sd['audio']:
# print(f"A: {a}\n")
# for s in sd['subtitle']:
# print(f"S: {s}\n")
@ffx.command()
@click.pass_context
def shows(ctx):
# if 'database' not in ctx.obj.keys():
# ctx.obj['database'] = databaseContext()
app = FfxApp(ctx.obj)
app.run()
@@ -514,6 +413,10 @@ def convert(ctx,
context = ctx.obj context = ctx.obj
if 'database' not in context.keys():
context['database'] = databaseContext()
click.echo(f"\nVideo encoder: {video_encoder}") click.echo(f"\nVideo encoder: {video_encoder}")
qualityTokens = quality.split(',') qualityTokens = quality.split(',')
@@ -580,6 +483,9 @@ def convert(ctx,
for sourcePath in existingSourcePaths: for sourcePath in existingSourcePaths:
###
###
# Separate basedir, basename and extension for current source file # Separate basedir, basename and extension for current source file
sourceDirectory = os.path.dirname(sourcePath) sourceDirectory = os.path.dirname(sourcePath)
sourceFilename = os.path.basename(sourcePath) sourceFilename = os.path.basename(sourcePath)
@@ -637,6 +543,8 @@ def convert(ctx,
else: else:
targetFilenameTokens = [sourceFileBasename] targetFilenameTokens = [sourceFileBasename]
###
###
# Load source stream descriptor # Load source stream descriptor
try: try:

View File

@@ -0,0 +1,66 @@
import os, re, click
class FileProperties():
FILE_EXTENSIONS = ['mkv', 'mp4', 'avi', 'flv', 'webm']
SEASON_EPISODE_INDICATOR_MATCH = '[sS]([0-9]+)[eE]([0-9]+)'
EPISODE_INDICATOR_MATCH = '[eE]([0-9]+)'
def ___init__(self, sourcePath, ):
# Separate basedir, basename and extension for current source file
self.__sourceDirectory = os.path.dirname(sourcePath)
self.__sourceFilename = os.path.basename(sourcePath)
sourcePathTokens = self.__sourceFilename.split('.')
if sourcePathTokens[-1] in FilenameController.FILE_EXTENSIONS:
self.__sourceFileBasename = '.'.join(sourcePathTokens[:-1])
self.__sourceFilenameExtension = sourcePathTokens[-1]
else:
self.__sourceFileBasename = self.__sourceFilename
self.__sourceFilenameExtension = ''
se_match = re.compile(FilenameController.SEASON_EPISODE_INDICATOR_MATCH)
e_match = re.compile(FilenameController.EPISODE_INDICATOR_MATCH)
se_result = se_match.search(self.__sourceFilename)
e_result = e_match.search(self.__sourceFilename)
self.__season = -1
self.__episode = -1
file_index = 0
if se_result is not None:
self.__season = int(se_result.group(1))
self.__episode = int(se_result.group(2))
elif e_result is not None:
self.__episode = int(e_result.group(1))
else:
file_index += 1
matchingFileSubtitleDescriptors = sorted([d for d in availableFileSubtitleDescriptors if d['season'] == season and d['episode'] == episode], key=lambda d: d['stream']) if availableFileSubtitleDescriptors else []
print(f"season={season} episode={episode} file={file_index}")
# Assemble target filename tokens
targetFilenameTokens = []
targetFilenameExtension = DEFAULT_FILE_EXTENSION
if label:
targetFilenameTokens = [label]
if season > -1 and episode > -1:
targetFilenameTokens += [f"S{season:0{season_digits}d}E{episode:0{episode_digits}d}"]
elif episode > -1:
targetFilenameTokens += [f"E{episode:0{episode_digits}d}"]
else:
targetFilenameTokens += [f"{file_index:0{index_digits}d}"]
else:
targetFilenameTokens = [sourceFileBasename]

0
bin/ffx/__init__.py Normal file
View File

46
bin/ffx/audio_layout.py Normal file
View File

@@ -0,0 +1,46 @@
from enum import Enum
from .track_type import TrackType
class AudioLayout(Enum):
LAYOUT_STEREO = {"layout": "stereo", "index": 1}
LAYOUT_5_1 = {"layout": "5.1(side)", "index": 2}
LAYOUT_6_1 = {"layout": "6.1", "index": 3}
LAYOUT_7_1 = {"layout": "7.1", "index": 4} #TODO: Does this exist?
LAYOUT_6CH = {"layout": "6ch", "index": 5}
LAYOUT_UNDEFINED = {"layout": "undefined", "index": 0}
def layout(self):
"""Returns the layout as string"""
return self.value['layout']
def index(self):
"""Returns the layout as string"""
return self.value['layout']
def identify(self, streamObj):
FFPROBE_LAYOUT_KEY = 'channel_layout'
FFPROBE_CHANNELS_KEY = 'channels'
FFPROBE_CODEC_TYPE_KEY = 'codec_type'
if (type(streamObj) is not dict
or FFPROBE_CODEC_TYPE_KEY not in streamObj.keys()
or streamObj[FFPROBE_CODEC_TYPE_KEY] != TrackType.AUDIO.label()):
raise Exception('Not an ffprobe audio stream object')
if FFPROBE_LAYOUT_KEY in streamObj.keys():
matchingLayouts = [l for l in AudioLayout if l.value['layout'] == streamObj[FFPROBE_LAYOUT_KEY]]
if matchingLayouts:
return matchingLayouts[0]
if (FFPROBE_CHANNELS_KEY in streamObj.keys()
and int(streamObj[FFPROBE_CHANNELS_KEY]) == 6):
return AudioLayout.LAYOUT_6CH
return AudioLayout.LAYOUT_UNDEFINED

View File

@@ -0,0 +1,16 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class DashboardScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
context['dashboard'] = 'dashboard'
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Placeholder("Dashboard Screen")
yield Footer()

41
bin/ffx/database.py Normal file
View File

@@ -0,0 +1,41 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from ffx.model.show import Base, Show
from ffx.model.pattern import Pattern
from ffx.model.track import Track
from ffx.model.media_tag import MediaTag
from ffx.model.track_tag import TrackTag
def databaseContext():
databaseContext = {}
# Initialize DB
homeDir = os.path.expanduser("~")
ffxVarDir = os.path.join(homeDir, '.local', 'var', 'ffx')
if not os.path.exists(ffxVarDir):
os.makedirs(ffxVarDir)
databaseContext['url'] = f"sqlite:///{os.path.join(ffxVarDir, 'ffx.db')}"
databaseContext['engine'] = create_engine(databaseContext['url'])
databaseContext['session'] = sessionmaker(bind=databaseContext['engine'])
Base.metadata.create_all(databaseContext['engine'])
# isSyncronuous = False
# while not isSyncronuous:
# while True:
# try:
# with databaseContext['database_engine'].connect() as connection:
# connection.execute(sqlalchemy.text('PRAGMA foreign_keys=ON;'))
# #isSyncronuous = True
# break
# except sqlite3.OperationalError:
# time.sleep(0.1)
return databaseContext

29
bin/ffx/ffx_app.py Normal file
View File

@@ -0,0 +1,29 @@
from textual.app import App
from .shows_screen import ShowsScreen
class FfxApp(App):
TITLE = "FFX"
BINDINGS = [
("q", "quit()", "Quit"),
("h", "switch_mode('help')", "Help"),
]
def __init__(self, context = {}):
super().__init__()
# Data 'input' variable
self.context = context
def on_mount(self) -> None:
self.push_screen(ShowsScreen())
def getContext(self):
"""Data 'output' method"""
return self.context

View File

@@ -1,2 +0,0 @@
class FilePattern():
pass

215
bin/ffx/file_properties.py Normal file
View File

@@ -0,0 +1,215 @@
import os, re, click, json
from .media_descriptor import MediaDescriptor
from .pattern_controller import PatternController
from .process import executeProcess
class FileProperties():
FILE_EXTENSIONS = ['mkv', 'mp4', 'avi', 'flv', 'webm']
SEASON_EPISODE_INDICATOR_MATCH = '[sS]([0-9]+)[eE]([0-9]+)'
EPISODE_INDICATOR_MATCH = '[eE]([0-9]+)'
def __init__(self, context, sourcePath):
# Separate basedir, basename and extension for current source file
self.__sourcePath = sourcePath
self.__sourceDirectory = os.path.dirname(self.__sourcePath)
self.__sourceFilename = os.path.basename(self.__sourcePath)
sourcePathTokens = self.__sourceFilename.split('.')
if sourcePathTokens[-1] in FileProperties.FILE_EXTENSIONS:
self.__sourceFileBasename = '.'.join(sourcePathTokens[:-1])
self.__sourceFilenameExtension = sourcePathTokens[-1]
else:
self.__sourceFileBasename = self.__sourceFilename
self.__sourceFilenameExtension = ''
se_match = re.compile(FileProperties.SEASON_EPISODE_INDICATOR_MATCH)
e_match = re.compile(FileProperties.EPISODE_INDICATOR_MATCH)
se_result = se_match.search(self.__sourceFilename)
e_result = e_match.search(self.__sourceFilename)
self.__season = -1
self.__episode = -1
file_index = 0
if se_result is not None:
self.__season = int(se_result.group(1))
self.__episode = int(se_result.group(2))
elif e_result is not None:
self.__episode = int(e_result.group(1))
else:
file_index += 1
pc = PatternController(context)
pattern = pc.matchFilename(self.__sourceFilename)
click.echo(pattern)
# matchingFileSubtitleDescriptors = sorted([d for d in availableFileSubtitleDescriptors if d['season'] == season and d['episode'] == episode], key=lambda d: d['stream']) if availableFileSubtitleDescriptors else []
#
# print(f"season={season} episode={episode} file={file_index}")
#
#
# # Assemble target filename tokens
# targetFilenameTokens = []
# targetFilenameExtension = DEFAULT_FILE_EXTENSION
#
# if label:
# targetFilenameTokens = [label]
#
# if season > -1 and episode > -1:
# targetFilenameTokens += [f"S{season:0{season_digits}d}E{episode:0{episode_digits}d}"]
# elif episode > -1:
# targetFilenameTokens += [f"E{episode:0{episode_digits}d}"]
# else:
# targetFilenameTokens += [f"{file_index:0{index_digits}d}"]
#
# else:
# targetFilenameTokens = [sourceFileBasename]
#
def getFormatData(self):
"""
"format": {
"filename": "Downloads/nagatoro_s02/nagatoro_s01e02.mkv",
"nb_streams": 18,
"nb_programs": 0,
"nb_stream_groups": 0,
"format_name": "matroska,webm",
"format_long_name": "Matroska / WebM",
"start_time": "0.000000",
"duration": "1420.063000",
"size": "1489169824",
"bit_rate": "8389316",
"probe_score": 100,
"tags": {
"PUBLISHER": "Crunchyroll",
"ENCODER": "Lavf58.29.100"
}
}
"""
ffprobeOutput, ffprobeError, returnCode = executeProcess(["ffprobe",
"-hide_banner",
"-show_format",
"-of", "json",
self.__sourcePath])
if 'Invalid data found when processing input' in ffprobeError:
raise Exception(f"File {self.__sourcePath} does not contain valid stream data")
if returnCode != 0:
raise Exception(f"ffprobe returned with error {returnCode}")
return json.loads(ffprobeOutput)['format']
#[{'index': 0, 'codec_name': 'vp9', 'codec_long_name': 'Google VP9', 'profile': 'Profile 0', 'codec_type': 'video', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'width': 1920, 'height': 1080, 'coded_width': 1920, 'coded_height': 1080, 'closed_captions': 0, 'film_grain': 0, 'has_b_frames': 0, 'sample_aspect_ratio': '1:1', 'display_aspect_ratio': '16:9', 'pix_fmt': 'yuv420p', 'level': -99, 'color_range': 'tv', 'chroma_location': 'left', 'field_order': 'progressive', 'refs': 1, 'r_frame_rate': '24000/1001', 'avg_frame_rate': '24000/1001', 'time_base': '1/1000', 'start_pts': 0, 'start_time': '0.000000', 'disposition': {'default': 1, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'BPS': '7974017', 'NUMBER_OF_FRAMES': '34382', 'NUMBER_OF_BYTES': '1429358655', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 libvpx-vp9', 'DURATION': '00:23:54.016000000'}}]
#[{'index': 1, 'codec_name': 'opus', 'codec_long_name': 'Opus (Opus Interactive Audio Codec)', 'codec_type': 'audio', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'sample_fmt': 'fltp', 'sample_rate': '48000', 'channels': 2, 'channel_layout': 'stereo', 'bits_per_sample': 0, 'initial_padding': 312, 'r_frame_rate': '0/0', 'avg_frame_rate': '0/0', 'time_base': '1/1000', 'start_pts': -7, 'start_time': '-0.007000', 'extradata_size': 19, 'disposition': {'default': 1, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'language': 'jpn', 'title': 'Japanisch', 'BPS': '128000', 'NUMBER_OF_FRAMES': '61763', 'NUMBER_OF_BYTES': '22946145', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 libopus', 'DURATION': '00:23:54.141000000'}}]
#[{'index': 2, 'codec_name': 'webvtt', 'codec_long_name': 'WebVTT subtitle', 'codec_type': 'subtitle', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'r_frame_rate': '0/0', 'avg_frame_rate': '0/0', 'time_base': '1/1000', 'start_pts': -7, 'start_time': '-0.007000', 'duration_ts': 1434141, 'duration': '1434.141000', 'disposition': {'default': 1, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'language': 'ger', 'title': 'Deutsch [Full]', 'BPS': '118', 'NUMBER_OF_FRAMES': '300', 'NUMBER_OF_BYTES': '21128', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 webvtt', 'DURATION': '00:23:54.010000000'}}, {'index': 3, 'codec_name': 'webvtt', 'codec_long_name': 'WebVTT subtitle', 'codec_type': 'subtitle', 'codec_tag_string': '[0][0][0][0]', 'codec_tag': '0x0000', 'r_frame_rate': '0/0', 'avg_frame_rate': '0/0', 'time_base': '1/1000', 'start_pts': -7, 'start_time': '-0.007000', 'duration_ts': 1434141, 'duration': '1434.141000', 'disposition': {'default': 0, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0, 'non_diegetic': 0, 'captions': 0, 'descriptions': 0, 'metadata': 0, 'dependent': 0, 'still_image': 0}, 'tags': {'language': 'eng', 'title': 'Englisch [Full]', 'BPS': '101', 'NUMBER_OF_FRAMES': '276', 'NUMBER_OF_BYTES': '16980', '_STATISTICS_WRITING_APP': "mkvmerge v63.0.0 ('Everything') 64-bit", '_STATISTICS_WRITING_DATE_UTC': '2023-10-07 13:59:46', '_STATISTICS_TAGS': 'BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES', 'ENCODER': 'Lavc61.3.100 webvtt', 'DURATION': '00:23:53.230000000'}}]
def getStreamData(self):
"""Returns ffprobe stream data as array with elements according to the following example
{
"index": 4,
"codec_name": "hdmv_pgs_subtitle",
"codec_long_name": "HDMV Presentation Graphic Stream subtitles",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 1421035,
"duration": "1421.035000",
"disposition": {
"default": 1,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "ger",
"title": "German Full"
}
}
"""
ffprobeOutput, ffprobeError, returnCode = executeProcess(["ffprobe",
"-hide_banner",
"-show_streams",
"-of", "json",
self.__sourcePath])
if 'Invalid data found when processing input' in ffprobeError:
raise Exception(f"File {self.__sourcePath} does not contain valid stream data")
if returnCode != 0:
raise Exception(f"ffprobe returned with error {returnCode}")
return json.loads(ffprobeOutput)['streams']
# def getTrackDescriptor(self, streamObj):
# """Convert the stream describing json object into a track descriptor"""
#
# trackType = streamObj['codec_type']
#
# descriptor = {}
#
# if trackType in [t.label() for t in TrackType]:
#
# descriptor['type'] = trackType
#
# descriptor = {}
# descriptor['disposition_list'] = [t for d in (k for (k,v) in streamObj['disposition'].items() if v) if (t := TrackDisposition.find(d)) if t is not None]
#
# descriptor['tags'] = streamObj['tags'] if 'tags' in streamObj.keys() else {}
#
# if trackType == TrackType.AUDIO.label():
# descriptor['layout'] = AudioLayout.identify(streamObj)
#
# return descriptor
def getMediaDescriptor(self):
return MediaDescriptor.fromFfprobe(self.getFormatData(), self.getStreamData())
# formatData = self.getFormatData()
# streamData = self.getStreamData()

12
bin/ffx/help_screen.py Normal file
View File

@@ -0,0 +1,12 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class HelpScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Help Screen")
yield Footer()

View File

@@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
import difflib import difflib
class LanguageData(Enum): class IsoLanguage(Enum):
AFRIKAANS = {"name": "Afrikaans", "iso639_1": "af", "iso639_2": "afr"} AFRIKAANS = {"name": "Afrikaans", "iso639_1": "af", "iso639_2": "afr"}
ALBANIAN = {"name": "Albanian", "iso639_1": "sq", "iso639_2": "alb"} ALBANIAN = {"name": "Albanian", "iso639_1": "sq", "iso639_2": "alb"}
@@ -73,21 +73,34 @@ class LanguageData(Enum):
VIETNAMESE = {"name": "Vietnamese", "iso639_1": "vi", "iso639_2": "vie"} VIETNAMESE = {"name": "Vietnamese", "iso639_1": "vi", "iso639_2": "vie"}
WELSH = {"name": "Welsh", "iso639_1": "cy", "iso639_2": "wel"} WELSH = {"name": "Welsh", "iso639_1": "cy", "iso639_2": "wel"}
def find(name : str): UNDEFINED = {"name": "undefined", "iso639_1": "xx", "iso639_2": "und"}
@staticmethod
def find(label : str):
closestMatches = difflib.get_close_matches(label, [l.value["name"] for l in IsoLanguage], n=1)
if closestMatches:
foundLangs = [l for l in IsoLanguage if l.value['name'] == closestMatches[0]]
return foundLangs[0] if foundLangs else IsoLanguage.UNDEFINED
else:
return IsoLanguage.UNDEFINED
@staticmethod
def findThreeLetter(theeLetter : str):
foundLangs = [l for l in IsoLanguage if l.value['iso639_2'] == str(theeLetter)]
return foundLangs[0] if foundLangs else IsoLanguage.UNDEFINED
def label(self):
return str(self.value['name'])
def twoLetter(self):
return str(self.value['iso639_1'])
closestMatches = difflib.get_close_matches(name, [l.value["name"] for l in LanguageData], n=1) def threeLetter(self):
return str(self.value['iso639_2'])
if closestMatches:
foundLangs = [l for l in LanguageData if l.value['name'] == closestMatches[0]]
return foundLangs[0] if foundLangs else None
else:
return None
def get(lang : str):
selectedLangs = [l for l in LanguageData if l.value['iso639_2'] == lang]
if selectedLangs:
return selectedLangs[0]
else:
return None

View File

@@ -0,0 +1,43 @@
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
class MediaDescriptor():
"""This class represents the structural content of a media file including streams and metadata"""
def __init__(self, **kwargs):
self.__mediaTags = kwargs['tags'] if 'tags' in kwargs.keys() else {}
self.__trackDescriptors = kwargs['trackDescriptors'] if 'trackDescriptors' in kwargs.keys() else {}
self.__clearTags = kwargs['clearTags'] if 'clearTags' in kwargs.keys() else False
@classmethod
def fromFfprobe(cls, formatData, streamData):
descriptors = {}
for streamObj in streamData:
trackType = TrackType.fromLabel(streamObj['codec_type'])
if trackType != TrackType.UNKNOWN:
if trackType.label() not in descriptors.keys():
descriptors[trackType.label()] = []
descriptors[trackType.label()].append(TrackDescriptor.fromFfprobe(streamObj))
return cls(tags=formatData['tags'] if 'tags' in formatData.keys() else {},
trackDescriptors = descriptors)
def getTags(self):
return self.__mediaTags
def getAudioTracks(self):
return self.__trackDescriptors[TrackType.AUDIO.label()] if TrackType.AUDIO.label() in self.__trackDescriptors.keys() else []
def getSubtitleTracks(self):
return self.__trackDescriptors[TrackType.SUBTITLE.label()] if TrackType.SUBTITLE.label() in self.__trackDescriptors.keys() else []

View File

View File

@@ -0,0 +1,28 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Enum
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
class MediaTag(Base):
"""
relationship(argument, opt1, opt2, ...)
argument is string of class or Mapped class of the target entity
backref creates a bi-directional corresponding relationship (back_populates preferred)
back_populates points to the corresponding relationship (the actual class attribute identifier)
See: https://docs.sqlalchemy.org/en/(14|20)/orm/basic_relationships.html
"""
__tablename__ = 'media_tags'
# v1.x
id = Column(Integer, primary_key=True)
key = Column(String)
value = Column(String)
# v1.x
pattern_id = Column(Integer, ForeignKey('patterns.id', ondelete="CASCADE"))
pattern = relationship('Pattern', back_populates='media_tags')

76
bin/ffx/model/pattern.py Normal file
View File

@@ -0,0 +1,76 @@
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker, Mapped, backref
from .show import Base
from .track import Track
from ffx.media_descriptor import MediaDescriptor
class Pattern(Base):
__tablename__ = 'patterns'
# v1.x
id = Column(Integer, primary_key=True)
pattern = Column(String)
# v2.0
# id: Mapped[int] = mapped_column(Integer, primary_key=True)
# pattern: Mapped[str] = mapped_column(String, nullable=False)
# v1.x
show_id = Column(Integer, ForeignKey('shows.id', ondelete="CASCADE"))
show = relationship('Show', back_populates='patterns')
# v2.0
# show_id: Mapped[int] = mapped_column(ForeignKey("shows.id", ondelete="CASCADE"))
# show: Mapped["Show"] = relationship(back_populates="patterns")
tracks = relationship('Track', back_populates='pattern', cascade="all, delete")
media_tags = relationship('MediaTag', back_populates='pattern', cascade="all, delete")
# def getDescriptor(self):
#
# descriptor = {}
# descriptor['id'] = int(self.id)
# descriptor['pattern'] = str(self.pattern)
# descriptor['show_id'] = int(self.show_id)
#
# descriptor['tags'] = {}
# for t in self.media_tags:
# descriptor['tags'][str(t.key)] = str(t.value)
#
# return descriptor
def getShow(self):
pass
def getTracks(self):
pass
def getMediaDescriptor(self):
md = MediaDescriptor(tags = self.getDescriptor()['tags'])
for t in self.tracks:
md.appendTrack(t.getDescriptor())
return md
def getId(self):
return int(self.id)
def getPattern(self):
return str(self.pattern)
def getShowId(self):
return int(self.show_id)
def getTags(self):
return {str(k.value):str(v.value) for (k,v) in self.media_tags}

57
bin/ffx/model/show.py Normal file
View File

@@ -0,0 +1,57 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
Base = declarative_base()
class Show(Base):
"""
relationship(argument, opt1, opt2, ...)
argument is string of class or Mapped class of the target entity
backref creates a bi-directional corresponding relationship (back_populates preferred)
back_populates points to the corresponding relationship (the actual class attribute identifier)
See: https://docs.sqlalchemy.org/en/(14|20)/orm/basic_relationships.html
"""
__tablename__ = 'shows'
# v1.x
id = Column(Integer, primary_key=True)
name = Column(String)
year = Column(Integer)
# v2.0
# id: Mapped[int] = mapped_column(Integer, primary_key=True)
# name: Mapped[str] = mapped_column(String, nullable=False)
# year: Mapped[int] = mapped_column(Integer, nullable=False)
# v1.x
#patterns = relationship('Pattern', back_populates='show', cascade="all, delete", passive_deletes=True)
patterns = relationship('Pattern', back_populates='show', cascade="all, delete")
# patterns = relationship('Pattern', back_populates='show', cascade="all")
# v2.0
# patterns: Mapped[List["Pattern"]] = relationship(back_populates="show", cascade="all, delete")
index_season_digits = Column(Integer, default=2)
index_episode_digits = Column(Integer, default=2)
indicator_season_digits = Column(Integer, default=2)
indicator_episode_digits = Column(Integer, default=2)
def getDesciptor(self):
descriptor = {}
descriptor['id'] = int(self.id)
descriptor['name'] = str(self.name)
descriptor['year'] = int(self.year)
descriptor['index_season_digits'] = int(self.index_season_digits)
descriptor['index_episode_digits'] = int(self.index_episode_digits)
descriptor['indicator_season_digits'] = int(self.indicator_season_digits)
descriptor['indicator_episode_digits'] = int(self.indicator_episode_digits)
return descriptor

193
bin/ffx/model/track.py Normal file
View File

@@ -0,0 +1,193 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
from ffx.track_disposition import TrackDisposition
from ffx.track_descriptor import TrackDescriptor
import click
class Track(Base):
"""
relationship(argument, opt1, opt2, ...)
argument is string of class or Mapped class of the target entity
backref creates a bi-directional corresponding relationship (back_populates preferred)
back_populates points to the corresponding relationship (the actual class attribute identifier)
See: https://docs.sqlalchemy.org/en/(14|20)/orm/basic_relationships.html
"""
__tablename__ = 'tracks'
# v1.x
id = Column(Integer, primary_key=True, autoincrement = True)
# P=pattern_id+sub_index+track_type
track_type = Column(Integer) # TrackType
index = Column(Integer)
sub_index = Column(Integer)
# v1.x
pattern_id = Column(Integer, ForeignKey('patterns.id', ondelete="CASCADE"))
pattern = relationship('Pattern', back_populates='tracks')
# language = Column(String) # IsoLanguage threeLetter
# title = Column(String)
track_tags = relationship('TrackTag', back_populates='track', cascade="all, delete", lazy="joined")
disposition_flags = Column(Integer)
def __init__(self, **kwargs):
trackType = kwargs.pop('track_type', None)
if trackType is not None:
self.track_type = int(trackType)
# language = kwargs.pop('language', None)
# if language is not None:
# self.language = str(language.threeLetter())
dispositionSet = kwargs.pop(TrackDescriptor.DISPOSITION_SET_KEY, set())
self.disposition_flags = int(TrackDisposition.toFlags(dispositionSet))
super().__init__(**kwargs)
@classmethod
def fromStreamObj(cls, streamObj, subIndex, patternId):
"""{
'index': 4,
'codec_name': 'hdmv_pgs_subtitle',
'codec_long_name': 'HDMV Presentation Graphic Stream subtitles',
'codec_type': 'subtitle',
'codec_tag_string': '[0][0][0][0]',
'codec_tag': '0x0000',
'r_frame_rate': '0/0',
'avg_frame_rate': '0/0',
'time_base': '1/1000',
'start_pts': 0,
'start_time': '0.000000',
'duration_ts': 1421035,
'duration': '1421.035000',
'disposition': {
'default': 1,
'dub': 0,
'original': 0,
'comment': 0,
'lyrics': 0,
'karaoke': 0,
'forced': 0,
'hearing_impaired': 0,
'visual_impaired': 0,
'clean_effects': 0,
'attached_pic': 0,
'timed_thumbnails': 0,
'non_diegetic': 0,
'captions': 0,
'descriptions': 0,
'metadata': 0,
'dependent': 0,
'still_image': 0
},
'tags': {
'language': 'ger',
'title': 'German Full'
}
}
# v1.x
id = Column(Integer, primary_key=True, autoincrement = True)
# P=pattern_id+sub_index+track_type
track_type = Column(Integer) # TrackType
sub_index = Column(Integer)
# v1.x
pattern_id = Column(Integer, ForeignKey('patterns.id', ondelete='CASCADE'))
pattern = relationship('Pattern', back_populates='tracks')
language = Column(String) # IsoLanguage threeLetter
title = Column(String)
track_tags = relationship('TrackTag', back_populates='track', cascade='all, delete')
disposition_flags = Column(Integer)
"""
trackType = streamObj['codec_type']
if trackType in [t.label() for t in TrackType]:
return cls(pattern_id = patternId,
sub_index = int(subIndex),
track_type = trackType,
disposition_flags = sum([2**t.index() for (k,v) in streamObj['disposition'].items() if v and (t := TrackDisposition.find(k)) is not None]))
else:
return None
def getId(self):
return int(self.id)
def getPatternId(self):
return int(self.pattern_id)
def getType(self):
return TrackType.fromIndex(self.track_type)
# def getIndex(self):
# return int(self.index)
def getSubIndex(self):
return int(self.sub_index)
def getLanguage(self):
tags = {t.key:t.value for t in self.track_tags}
return IsoLanguage.findThreeLetter(tags['language']) if 'language' in tags.keys() else IsoLanguage.UNDEFINED
def getTitle(self):
tags = {t.key:t.value for t in self.track_tags}
return tags['title'] if 'title' in tags.keys() else ''
def getDispositionSet(self):
return TrackDisposition.toSet(self.disposition_flags)
def getTags(self):
return {str(t.key):str(t.value) for t in self.track_tags}
def getDescriptor(self) -> TrackDescriptor:
kwargs = {}
kwargs[TrackDescriptor.ID_KEY] = self.getId()
kwargs[TrackDescriptor.PATTERN_ID_KEY] = self.getPatternId()
#kwargs[TrackDescriptor.SUB_INDEX_KEY] = self.getIndex()
kwargs[TrackDescriptor.SUB_INDEX_KEY] = self.getSubIndex()
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = self.getType()
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = self.getDispositionSet()
kwargs[TrackDescriptor.TAGS_KEY] = self.getTags()
return TrackDescriptor(**kwargs)

View File

@@ -0,0 +1,28 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Enum
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
class TrackTag(Base):
"""
relationship(argument, opt1, opt2, ...)
argument is string of class or Mapped class of the target entity
backref creates a bi-directional corresponding relationship (back_populates preferred)
back_populates points to the corresponding relationship (the actual class attribute identifier)
See: https://docs.sqlalchemy.org/en/(14|20)/orm/basic_relationships.html
"""
__tablename__ = 'track_tags'
# v1.x
id = Column(Integer, primary_key=True)
key = Column(String)
value = Column(String)
# v1.x
track_id = Column(Integer, ForeignKey('tracks.id', ondelete="CASCADE"))
track = relationship('Track', back_populates='track_tags')

View File

@@ -0,0 +1,154 @@
import click, re
from ffx.model.pattern import Pattern
class PatternController():
def __init__(self, context):
self.context = context
self.Session = self.context['database']['session'] # convenience
def addPattern(self, patternDescriptor):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(patternDescriptor['show_id']), Pattern.pattern == str(patternDescriptor['pattern']))
if not q.count():
pattern = Pattern(show_id = int(patternDescriptor['show_id']),
pattern = str(patternDescriptor['pattern']))
s.add(pattern)
s.commit()
return patternDescriptor
else:
return {}
except Exception as ex:
raise click.ClickException(f"PatternController.addPattern(): {repr(ex)}")
finally:
s.close()
def updatePattern(self, patternId, patternDescriptor):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.id == int(patternId))
if q.count():
pattern = q.first()
pattern.show_id = int(patternDescriptor['show_id'])
pattern.pattern = str(patternDescriptor['pattern'])
s.commit()
return True
else:
return False
except Exception as ex:
raise click.ClickException(f"PatternController.updatePattern(): {repr(ex)}")
finally:
s.close()
def findPattern(self, patternDescriptor):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(patternDescriptor['show_id']), Pattern.pattern == str(patternDescriptor['pattern']))
if q.count():
pattern = q.first()
return int(pattern.id)
else:
return None
except Exception as ex:
raise click.ClickException(f"PatternController.findPattern(): {repr(ex)}")
finally:
s.close()
def getPattern(self, patternId : int):
if type(patternId) is not int:
raise ValueError(f"PatternController.getPattern(): Argument patternId is required to be of type int")
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.id == int(patternId))
return q.first() if q.count() else None
except Exception as ex:
raise click.ClickException(f"PatternController.getPattern(): {repr(ex)}")
finally:
s.close()
def deletePattern(self, patternId):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.id == int(patternId))
if q.count():
#DAFUQ: https://stackoverflow.com/a/19245058
# q.delete()
pattern = q.first()
s.delete(pattern)
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"PatternController.deletePattern(): {repr(ex)}")
finally:
s.close()
def matchFilename(self, filename):
try:
s = self.Session()
q = s.query(Pattern)
matchedPatterns = [p for p in q.all() if re.search(p.pattern, filename)]
if matchedPatterns:
return matchedPatterns[0]
else:
return None
except Exception as ex:
raise click.ClickException(f"PatternController.findPattern(): {repr(ex)}")
finally:
s.close()
return result
def getMediaDescriptor(self, patternId):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.id == int(patternId))
if q.count():
pattern = q.first()
#return self.getPatternDict(pattern)
return pattern.getMediaDescriptor()
except Exception as ex:
raise click.ClickException(f"PatternController.getPatternDescriptor(): {repr(ex)}")
finally:
s.close()

View File

@@ -0,0 +1,109 @@
import click
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button
from textual.containers import Grid
from .show_controller import ShowController
from .pattern_controller import PatternController
# Screen[dict[int, str, int]]
class PatternDeleteScreen(Screen):
CSS = """
Grid {
grid-size: 2;
grid-rows: 2 auto;
grid-columns: 30 330;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, patternId = None, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
self.__pc = PatternController(context = self.context)
self.__sc = ShowController(context = self.context)
self.pattern_id = patternId
self.pattern_obj = self.__pc.getPatternDescriptor(patternId) if patternId is not None else {}
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def on_mount(self):
if self.show_obj:
self.query_one("#showlabel", Static).update(f"{self.show_obj['id']} - {self.show_obj['name']} ({self.show_obj['year']})")
if self.pattern_obj:
self.query_one("#patternlabel", Static).update(str(self.pattern_obj['pattern']))
def compose(self):
yield Header()
with Grid():
yield Static("Are you sure to delete the following filename pattern?", id="toplabel", classes="two")
yield Static("", classes="two")
yield Static("Pattern")
yield Static("", id="patternlabel")
yield Static("", classes="two")
yield Static("from show")
yield Static("", id="showlabel")
yield Static("", classes="two")
yield Button("Delete", id="delete_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "delete_button":
if self.pattern_id is None:
raise click.ClickException('PatternDeleteScreen.on_button_pressed(): pattern id is undefined')
if self.__pc.deletePattern(self.pattern_id):
self.dismiss(self.pattern_obj)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -0,0 +1,440 @@
import click, re
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, DataTable
from textual.containers import Grid
from ffx.model.show import Show
from ffx.model.pattern import Pattern
from .pattern_controller import PatternController
from .show_controller import ShowController
from .track_controller import TrackController
from .track_details_screen import TrackDetailsScreen
from .track_delete_screen import TrackDeleteScreen
from ffx.track_type import TrackType
from ffx.track_disposition import TrackDisposition
from ffx.track_descriptor import TrackDescriptor
from textual.widgets._data_table import CellDoesNotExist
# Screen[dict[int, str, int]]
class PatternDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 12;
grid-rows: 2 2 2 2 2 6 2 2 6 2 2 2;
grid-columns: 25 25 25 25 25;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
DataTable {
min-height: 6;
}
#toplabel {
height: 1;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, patternId = None, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
self.__pc = PatternController(context = self.context)
self.__sc = ShowController(context = self.context)
self.__tc = TrackController(context = self.context)
self.__pattern : Pattern = self.__pc.getPattern(patternId) if patternId is not None else None
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def loadTracks(self, show_id):
try:
tracks = {}
tracks['audio'] = {}
tracks['subtitle'] = {}
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(show_id))
return [{'id': int(p.id), 'pattern': p.pattern} for p in q.all()]
except Exception as ex:
raise click.ClickException(f"loadTracks(): {repr(ex)}")
finally:
s.close()
def updateAudioTracks(self):
self.audioStreamsTable.clear()
if self.__pattern is not None:
audioTracks = self.__tc.findAudioTracks(self.__pattern.getId())
for at in audioTracks:
dispoSet = at.getDispositionSet()
row = (at.getSubIndex(),
" ",
at.getLanguage().label(),
at.getTitle(),
'Yes' if TrackDisposition.DEFAULT in dispoSet else 'No',
'Yes' if TrackDisposition.FORCED in dispoSet else 'No')
self.audioStreamsTable.add_row(*map(str, row))
def updateSubtitleTracks(self):
self.subtitleStreamsTable.clear()
if self.__pattern is not None:
subtitleTracks = self.__tc.findSubtitleTracks(self.__pattern.getId())
for st in subtitleTracks:
dispoSet = st.getDispositionSet()
row = (st.getSubIndex(),
" ",
st.getLanguage().label(),
st.getTitle(),
'Yes' if TrackDisposition.DEFAULT in dispoSet else 'No',
'Yes' if TrackDisposition.FORCED in dispoSet else 'No')
self.subtitleStreamsTable.add_row(*map(str, row))
def on_mount(self):
if self.show_obj:
self.query_one("#showlabel", Static).update(f"{self.show_obj['id']} - {self.show_obj['name']} ({self.show_obj['year']})")
if self.__pattern is not None:
self.query_one("#pattern_input", Input).value = str(self.__pattern.getPattern())
self.updateAudioTracks()
self.updateSubtitleTracks()
def compose(self):
self.audioStreamsTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_audio_subid = self.audioStreamsTable.add_column("Subindex", width=20)
self.column_key_audio_layout = self.audioStreamsTable.add_column("Layout", width=20)
self.column_key_audio_language = self.audioStreamsTable.add_column("Language", width=20)
self.column_key_audio_title = self.audioStreamsTable.add_column("Title", width=30)
self.column_key_audio_default = self.audioStreamsTable.add_column("Default", width=10)
self.column_key_audio_forced = self.audioStreamsTable.add_column("Forced", width=10)
self.audioStreamsTable.cursor_type = 'row'
self.subtitleStreamsTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_subtitle_subid = self.subtitleStreamsTable.add_column("Subindex", width=20)
self.column_key_subtitle_spacer = self.subtitleStreamsTable.add_column(" ", width=20)
self.column_key_subtitle_language = self.subtitleStreamsTable.add_column("Language", width=20)
self.column_key_subtitle_title = self.subtitleStreamsTable.add_column("Title", width=30)
self.column_key_subtitle_default = self.subtitleStreamsTable.add_column("Default", width=10)
self.column_key_subtitle_forced = self.subtitleStreamsTable.add_column("Forced", width=10)
self.subtitleStreamsTable.cursor_type = 'row'
yield Header()
with Grid():
# 1
yield Static("Edit filename pattern" if self.__pattern is not None else "New filename pattern", id="toplabel")
yield Input(type="text", id="pattern_input", classes="four")
# 2
yield Static("from show")
yield Static("", id="showlabel", classes="three")
yield Button("Substitute pattern", id="patternbutton")
# 3
yield Static(" ", classes="five")
# 4
yield Static(" ", classes="five")
# 5
yield Static("Audio streams")
yield Static(" ")
if self.__pattern is not None:
yield Button("Add", id="button_add_audio_stream")
yield Button("Edit", id="button_edit_audio_stream")
yield Button("Delete", id="button_delete_audio_stream")
else:
yield Static("")
yield Static("")
yield Static("")
# 6
yield self.audioStreamsTable
# 7
yield Static(" ", classes="five")
# 8
yield Static("Subtitle streams")
yield Static(" ")
if self.__pattern is not None:
yield Button("Add", id="button_add_subtitle_stream")
yield Button("Edit", id="button_edit_subtitle_stream")
yield Button("Delete", id="button_delete_subtitle_stream")
else:
yield Static("")
yield Static("")
yield Static("")
# 9
yield self.subtitleStreamsTable
# 10
yield Static(" ", classes="five")
# 11
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
def getPatternFromInput(self):
return str(self.query_one("#pattern_input", Input).value)
def getSelectedAudioTrackDescriptor(self):
if not self.__pattern:
return None
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.audioStreamsTable.coordinate_to_cell_key(self.audioStreamsTable.cursor_coordinate)
if row_key is not None:
selected_track_data = self.audioStreamsTable.get_row(row_key)
subIndex = int(selected_track_data[0])
return self.__tc.findTrack(self.__pattern.getId(), TrackType.AUDIO, subIndex).getDescriptor()
else:
return None
except CellDoesNotExist:
return None
def getSelectedSubtitleTrackDescriptor(self) -> TrackDescriptor:
if not self.__pattern is None:
return None
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.subtitleStreamsTable.coordinate_to_cell_key(self.subtitleStreamsTable.cursor_coordinate)
if row_key is not None:
selected_track_data = self.subtitleStreamsTable.get_row(row_key)
subIndex = int(selected_track_data[0])
return self.__tc.findTrack(self.__pattern.getId(), TrackType.SUBTITLE, subIndex).getDescriptor()
else:
return None
except CellDoesNotExist:
return None
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
# Check if the button pressed is the one we are interested in
if event.button.id == "save_button":
patternDescriptor = {}
patternDescriptor['show_id'] = self.show_obj['id']
patternDescriptor['pattern'] = self.getPatternFromInput()
if self.__pattern is not None:
if self.__pc.updatePattern(self.__pattern.getId(), patternDescriptor):
self.dismiss(patternDescriptor)
else:
#TODO: Meldung
self.app.pop_screen()
else:
if self.__pc.addPattern(patternDescriptor):
self.dismiss(patternDescriptor)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()
# Save pattern when just created before adding streams
if self.__pattern is not None:
if event.button.id == "button_add_audio_stream":
self.app.push_screen(TrackDetailsScreen(trackType = TrackType.AUDIO, patternId = self.__pattern.getId(), subIndex = len(self.audioStreamsTable.rows)), self.handle_add_track)
selectedAudioTrack = self.getSelectedAudioTrackDescriptor()
if selectedAudioTrack is not None:
if event.button.id == "button_edit_audio_stream":
self.app.push_screen(TrackDetailsScreen(trackDescriptor = selectedAudioTrack), self.handle_edit_track)
if event.button.id == "button_delete_audio_stream":
self.app.push_screen(TrackDeleteScreen(trackDescriptor = selectedAudioTrack), self.handle_delete_track)
if event.button.id == "button_add_subtitle_stream":
self.app.push_screen(TrackDetailsScreen(trackType = TrackType.SUBTITLE, patternId = self.__pattern.getId(), subIndex = len(self.subtitleStreamsTable.rows)), self.handle_add_track)
selectedSubtitleTrack = self.getSelectedSubtitleTrackDescriptor()
if selectedSubtitleTrack is not None:
if event.button.id == "button_edit_subtitle_stream":
self.app.push_screen(TrackDetailsScreen(trackDescriptor = selectedSubtitleTrack), self.handle_edit_track)
if event.button.id == "button_delete_subtitle_stream":
self.app.push_screen(TrackDeleteScreen(trackDescriptor = selectedSubtitleTrack), self.handle_delete_track)
if event.button.id == "patternbutton":
INDICATOR_PATTERN = '([sS][0-9]+[eE][0-9]+)'
pattern = self.query_one("#pattern_input", Input).value
patternMatch = re.search(INDICATOR_PATTERN, pattern)
if patternMatch:
self.query_one("#pattern_input", Input).value = pattern.replace(patternMatch.group(1), INDICATOR_PATTERN)
def handle_add_track(self, trackDescriptor):
dispoSet = trackDescriptor.getDispositionSet()
trackType = trackDescriptor.getType()
subIndex = trackDescriptor.getSubIndex()
language = trackDescriptor.getLanguage()
title = trackDescriptor.getTitle()
if trackType == TrackType.AUDIO:
row = (subIndex,
" ",
language.label(),
title,
'Yes' if TrackDisposition.DEFAULT in dispoSet else 'No',
'Yes' if TrackDisposition.FORCED in dispoSet else 'No')
self.audioStreamsTable.add_row(*map(str, row))
if trackType == TrackType.SUBTITLE:
row = (subIndex,
" ",
language.label(),
title,
'Yes' if TrackDisposition.DEFAULT in dispoSet else 'No',
'Yes' if TrackDisposition.FORCED in dispoSet else 'No')
self.subtitleStreamsTable.add_row(*map(str, row))
def handle_edit_track(self, trackDescriptor : TrackDescriptor):
try:
if trackDescriptor.getType() == TrackType.AUDIO:
row_key, col_key = self.audioStreamsTable.coordinate_to_cell_key(self.audioStreamsTable.cursor_coordinate)
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_language, trackDescriptor.getLanguage().label())
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_title, trackDescriptor.getTitle())
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_default, 'Yes' if TrackDisposition.DEFAULT in trackDescriptor.getDispositionSet() else 'No')
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_forced, 'Yes' if TrackDisposition.FORCED in trackDescriptor.getDispositionSet() else 'No')
if trackDescriptor.getType() == TrackType.SUBTITLE:
row_key, col_key = self.subtitleStreamsTable.coordinate_to_cell_key(self.subtitleStreamsTable.cursor_coordinate)
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_language, trackDescriptor.getLanguage().label())
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_title, trackDescriptor.getTitle())
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_default, 'Yes' if TrackDisposition.DEFAULT in trackDescriptor.getDispositionSet() else 'No')
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_forced, 'Yes' if TrackDisposition.FORCED in trackDescriptor.getDispositionSet() else 'No')
except CellDoesNotExist:
pass
def handle_delete_track(self, trackDescriptor : TrackDescriptor):
try:
if trackDescriptor.getType() == TrackType.AUDIO:
self.updateAudioTracks()
if trackDescriptor.getType() == TrackType.SUBTITLE:
self.updateSubtitleTracks()
except CellDoesNotExist:
pass

6
bin/ffx/process.py Normal file
View File

@@ -0,0 +1,6 @@
import subprocess
def executeProcess(commandSequence):
process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
return output.decode('utf-8'), error.decode('utf-8'), process.returncode

View File

@@ -0,0 +1,11 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class SettingsScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Settings Screen")
yield Footer()

View File

@@ -1,2 +0,0 @@
class Show():
pass

View File

@@ -1,2 +1,103 @@
import click
from ffx.model.show import Show
class ShowController(): class ShowController():
pass
def __init__(self, context):
self.context = context
self.Session = self.context['database']['session'] # convenience
def getShowDesciptor(self, showId):
try:
s = self.Session()
q = s.query(Show).filter(Show.id == showId)
if q.count():
show = q.first()
return show.getDesciptor()
except Exception as ex:
raise click.ClickException(f"ShowController.getShowDesciptor(): {repr(ex)}")
finally:
s.close()
def updateShow(self, showDescriptor):
try:
s = self.Session()
q = s.query(Show).filter(Show.id == showDescriptor['id'])
if not q.count():
show = Show(id = int(showDescriptor['id']),
name = str(showDescriptor['name']),
year = int(showDescriptor['year']),
index_season_digits = showDescriptor['index_season_digits'],
index_episode_digits = showDescriptor['index_episode_digits'],
indicator_season_digits = showDescriptor['indicator_season_digits'],
indicator_episode_digits = showDescriptor['indicator_episode_digits'])
s.add(show)
s.commit()
return True
else:
currentShow = q.first()
changed = False
if currentShow.name != str(showDescriptor['name']):
currentShow.name = str(showDescriptor['name'])
changed = True
if currentShow.year != int(showDescriptor['year']):
currentShow.year = int(showDescriptor['year'])
changed = True
if currentShow.index_season_digits != int(showDescriptor['index_season_digits']):
currentShow.index_season_digits = int(showDescriptor['index_season_digits'])
changed = True
if currentShow.index_episode_digits != int(showDescriptor['index_episode_digits']):
currentShow.index_episode_digits = int(showDescriptor['index_episode_digits'])
changed = True
if currentShow.indicator_season_digits != int(showDescriptor['indicator_season_digits']):
currentShow.indicator_season_digits = int(showDescriptor['indicator_season_digits'])
changed = True
if currentShow.indicator_episode_digits != int(showDescriptor['indicator_episode_digits']):
currentShow.indicator_episode_digits = int(showDescriptor['indicator_episode_digits'])
changed = True
if changed:
s.commit()
return changed
except Exception as ex:
raise click.ClickException(f"ShowController.updateShow(): {repr(ex)}")
finally:
s.close()
def deleteShow(self, show_id):
try:
s = self.Session()
q = s.query(Show).filter(Show.id == int(show_id))
if q.count():
#DAFUQ: https://stackoverflow.com/a/19245058
# q.delete()
show = q.first()
s.delete(show)
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"ShowController.deleteShow(): {repr(ex)}")
finally:
s.close()

View File

@@ -0,0 +1,95 @@
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button
from textual.containers import Grid
from .show_controller import ShowController
# Screen[dict[int, str, int]]
class ShowDeleteScreen(Screen):
CSS = """
Grid {
grid-size: 2;
grid-rows: 2 auto;
grid-columns: 30 auto;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
self.__sc = ShowController(context = self.context)
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def on_mount(self):
if self.show_obj:
self.query_one("#showlabel", Static).update(f"{self.show_obj['id']} - {self.show_obj['name']} ({self.show_obj['year']})")
def compose(self):
yield Header()
with Grid():
yield Static("Are you sure to delete the following show?", id="toplabel", classes="two")
yield Static("", classes="two")
yield Static("", id="showlabel")
yield Static("")
yield Static("", classes="two")
yield Static("", classes="two")
yield Button("Delete", id="delete_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "delete_button":
if self.__sc.deleteShow(self.show_obj['id']):
self.dismiss(self.show_obj['id'])
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -0,0 +1,314 @@
import click
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable, Input
from textual.containers import Grid
from textual.widgets._data_table import CellDoesNotExist
from ffx.model.show import Show
from ffx.model.pattern import Pattern
from .pattern_details_screen import PatternDetailsScreen
from .pattern_delete_screen import PatternDeleteScreen
from .show_controller import ShowController
from .pattern_controller import PatternController
# Screen[dict[int, str, int]]
class ShowDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 14;
grid-rows: 2 2 2 2 2 2 2 2 2 2 2 6 2 2;
grid-columns: 30 30 30 30 30;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
DataTable {
column-span: 2;
min-height: 5;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
BINDINGS = [
("a", "add_pattern", "Add Pattern"),
("e", "edit_pattern", "Edit Pattern"),
("r", "remove_pattern", "Remove Pattern"),
]
def __init__(self, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
self.__sc = ShowController(context = self.context)
self.__pc = PatternController(context = self.context)
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def loadPatterns(self, show_id):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(show_id))
return [{'id': int(p.id), 'pattern': p.pattern} for p in q.all()]
except Exception as ex:
click.ClickException(f"loadPatterns(): {repr(ex)}")
finally:
s.close()
def on_mount(self):
if self.show_obj:
self.query_one("#id_wdg", Static).update(str(self.show_obj['id']))
self.query_one("#name_input", Input).value = str(self.show_obj['name'])
self.query_one("#year_input", Input).value = str(self.show_obj['year'])
self.query_one("#index_season_digits_input", Input).value = str(self.show_obj['index_season_digits'])
self.query_one("#index_episode_digits_input", Input).value = str(self.show_obj['index_episode_digits'])
self.query_one("#indicator_season_digits_input", Input).value = str(self.show_obj['indicator_season_digits'])
self.query_one("#indicator_episode_digits_input", Input).value = str(self.show_obj['indicator_episode_digits'])
for pattern in self.loadPatterns(int(self.show_obj['id'])):
row = (pattern['pattern'],)
self.patternTable.add_row(*map(str, row))
else:
self.query_one("#index_season_digits_input", Input).value = "2"
self.query_one("#index_episode_digits_input", Input).value = "2"
self.query_one("#indicator_season_digits_input", Input).value = "2"
self.query_one("#indicator_episode_digits_input", Input).value = "2"
def getSelectedPatternDescriptor(self):
selectedPattern = {}
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.patternTable.coordinate_to_cell_key(self.patternTable.cursor_coordinate)
if row_key is not None:
selected_row_data = self.patternTable.get_row(row_key)
selectedPattern['show_id'] = self.show_obj['id']
selectedPattern['pattern'] = str(selected_row_data[0])
except CellDoesNotExist:
pass
return selectedPattern
def action_add_pattern(self):
if self.show_obj:
self.app.push_screen(PatternDetailsScreen(showId = self.show_obj['id']), self.handle_add_pattern) # <-
def handle_add_pattern(self, screenResult):
pattern = (screenResult['pattern'],)
self.patternTable.add_row(*map(str, pattern))
def action_edit_pattern(self):
selectedPatternDescriptor = self.getSelectedPatternDescriptor()
if selectedPatternDescriptor:
selectedPatternId = self.__pc.findPattern(selectedPatternDescriptor)
if selectedPatternId is None:
raise click.ClickException(f"ShowDetailsScreen.action_edit_pattern(): Pattern to remove has no id")
self.app.push_screen(PatternDetailsScreen(patternId = selectedPatternId, showId = self.show_obj['id']), self.handle_edit_pattern) # <-
def handle_edit_pattern(self, screenResult):
try:
row_key, col_key = self.patternTable.coordinate_to_cell_key(self.patternTable.cursor_coordinate)
self.patternTable.update_cell(row_key, self.column_key_pattern, screenResult['pattern'])
except CellDoesNotExist:
pass
def action_remove_pattern(self):
selectedPatternDescriptor = self.getSelectedPatternDescriptor()
if selectedPatternDescriptor:
selectedPatternId = self.__pc.findPattern(selectedPatternDescriptor)
if selectedPatternId is None:
raise click.ClickException(f"ShowDetailsScreen.action_remove_pattern(): Pattern to remove has no id")
self.app.push_screen(PatternDeleteScreen(patternId = selectedPatternId, showId = self.show_obj['id']), self.handle_remove_pattern)
def handle_remove_pattern(self, screenResult):
try:
row_key, col_key = self.patternTable.coordinate_to_cell_key(self.patternTable.cursor_coordinate)
self.patternTable.remove_row(row_key)
except CellDoesNotExist:
pass
def compose(self):
# Create the DataTable widget
self.patternTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_pattern = self.patternTable.add_column("Pattern", width=150)
self.patternTable.cursor_type = 'row'
yield Header()
with Grid():
# 1
yield Static("Show" if self.show_obj else "New Show", id="toplabel", classes="five")
# 2
yield Static("ID")
if self.show_obj:
yield Static("", id="id_wdg", classes="four")
else:
yield Input(type="integer", id="id_wdg", classes="four")
# 3
yield Static("Name")
yield Input(type="text", id="name_input", classes="four")
# 4
yield Static("Year")
yield Input(type="integer", id="year_input", classes="four")
#5
yield Static(" ", classes="five")
#6
yield Static("Index Season Digits")
yield Input(type="integer", id="index_season_digits_input", classes="four")
#7
yield Static("Index Episode Digits")
yield Input(type="integer", id="index_episode_digits_input", classes="four")
#8
yield Static("Indicator Season Digits")
yield Input(type="integer", id="indicator_season_digits_input", classes="four")
#9
yield Static("Indicator Edisode Digits")
yield Input(type="integer", id="indicator_episode_digits_input", classes="four")
# 10
yield Static(" ", classes="five")
# 11
yield Static("File patterns", classes="five")
# 12
yield self.patternTable
# 13
yield Static(" ", classes="five")
# 14
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
def getShowDescriptorFromInput(self):
showDescriptor = {}
if self.show_obj:
showDescriptor['id'] = int(self.show_obj['id'])
else:
showDescriptor['id'] = int(self.query_one("#id_wdg", Input).value)
showDescriptor['name'] = str(self.query_one("#name_input", Input).value)
showDescriptor['year'] = int(self.query_one("#year_input", Input).value)
showDescriptor['index_season_digits'] = int(self.query_one("#index_season_digits_input", Input).value)
showDescriptor['index_episode_digits'] = int(self.query_one("#index_episode_digits_input", Input).value)
showDescriptor['indicator_season_digits'] = int(self.query_one("#indicator_season_digits_input", Input).value)
showDescriptor['indicator_episode_digits'] = int(self.query_one("#indicator_episode_digits_input", Input).value)
return showDescriptor
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
# Check if the button pressed is the one we are interested in
if event.button.id == "save_button":
showDescriptor = self.getShowDescriptorFromInput()
if self.__sc.updateShow(showDescriptor):
self.dismiss(showDescriptor)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

172
bin/ffx/shows_screen.py Normal file
View File

@@ -0,0 +1,172 @@
import click
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button
from textual.containers import Grid
from ffx.model.show import Show
from .show_details_screen import ShowDetailsScreen
from .show_delete_screen import ShowDeleteScreen
from .help_screen import HelpScreen
from textual.widgets._data_table import CellDoesNotExist
class ShowsScreen(Screen):
CSS = """
Grid {
grid-size: 1;
grid-rows: 2 auto;
height: 100%;
width: 100%;
padding: 1;
}
#top {
height: 1;
}
#two {
column-span: 2;
row-span: 2;
tint: magenta 40%;
}
.box {
height: 100%;
border: solid green;
}
"""
BINDINGS = [
("e", "edit_show", "Edit Show"),
("n", "new_show", "New Show"),
("d", "delete_show", "Delete Show"),
]
def __init__(self):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
def getSelectedShowId(self):
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
if row_key is not None:
selected_row_data = self.table.get_row(row_key)
return selected_row_data[0]
except CellDoesNotExist:
return None
def action_new_show(self):
self.app.push_screen(ShowDetailsScreen(), self.handle_new_screen)
def handle_new_screen(self, screenResult):
show = (screenResult['id'], screenResult['name'], screenResult['year'])
self.table.add_row(*map(str, show))
def action_edit_show(self):
selectedShowId = self.getSelectedShowId()
if selectedShowId is not None:
self.app.push_screen(ShowDetailsScreen(showId = selectedShowId), self.handle_edit_screen)
def handle_edit_screen(self, screenResult):
try:
row_key, col_key = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
self.table.update_cell(row_key, self.column_key_name, screenResult['name'])
self.table.update_cell(row_key, self.column_key_year, screenResult['year'])
except CellDoesNotExist:
pass
def action_delete_show(self):
selectedShowId = self.getSelectedShowId()
if selectedShowId is not None:
self.app.push_screen(ShowDeleteScreen(showId = selectedShowId), self.handle_delete_show)
def handle_delete_show(self, screenResult):
try:
row_key, col_key = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
self.table.remove_row(row_key)
except CellDoesNotExist:
pass
def loadShows(self):
try:
s = self.Session()
q = s.query(Show)
return [(int(s.id), s.name, s.year) for s in q.all()]
except Exception as ex:
raise click.ClickException(f"ShowsScreen.loadShows(): {repr(ex)}")
finally:
s.close()
def on_mount(self) -> None:
for show in self.loadShows():
self.table.add_row(*map(str, show)) # Convert each element to a string before adding
def compose(self):
# Create the DataTable widget
self.table = DataTable()
# Define the columns with headers
self.column_key_id = self.table.add_column("ID", width=10)
self.column_key_name = self.table.add_column("Name", width=50)
self.column_key_year = self.table.add_column("Year", width=10)
self.table.cursor_type = 'row'
yield Header()
with Grid():
yield Static("Shows")
yield self.table
yield Footer()

View File

@@ -1,56 +0,0 @@
from language_data import LanguageData
from stream_type import StreamType
class StreamDescriptor():
def __init__(self,
streamType : StreamType,
language : LanguageData,
title : str,
codec : str,
subIndex : int = -1):
self.__streamType = streamType
self.__subIndex = subIndex
self.__streamLanguage = language
self.__streamTitle = title
self.__codecName = codec
# "index": 4,
# "codec_name": "hdmv_pgs_subtitle",
# "codec_long_name": "HDMV Presentation Graphic Stream subtitles",
# "codec_type": "subtitle",
# "codec_tag_string": "[0][0][0][0]",
# "codec_tag": "0x0000",
# "r_frame_rate": "0/0",
# "avg_frame_rate": "0/0",
# "time_base": "1/1000",
# "start_pts": 0,
# "start_time": "0.000000",
# "duration_ts": 1421035,
# "duration": "1421.035000",
# "disposition": {
# "default": 1,
# "dub": 0,
# "original": 0,
# "comment": 0,
# "lyrics": 0,
# "karaoke": 0,
# "forced": 0,
# "hearing_impaired": 0,
# "visual_impaired": 0,
# "clean_effects": 0,
# "attached_pic": 0,
# "timed_thumbnails": 0,
# "non_diegetic": 0,
# "captions": 0,
# "descriptions": 0,
# "metadata": 0,
# "dependent": 0,
# "still_image": 0
# },
# "tags": {
# "language": "ger",
# "title": "German Full"

View File

@@ -1,6 +0,0 @@
from enum import Enum
class StreamType(Enum):
VIDEO = 1
AUDIO = 2
SUBTITLE = 3

183
bin/ffx/tag_controller.py Normal file
View File

@@ -0,0 +1,183 @@
import click
from ffx.model.track import Track
from ffx.model.media_tag import MediaTag
from ffx.model.track_tag import TrackTag
class TagController():
def __init__(self, context):
self.context = context
self.Session = self.context['database']['session'] # convenience
def updateMediaTag(self, trackId, tagKey, tagValue):
try:
s = self.Session()
q = s.query(MediaTag).filter(MediaTag.track_id == int(trackId),
MediaTag.key == str(tagKey),
MediaTag.value == str(tagValue))
tag = q.first()
if tag:
tag.value = str(tagValue)
else:
tag = MediaTag(track_id = int(trackId),
key = str(tagKey),
value = str(tagValue))
s.add(tag)
s.commit()
return int(tag.id)
except Exception as ex:
raise click.ClickException(f"TagController.updateTrackTag(): {repr(ex)}")
finally:
s.close()
def updateTrackTag(self, trackId, tagKey, tagValue):
try:
s = self.Session()
q = s.query(TrackTag).filter(TrackTag.track_id == int(trackId),
TrackTag.key == str(tagKey),
TrackTag.value == str(tagValue))
tag = q.first()
if tag:
tag.value = str(tagValue)
else:
tag = TrackTag(track_id = int(trackId),
key = str(tagKey),
value = str(tagValue))
s.add(tag)
s.commit()
return int(tag.id)
except Exception as ex:
raise click.ClickException(f"TagController.updateTrackTag(): {repr(ex)}")
finally:
s.close()
def findAllMediaTags(self, trackId) -> dict:
try:
s = self.Session()
q = s.query(MediaTag).filter(MediaTag.track_id == int(trackId))
if q.count():
return {t.key:t.value for t in q.all()}
else:
return {}
except Exception as ex:
raise click.ClickException(f"TagController.findAllMediaTags(): {repr(ex)}")
finally:
s.close()
def findAllTrackTags(self, trackId) -> dict:
try:
s = self.Session()
q = s.query(TrackTag).filter(TrackTag.track_id == int(trackId))
if q.count():
return {t.key:t.value for t in q.all()}
else:
return {}
except Exception as ex:
raise click.ClickException(f"TagController.findAllTracks(): {repr(ex)}")
finally:
s.close()
def findMediaTag(self, trackId : int, trackKey : str) -> MediaTag:
try:
s = self.Session()
q = s.query(Track).filter(MediaTag.track_id == int(trackId), MediaTag.key == str(trackKey))
if q.count():
return q.first()
else:
return None
except Exception as ex:
raise click.ClickException(f"TagController.findMediaTag(): {repr(ex)}")
finally:
s.close()
def findTrackTag(self, trackId : int, tagKey : str) -> TrackTag:
try:
s = self.Session()
q = s.query(TrackTag).filter(TrackTag.track_id == int(trackId), TrackTag.key == str(tagKey))
if q.count():
return q.first()
else:
return None
except Exception as ex:
raise click.ClickException(f"TagController.findTrackTag(): {repr(ex)}")
finally:
s.close()
def deleteMediaTag(self, tagId) -> bool:
try:
s = self.Session()
q = s.query(MediaTag).filter(MediaTag.id == int(tagId))
if q.count():
tag = q.first()
s.delete(tag)
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"TagController.deleteMediaTag(): {repr(ex)}")
finally:
s.close()
def deleteTrackTag(self, tagId : int) -> bool:
if type(tagId) is not int:
raise TypeError('TagController.deleteTrackTag(): Argument tagId is required to be of type int')
try:
s = self.Session()
q = s.query(TrackTag).filter(TrackTag.id == int(tagId))
if q.count():
tag = q.first()
s.delete(tag)
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"TagController.deleteTrackTag(): {repr(ex)}")
finally:
s.close()

View File

@@ -0,0 +1,98 @@
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button
from textual.containers import Grid
# Screen[dict[int, str, int]]
class TagDeleteScreen(Screen):
CSS = """
Grid {
grid-size: 4 9;
grid-rows: 2 2 2 2 2 2 2 2 2;
grid-columns: 30 30 30 30;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, key=None, value=None):
super().__init__()
self.__key = key
self.__value = value
def on_mount(self):
self.query_one("#keylabel", Static).update(str(self.__key))
self.query_one("#valuelabel", Static).update(str(self.__value))
def compose(self):
yield Header()
with Grid():
#1
yield Static(f"Are you sure to delete this tag ?", id="toplabel", classes="five")
#2
yield Static("Key")
yield Static(" ", id="keylabel", classes="four")
#3
yield Static("Value")
yield Static(" ", id="valuelabel", classes="four")
#4
yield Static(" ", classes="five")
#9
yield Button("Delete", id="delete_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "delete_button":
tag = (self.__key, self.__value)
self.dismiss(tag)
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -0,0 +1,121 @@
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input
from textual.containers import Grid
# Screen[dict[int, str, int]]
class TagDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 20;
grid-rows: 2 2 2 2 2 3 2 2 2 2 2 6 2 2 6 2 2 2 2 6;
grid-columns: 25 25 25 25 225;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
SelectionList {
border: none;
min-height: 6;
}
Select {
border: none;
}
DataTable {
min-height: 6;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, key=None, value=None):
super().__init__()
self.__key = key
self.__value = value
def on_mount(self):
if self.__key is not None:
self.query_one("#key_input", Input).value = str(self.__key)
if self.__value is not None:
self.query_one("#value_input", Input).value = str(self.__value)
def compose(self):
yield Header()
with Grid():
# 8
yield Static("Key")
yield Input(id="key_input", classes="four")
yield Static("Value")
yield Input(id="value_input", classes="four")
# 17
yield Static(" ", classes="five")
# 18
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
# 19
yield Static(" ", classes="five")
# 20
yield Static(" ", classes="five", id="messagestatic")
yield Footer(id="footer")
def getTagFromInput(self):
tagKey = self.query_one("#key_input", Input).value
tagValue = self.query_one("#value_input", Input).value
return (tagKey, tagValue)
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
# Check if the button pressed is the one we are interested in
if event.button.id == "save_button":
self.dismiss(self.getTagFromInput())
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -0,0 +1,98 @@
import os, click, requests
class TmdbController():
DEFAULT_LANGUAGE = 'de-DE'
def __init__(self):
try:
self.__tmdbApiKey = os.environ['TMDB_API_KEY']
except KeyError:
click.ClickException('TMDB api key is not available, please set environment variable TMDB_API_KEY')
self.tmdbLanguage = TmdbController.DEFAULT_LANGUAGE
def queryTmdb(self, showId, season, episode):
"""
First level keys in the response object:
air_date str 'YYY-MM-DD'
crew []
episode_number int
guest_stars []
name str
overview str
id int
production_code
runtime int
season_number int
still_path str '/filename.jpg'
vote_average float
vote_count int
"""
urlParams = f"?language={self.tmdbLanguage}&api_key={self.__tmdbApiKey}"
tmdbUrl = f"https://api.themoviedb.org/3/tv/{showId}/season/{season}/episode/{episode}{urlParams}"
return requests.get(tmdbUrl).json()
def getEpisodeFilename(self,
showName,
episodeName,
season,
episode,
extension,
indexSeasonDigits = 2,
indexEpisodeDigits = 2,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 2):
"""
One Piece:
indexSeasonDigits = 0,
indexEpisodeDigits = 4,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 4
Three-Body:
indexSeasonDigits = 0,
indexEpisodeDigits = 2,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 2
Dragonball:
indexSeasonDigits = 0,
indexEpisodeDigits = 3,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 3
Boruto:
indexSeasonDigits = 0,
indexEpisodeDigits = 4,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 4
"""
filenameTokens = [str(showName), ' - ']
if indexSeasonDigits:
filenameTokens += ['{num:{fill}{width}}'.format(num=season, fill='0', width=indexSeasonDigits)]
if indexEpisodeDigits:
filenameTokens += ['{num:{fill}{width}}'.format(num=episode, fill='0', width=indexEpisodeDigits)]
if indexSeasonDigits or indexEpisodeDigits:
filenameTokens += [' ']
filenameTokens += [episodeName]
if indicatorSeasonDigits or indicatorEpisodeDigits:
filenameTokens += [' - ']
if indicatorSeasonDigits:
filenameTokens += ['S{num:{fill}{width}}'.format(num=season, fill='0', width=indicatorSeasonDigits)]
if indicatorEpisodeDigits:
filenameTokens += ['E{num:{fill}{width}}'.format(num=episode, fill='0', width=indicatorEpisodeDigits)]
filenameTokens += ['.', extension]
return ''.join(filenameTokens)

165
bin/ffx/track_controller.py Normal file
View File

@@ -0,0 +1,165 @@
import click
from ffx.model.track import Track
from .track_type import TrackType
from .track_disposition import TrackDisposition
from .iso_language import IsoLanguage
from .track_type import TrackType
from ffx.model.track_tag import TrackTag
from ffx.track_descriptor import TrackDescriptor
class TrackController():
def __init__(self, context):
self.context = context
self.Session = self.context['database']['session'] # convenience
def addTrack(self, trackDescriptor):
try:
s = self.Session()
track = Track(pattern_id = int(trackDescriptor.getPatternId()),
track_type = int(trackDescriptor.getType().index()),
sub_index = int(trackDescriptor.getSubIndex()),
disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet())))
s.add(track)
s.commit()
for k,v in trackDescriptor.getTags().items():
tag = TrackTag(track_id = track.id,
key = k,
value = v)
s.add(tag)
s.commit()
except Exception as ex:
raise click.ClickException(f"TrackController.addTrack(): {repr(ex)}")
finally:
s.close()
def updateTrack(self, trackId, trackDescriptor : TrackDescriptor):
if type(trackDescriptor) is not TrackDescriptor:
raise TypeError('TrackController.updateTrack(): Argument trackDescriptor is required to be of type TrackDescriptor')
try:
s = self.Session()
q = s.query(Track).filter(Track.id == int(trackId))
if q.count():
track : Track = q.first()
track.sub_index = int(trackDescriptor.getSubIndex())
track.disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet()))
descriptorTagKeys = trackDescriptor.getTags()
tagKeysInDescriptor = set(descriptorTagKeys.keys())
tagKeysInDb = {t.key for t in track.track_tags}
for k in tagKeysInDescriptor & tagKeysInDb: # to update
tags = [t for t in track.track_tags if t.key == k]
tags[0].value = descriptorTagKeys[k]
for k in tagKeysInDescriptor - tagKeysInDb: # to add
tag = TrackTag(track_id=track.id, key=k, value=descriptorTagKeys[k])
s.add(tag)
for k in tagKeysInDb - tagKeysInDescriptor: # to remove
tags = [t for t in track.track_tags if t.key == k]
s.delete(tags[0])
s.commit()
return True
else:
return False
except Exception as ex:
raise click.ClickException(f"TrackController.updateTrack(): {repr(ex)}")
finally:
s.close()
def findAudioTracks(self, patternId):
try:
s = self.Session()
q = s.query(Track).filter(Track.pattern_id == int(patternId), Track.track_type == TrackType.AUDIO.index())
return [a for a in q.all()]
except Exception as ex:
raise click.ClickException(f"TrackController.findAudioTracks(): {repr(ex)}")
finally:
s.close()
def findSubtitleTracks(self, patternId):
try:
s = self.Session()
q = s.query(Track).filter(Track.pattern_id == int(patternId), Track.track_type == TrackType.SUBTITLE.index())
return [s for s in q.all()]
except Exception as ex:
raise click.ClickException(f"TrackController.findSubtitleTracks(): {repr(ex)}")
finally:
s.close()
def findTrack(self, patternId : int, trackType : TrackType, subIndex : int) -> Track:
try:
s = self.Session()
q = s.query(Track).filter(Track.pattern_id == int(patternId), Track.track_type == trackType.index(), Track.sub_index == int(subIndex))
if q.count():
return q.first()
else:
return None
except Exception as ex:
raise click.ClickException(f"TrackController.findTrack(): {repr(ex)}")
finally:
s.close()
def deleteTrack(self, trackId):
try:
s = self.Session()
q = s.query(Track).filter(Track.id == int(trackId))
if q.count():
track = q.first()
q_siblings = s.query(Track).filter(Track.pattern_id == track.getPatternId(), Track.track_type == track.getType().index()).order_by(Track.sub_index)
subIndex = 0
for track in q_siblings.all():
if track.sub_index == track.getSubIndex():
s.delete(track)
else:
track.sub_index = subIndex
subIndex += 1
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"TrackController.deleteTrack(): {repr(ex)}")
finally:
s.close()

View File

@@ -0,0 +1,140 @@
import click
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button
from textual.containers import Grid
from ffx.model.pattern import Pattern
from ffx.track_descriptor import TrackDescriptor
from .track_controller import TrackController
from .track_type import TrackType
# Screen[dict[int, str, int]]
class TrackDeleteScreen(Screen):
CSS = """
Grid {
grid-size: 4 9;
grid-rows: 2 2 2 2 2 2 2 2 2;
grid-columns: 30 30 30 30;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, trackDescriptor : TrackDescriptor):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
if type(trackDescriptor) is not TrackDescriptor:
raise click.ClickException('TrackDeleteScreen.init(): trackDescriptor is required to be of type TrackDescriptor')
self.__tc = TrackController(context = self.context)
self.__trackDescriptor = trackDescriptor
def on_mount(self):
self.query_one("#subindexlabel", Static).update(str(self.__trackDescriptor.getSubIndex()))
self.query_one("#patternlabel", Static).update(str(self.__trackDescriptor.getPatternId()))
self.query_one("#languagelabel", Static).update(str(self.__trackDescriptor.getLanguage().label()))
self.query_one("#titlelabel", Static).update(str(str(self.__trackDescriptor.getTitle())))
def compose(self):
yield Header()
with Grid():
#1
yield Static(f"Are you sure to delete the following {self.__trackDescriptor.getType().label()} track?", id="toplabel", classes="four")
#2
yield Static("sub index")
yield Static(" ", id="subindexlabel", classes="three")
#3
yield Static("from pattern")
yield Static(" ", id="patternlabel", classes="three")
#4
yield Static(" ", classes="four")
#5
yield Static("Language")
yield Static(" ", id="languagelabel", classes="three")
#6
yield Static("Title")
yield Static(" ", id="titlelabel", classes="three")
#7
yield Static(" ", classes="four")
#8
yield Static(" ", classes="four")
#9
yield Button("Delete", id="delete_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "delete_button":
track = self.__tc.findTrack(self.__trackDescriptor.getPatternId(), self.__trackDescriptor.getType(), self.__trackDescriptor.getSubIndex())
if track is None:
raise click.ClickException(f"Track is none: patternId={self.__trackDescriptor.getPatternId()} type={self.__trackDescriptor.getType()} subIndex={self.__trackDescriptor.getSubIndex()}")
if track is not None:
if self.__tc.deleteTrack(track.getId()):
self.dismiss(self.__trackDescriptor)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

181
bin/ffx/track_descriptor.py Normal file
View File

@@ -0,0 +1,181 @@
from .iso_language import IsoLanguage
from .track_type import TrackType
from .audio_layout import AudioLayout
from .track_disposition import TrackDisposition
class TrackDescriptor():
ID_KEY = 'id'
INDEX_KEY = 'index'
SUB_INDEX_KEY = 'sub_index'
PATTERN_ID_KEY = 'pattern_id'
TRACK_TYPE_KEY = 'track_type'
DISPOSITION_SET_KEY = 'disposition_set'
TAGS_KEY = 'tags'
AUDIO_LAYOUT_KEY = 'audio_layout'
FFPROBE_DISPOSITION_KEY = 'disposition'
FFPROBE_TAGS_KEY = 'tags'
def __init__(self, **kwargs):
if TrackDescriptor.ID_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.ID_KEY]) is not int:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.ID_KEY} is required to be of type int")
self.__trackId = kwargs[TrackDescriptor.ID_KEY]
else:
self.__trackId = -1
if TrackDescriptor.PATTERN_ID_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.PATTERN_ID_KEY]) is not int:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.PATTERN_ID_KEY} is required to be of type int")
self.__patternId = kwargs[TrackDescriptor.PATTERN_ID_KEY]
else:
self.__patternId = -1
if TrackDescriptor.INDEX_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.INDEX_KEY]) is not int:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.INDEX_KEY} is required to be of type int")
self.__index = kwargs[TrackDescriptor.INDEX_KEY]
else:
self.__index = -1
if TrackDescriptor.SUB_INDEX_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.SUB_INDEX_KEY]) is not int:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.SUB_INDEX_KEY} is required to be of type dict")
self.__subIndex = kwargs[TrackDescriptor.SUB_INDEX_KEY]
else:
self.__subIndex = -1
if TrackDescriptor.TRACK_TYPE_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.TRACK_TYPE_KEY]) is not TrackType:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.TRACK_TYPE_KEY} is required to be of type TrackType")
self.__trackType = kwargs[TrackDescriptor.TRACK_TYPE_KEY]
else:
self.__trackType = TrackType.UNKNOWN
if TrackDescriptor.TAGS_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.TAGS_KEY]) is not dict:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.TAGS_KEY} is required to be of type dict")
self.__trackTags = kwargs[TrackDescriptor.TAGS_KEY]
else:
self.__trackTags = {}
if TrackDescriptor.DISPOSITION_SET_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.DISPOSITION_SET_KEY]) is not set:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.DISPOSITION_SET_KEY} is required to be of type set")
for d in kwargs[TrackDescriptor.DISPOSITION_SET_KEY]:
if type(d) is not TrackDisposition:
raise TypeError(f"TrackDesciptor.__init__(): All elements of argument set {TrackDescriptor.DISPOSITION_SET_KEY} is required to be of type TrackDisposition")
self.__dispositionSet = kwargs[TrackDescriptor.DISPOSITION_SET_KEY]
else:
self.__dispositionSet = set()
if TrackDescriptor.AUDIO_LAYOUT_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY]) is not AudioLayout:
raise TypeError(f"TrackDesciptor.__init__(): Argument {TrackDescriptor.AUDIO_LAYOUT_KEY} is required to be of type AudioLayout")
self.__audioLayout = kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY]
else:
self.__audioLayout = AudioLayout.LAYOUT_UNDEFINED
@classmethod
def fromFfprobe(cls, streamObj):
"""Processes ffprobe stream data as array with elements according to the following example
{
"index": 4,
"codec_name": "hdmv_pgs_subtitle",
"codec_long_name": "HDMV Presentation Graphic Stream subtitles",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 1421035,
"duration": "1421.035000",
"disposition": {
"default": 1,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "ger",
"title": "German Full"
}
}
"""
trackType = TrackType.fromLabel(streamObj['codec_type']) if 'codec_type' in streamObj.keys() else TrackType.UNKNOWN
if trackType != TrackType.UNKNOWN:
kwargs = {}
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = trackType
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = {t for d in (k for (k,v) in streamObj[TrackDescriptor.FFPROBE_DISPOSITION_KEY].items() if v)
if (t := TrackDisposition.find(d)) if t is not None} if TrackDescriptor.FFPROBE_DISPOSITION_KEY in streamObj.keys() else set()
kwargs[TrackDescriptor.TAGS_KEY] = streamObj[TrackDescriptor.FFPROBE_TAGS_KEY] if TrackDescriptor.FFPROBE_TAGS_KEY in streamObj.keys() else {}
kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = AudioLayout.identify(streamObj) if trackType == TrackType.AUDIO.label() else AudioLayout.LAYOUT_UNDEFINED
return cls(**kwargs)
else:
return None
def getId(self):
return self.__trackId
def getPatternId(self):
return self.__patternId
def getIndex(self):
return self.__index
def getSubIndex(self):
return self.__subIndex
def getType(self):
return self.__trackType
def getLanguage(self):
if 'language' in self.__trackTags.keys():
return IsoLanguage.findThreeLetter(self.__trackTags['language'])
else:
return IsoLanguage.UNDEFINED
def getTitle(self):
if 'title' in self.__trackTags.keys():
return str(self.__trackTags['title'])
else:
return ''
def getAudioLayout(self):
return self.__audioLayout
def getTags(self):
return self.__trackTags
def getDispositionSet(self):
return self.__dispositionSet

View File

@@ -0,0 +1,377 @@
import click, time
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, SelectionList, Select, DataTable, Input
from textual.containers import Grid
from ffx.model.pattern import Pattern
from .track_controller import TrackController
from .pattern_controller import PatternController
from .tag_controller import TagController
from .track_type import TrackType
from .iso_language import IsoLanguage
from .track_disposition import TrackDisposition
from .audio_layout import AudioLayout
from .track_descriptor import TrackDescriptor
from .tag_details_screen import TagDetailsScreen
from .tag_delete_screen import TagDeleteScreen
from textual.widgets._data_table import CellDoesNotExist
# Screen[dict[int, str, int]]
class TrackDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 20;
grid-rows: 2 2 2 2 2 3 2 2 2 2 2 6 2 2 6 2 2 2 2 6;
grid-columns: 25 25 25 25 225;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
SelectionList {
border: none;
min-height: 6;
}
Select {
border: none;
}
DataTable {
min-height: 6;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, trackDescriptor = None, patternId = None, trackType : TrackType = None, subIndex = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
self.__tc = TrackController(context = self.context)
self.__pc = PatternController(context = self.context)
self.__tac = TagController(context = self.context)
self.__isNew = trackDescriptor is None
if self.__isNew:
self.__trackType = trackType
self.__subIndex = subIndex
self.__trackDescriptor : TrackDescriptor = None
self.__pattern : Pattern = self.__pc.getPattern(patternId) if patternId is not None else {}
else:
self.__trackType = trackDescriptor.getType()
self.__subIndex = trackDescriptor.getSubIndex()
self.__trackDescriptor : TrackDescriptor = trackDescriptor
self.__pattern : Pattern = self.__pc.getPattern(self.__trackDescriptor.getPatternId())
def updateTags(self):
self.trackTagsTable.clear()
trackId = self.__trackDescriptor.getId()
if trackId != -1:
trackTags = self.__tac.findAllTrackTags(trackId)
for k,v in trackTags.items():
if k != 'language' and k != 'title':
row = (k,v)
self.trackTagsTable.add_row(*map(str, row))
def on_mount(self):
if self.__pattern is not None:
self.query_one("#patternlabel", Static).update(self.__pattern.getPattern())
if self.__subIndex is not None:
self.query_one("#subindexlabel", Static).update(str(self.__subIndex))
for d in TrackDisposition:
dispositionIsSet = (self.__trackDescriptor is not None
and d in self.__trackDescriptor.getDispositionSet())
dispositionOption = (d.label(), d.index(), dispositionIsSet)
self.query_one("#dispositions_selection_list", SelectionList).add_option(dispositionOption)
if self.__trackDescriptor is not None:
self.query_one("#language_select", Select).value = self.__trackDescriptor.getLanguage().label()
self.query_one("#title_input", Input).value = self.__trackDescriptor.getTitle()
self.updateTags()
def compose(self):
self.trackTagsTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_track_tag_key = self.trackTagsTable.add_column("Key", width=10)
self.column_key_track_tag_value = self.trackTagsTable.add_column("Value", width=30)
self.trackTagsTable.cursor_type = 'row'
languages = [l.label() for l in IsoLanguage]
yield Header()
with Grid():
# 1
yield Static(f"New {self.__trackType.label()} stream" if self.__isNew else f"Edit {self.__trackType.label()} stream", id="toplabel", classes="five")
# 2
yield Static("for pattern")
yield Static("", id="patternlabel", classes="four")
# 3
yield Static("sub index")
yield Static("", id="subindexlabel", classes="four")
# 4
yield Static(" ", classes="five")
# 5
yield Static(" ", classes="five")
# 6
yield Static("Language")
yield Select.from_values(languages, classes="four", id="language_select")
# 7
yield Static(" ", classes="five")
# 8
yield Static("Title")
yield Input(id="title_input", classes="four")
# 9
yield Static(" ", classes="five")
# 10
yield Static(" ", classes="five")
# 11
yield Static("Stream tags")
yield Static(" ")
yield Button("Add", id="button_add_stream_tag")
yield Button("Edit", id="button_edit_stream_tag")
yield Button("Delete", id="button_delete_stream_tag")
# 12
yield self.trackTagsTable
# 13
yield Static(" ", classes="five")
# 14
yield Static("Stream dispositions", classes="five")
# 15
yield SelectionList[int](
classes="five",
id = "dispositions_selection_list"
)
# 16
yield Static(" ", classes="five")
# 17
yield Static(" ", classes="five")
# 18
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
# 19
yield Static(" ", classes="five")
# 20
yield Static(" ", classes="five", id="messagestatic")
yield Footer(id="footer")
def getTrackDescriptorFromInput(self):
kwargs = {}
kwargs[TrackDescriptor.PATTERN_ID_KEY] = int(self.__pattern.getId())
kwargs[TrackDescriptor.INDEX_KEY] = -1
kwargs[TrackDescriptor.SUB_INDEX_KEY] = self.__subIndex
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = self.__trackType
trackTags = {}
language = self.query_one("#language_select", Select).value
if language:
trackTags['language'] = IsoLanguage.find(language).threeLetter()
title = self.query_one("#title_input", Input).value
if title:
trackTags['title'] = title
tableTags = {row[0]:row[1] for r in self.trackTagsTable.rows if (row := self.trackTagsTable.get_row(r)) and row[0] != 'language' and row[0] != 'title'}
kwargs[TrackDescriptor.TAGS_KEY] = trackTags | tableTags
dispositionFlags = sum([2**f for f in self.query_one("#dispositions_selection_list", SelectionList).selected])
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = TrackDisposition.toSet(dispositionFlags)
kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = AudioLayout.LAYOUT_UNDEFINED
return TrackDescriptor(**kwargs)
def getSelectedTag(self):
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.trackTagsTable.coordinate_to_cell_key(self.trackTagsTable.cursor_coordinate)
if row_key is not None:
selected_tag_data = self.trackTagsTable.get_row(row_key)
tagKey = str(selected_tag_data[0])
tagValue = str(selected_tag_data[1])
return tagKey, tagValue
else:
return None
except CellDoesNotExist:
return None
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
# Check if the button pressed is the one we are interested in
if event.button.id == "save_button":
trackDescriptor = self.getTrackDescriptorFromInput()
# Check for multiple default/forced disposition flags
if self.__trackType == TrackType.AUDIO:
trackList = self.__tc.findAudioTracks(self.__pattern.getId())
elif self.__trackType == TrackType.SUBTITLE:
trackList = self.__tc.findSubtitleTracks(self.__pattern.getId())
else:
trackList = []
siblingTrackList = [t for t in trackList if t.getType() == self.__trackType and t.getSubIndex() != self.__subIndex]
numDefaultTracks = len([t for t in siblingTrackList if TrackDisposition.DEFAULT in t.getDispositionSet()])
numForcedTracks = len([t for t in siblingTrackList if TrackDisposition.FORCED in t.getDispositionSet()])
if ((TrackDisposition.DEFAULT in trackDescriptor.getDispositionSet() and numDefaultTracks)
or (TrackDisposition.FORCED in trackDescriptor.getDispositionSet() and numForcedTracks)):
self.query_one("#messagestatic", Static).update("Cannot add another stream with disposition flag 'debug' or 'forced' set")
else:
self.query_one("#messagestatic", Static).update(" ")
if self.__isNew:
self.__tc.addTrack(trackDescriptor)
self.dismiss(trackDescriptor)
else:
track = self.__tc.findTrack(self.__pattern.getId(), self.__trackType, self.__subIndex)
if self.__tc.updateTrack(track.getId(), trackDescriptor):
self.dismiss(trackDescriptor)
else:
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()
if event.button.id == "button_add_stream_tag":
if not self.__isNew:
self.app.push_screen(TagDetailsScreen(), self.handle_update_tag)
if event.button.id == "button_edit_stream_tag":
tagKey, tagValue = self.getSelectedTag()
self.app.push_screen(TagDetailsScreen(key=tagKey, value=tagValue), self.handle_update_tag)
if event.button.id == "button_delete_stream_tag":
tagKey, tagValue = self.getSelectedTag()
self.app.push_screen(TagDeleteScreen(key=tagKey, value=tagValue), self.handle_delete_tag)
def handle_update_tag(self, tag):
trackId = self.__trackDescriptor.getId()
if trackId == -1:
raise click.ClickException(f"TrackDetailsScreen.handle_add_tag: trackId not set (-1) trackDescriptor={self.__trackDescriptor}")
if self.__tac.updateTrackTag(trackId, tag[0], tag[1]) is not None:
self.updateTags()
def handle_delete_tag(self, trackTag):
trackId = self.__trackDescriptor.getId()
if trackId == -1:
raise click.ClickException(f"TrackDetailsScreen.handle_delete_tag: trackId not set (-1) trackDescriptor={self.__trackDescriptor}")
tag = self.__tac.findTrackTag(trackId, trackTag[0])
if tag is not None:
if self.__tac.deleteTrackTag(tag.id):
self.updateTags()

View File

@@ -0,0 +1,65 @@
import difflib, click
from enum import Enum
class TrackDisposition(Enum):
DEFAULT = {"name": "default", "index": 0}
FORCED = {"name": "forced", "index": 1}
DUB = {"name": "dub", "index": 2}
ORIGINAL = {"name": "original", "index": 3}
COMMENT = {"name": "comment", "index": 4}
LYRICS = {"name": "lyrics", "index": 5}
KARAOKE = {"name": "karaoke", "index": 6}
HEARING_IMPAIRED = {"name": "hearing_impaired", "index": 7}
VISUAL_IMPAIRED = {"name": "visual_impaired", "index": 8}
CLEAN_EFFECTS = {"name": "clean_effects", "index": 9}
ATTACHED_PIC = {"name": "attached_pic", "index": 10}
TIMED_THUMBNAILS = {"name": "timed_thumbnails", "index": 11}
NON_DIEGETICS = {"name": "non_diegetic", "index": 12}
CAPTIONS = {"name": "captions", "index": 13}
DESCRIPTIONS = {"name": "descriptions", "index": 14}
METADATA = {"name": "metadata", "index": 15}
DEPENDENT = {"name": "dependent", "index": 16}
STILL_IMAGE = {"name": "still_image", "index": 17}
def label(self):
return str(self.value['name'])
def index(self):
return int(self.value['index'])
@staticmethod
def toFlags(dispositionSet):
"""Flags stored in integer bits (2**index)"""
if type(dispositionSet) is not set:
raise click.ClickException('TrackDisposition.toFlags(): Argument is not of type set')
flags = 0
for d in dispositionSet:
if type(d) is not TrackDisposition:
raise click.ClickException('TrackDisposition.toFlags(): Element not of type TrackDisposition')
flags += 2 ** d.index()
return flags
@staticmethod
def toSet(flags):
dispositionSet = set()
for d in TrackDisposition:
if flags & int(2 ** d.index()):
dispositionSet.add(d)
return dispositionSet
@staticmethod
def find(label):
matchingDispositions = [d for d in TrackDisposition if d.label() == str(label)]
if matchingDispositions:
return matchingDispositions[0]
else:
return None

34
bin/ffx/track_type.py Normal file
View File

@@ -0,0 +1,34 @@
from enum import Enum
class TrackType(Enum):
VIDEO = {'label': 'video', 'index': 1}
AUDIO = {'label': 'audio', 'index': 2}
SUBTITLE = {'label': 'subtitle', 'index': 3}
UNKNOWN = {'label': 'unknown', 'index': 0}
def label(self):
"""Returns the stream type as string"""
return str(self.value['label'])
def index(self):
"""Returns the stream type index"""
return int(self.value['index'])
@staticmethod
def fromLabel(label):
tlist = [t for t in TrackType if t.value['label'] == label]
if tlist:
return tlist[0]
else:
return TrackType.UNKNOWN
@staticmethod
def fromIndex(index):
tlist = [t for t in TrackType if t.value['index'] == index]
if tlist:
return tlist[0]
else:
return TrackType.UNKNOWN

11
bin/ffx/warning_screen.py Normal file
View File

@@ -0,0 +1,11 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class WarningScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Label("Warning! This file is not compliant to the defined source schema!")
yield Footer()