2 Commits

Author SHA1 Message Date
Maveno
6ac0c76d65 Merge branch 'click' of gitea.maveno.de:Javanaut/ffx into click 2024-09-22 15:40:07 +02:00
dd8f472ac5 fix 2024-09-11 19:42:52 +02:00
37 changed files with 173 additions and 2874 deletions

2
.gitignore vendored
View File

@@ -1,2 +0,0 @@
__pycache__
junk/

View File

@@ -1,32 +0,0 @@
import os
from ffx.pattern_controller import PatternController
from ffx.model.show import Base
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker, Mapped, backref
filename = 'Boruto.Naruto.Next.Generations.S01E256.GerEngSub.AAC.1080p.WebDL.x264-Tanuki.mkv'
# Data 'input' variable
context = {}
# Initialize DB
homeDir = os.path.expanduser("~")
ffxVarDir = os.path.join(homeDir, '.local', 'var', 'ffx')
if not os.path.exists(ffxVarDir):
os.makedirs(ffxVarDir)
context['database_url'] = f"sqlite:///{os.path.join(ffxVarDir, 'ffx.db')}"
context['database_engine'] = create_engine(context['database_url'])
context['database_session'] = sessionmaker(bind=context['database_engine'])
Base.metadata.create_all(context['database_engine'])
pc = PatternController(context)
print(pc.matchFilename(filename))

View File

@@ -1,10 +1,10 @@
#! /usr/bin/python3 #! /usr/bin/python3
import os, sys, subprocess, json, click, time, re import os, sys, subprocess, json, click, time
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label from textual.widgets import Header, Footer, Placeholder
VERSION='0.1.0' VERSION='0.1.0'
@@ -35,7 +35,7 @@ MKVMERGE_METADATA_KEYS = ['BPS',
'_STATISTICS_WRITING_DATE_UTC', '_STATISTICS_WRITING_DATE_UTC',
'_STATISTICS_TAGS'] '_STATISTICS_TAGS']
FILE_EXTENSIONS = ['mkv', 'mp4', 'avi', 'flv', 'webm'] FILE_EXTENSION = ['mkv', 'mp4', 'avi', 'flv', 'webm']
COMMAND_TOKENS = ['ffmpeg', '-y', '-i'] COMMAND_TOKENS = ['ffmpeg', '-y', '-i']
@@ -50,9 +50,6 @@ STREAM_LAYOUT_5_1 = '5.1(side)'
STREAM_LAYOUT_STEREO = 'stereo' STREAM_LAYOUT_STEREO = 'stereo'
STREAM_LAYOUT_6CH = '6ch' STREAM_LAYOUT_6CH = '6ch'
SEASON_EPISODE_INDICATOR_MATCH = '([sS][0-9]+)([eE][0-9]+)'
SEASON_INDICATOR_MATCH = '([sS][0-9]+)'
EPISODE_INDICATOR_MATCH = '([eE][0-9]+)'
class DashboardScreen(Screen): class DashboardScreen(Screen):
@@ -68,14 +65,6 @@ class DashboardScreen(Screen):
yield Placeholder("Dashboard Screen") yield Placeholder("Dashboard Screen")
yield Footer() yield Footer()
class WarningScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Label("Warning! This file is not compliant to the defined source schema!")
yield Footer()
class SettingsScreen(Screen): class SettingsScreen(Screen):
def __init__(self): def __init__(self):
@@ -98,31 +87,29 @@ class HelpScreen(Screen):
class ModesApp(App): class ModesApp(App):
BINDINGS = [ BINDINGS = [
("q", "quit()", "Quit"), ("d", "switch_mode('dashboard')", "Dashboard"),
# ("d", "switch_mode('dashboard')", "Dashboard"), ("s", "switch_mode('settings')", "Settings"),
# ("s", "switch_mode('settings')", "Settings"), ("h", "switch_mode('help')", "Help"),
# ("h", "switch_mode('help')", "Help"),
] ]
MODES = { MODES = {
"warning": WarningScreen,
"dashboard": DashboardScreen, "dashboard": DashboardScreen,
"settings": SettingsScreen, "settings": SettingsScreen,
"help": HelpScreen, "help": HelpScreen,
} }
def __init__(self, context = {}): def __init__(self, context = {}):
super().__init__() super().__init__()
self.context = context self.context = context
def on_mount(self) -> None: def on_mount(self) -> None:
self.switch_mode("warning") self.switch_mode("dashboard")
def getContext(self): def getContext(self):
return self.context return self.context
def executeProcess(commandSequence): def executeProcess(commandSequence):
process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -367,10 +354,8 @@ def streams(filename):
@click.option("-c", "--clear-metadata", is_flag=True, default=False) @click.option("-c", "--clear-metadata", is_flag=True, default=False)
@click.option("-d", "--denoise", is_flag=True, default=False) @click.option("-d", "--denoise", is_flag=True, default=False)
@click.option("-o", "--output-directory", type=str, default='')
def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, ac3_bitrate, dts_bitrate, crop, clear_metadata, default_subtitle, forced_audio, default_audio, denoise):
def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, ac3_bitrate, dts_bitrate, crop, clear_metadata, default_subtitle, forced_audio, default_audio, denoise, output_directory):
"""Batch conversion of audiovideo files in format suitable for web playback, e.g. jellyfin """Batch conversion of audiovideo files in format suitable for web playback, e.g. jellyfin
Files found under PATHS will be converted according to parameters. Files found under PATHS will be converted according to parameters.
@@ -378,79 +363,56 @@ def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, a
Suffices will we appended to filename in case of multiple created files Suffices will we appended to filename in case of multiple created files
or if the filename has not changed.""" or if the filename has not changed."""
startTime = time.perf_counter() #startTime = time.perf_counter()
context = ctx.obj #sourcePath = paths[0]
#targetFilename = paths[1]
#if not os.path.isfile(sourcePath):
# raise click.ClickException(f"There is no file with path {sourcePath}")
#click.echo(f"src: {sourcePath} tgt: {targetFilename}")
click.echo(f"\nVideo encoder: {video_encoder}") #click.echo(f"ve={video_encoder}")
qualityTokens = quality.split(',')
q_list = [q for q in qualityTokens if q.isnumeric()]
click.echo(f"Qualities: {q_list}")
ctx.obj['bitrates'] = {} #qualityTokens = quality.split(',')
ctx.obj['bitrates']['stereo'] = str(stereo_bitrate) if str(stereo_bitrate).endswith('k') else f"{stereo_bitrate}k"
ctx.obj['bitrates']['ac3'] = str(ac3_bitrate) if str(ac3_bitrate).endswith('k') else f"{ac3_bitrate}k"
ctx.obj['bitrates']['dts'] = str(dts_bitrate) if str(dts_bitrate).endswith('k') else f"{dts_bitrate}k"
click.echo(f"Stereo bitrate: {ctx.obj['bitrates']['stereo']}") #q_list = [q for q in qualityTokens if q.isnumeric()]
click.echo(f"AC3 bitrate: {ctx.obj['bitrates']['ac3']}")
click.echo(f"DTS bitrate: {ctx.obj['bitrates']['dts']}")
ctx.obj['perform_crop'] = (crop != 'none') #click.echo(q_list)
if ctx.obj['perform_crop']: #ctx.obj['bitrates'] = {}
#ctx.obj['bitrates']['stereo'] = str(stereo_bitrate) if str(stereo_bitrate).endswith('k') else f"{stereo_bitrate}k"
cropTokens = crop.split(',') #ctx.obj['bitrates']['ac3'] = str(ac3_bitrate) if str(ac3_bitrate).endswith('k') else f"{ac3_bitrate}k"
#ctx.obj['bitrates']['dts'] = str(dts_bitrate) if str(dts_bitrate).endswith('k') else f"{dts_bitrate}k"
if cropTokens and len(cropTokens) == 2:
ctx.obj['crop_start'], ctx.obj['crop_length'] = crop.split(',')
else:
ctx.obj['crop_start'] = DEFAULT_CROP_START
ctx.obj['crop_length'] = DEFAULT_CROP_LENGTH
click.echo(f"crop start={ctx.obj['crop_start']} length={ctx.obj['crop_length']}")
click.echo(f"\nRunning {len(paths) * len(q_list)} jobs") #click.echo(f"a={ctx.obj['bitrates']['stereo']}")
#click.echo(f"ac3={ctx.obj['bitrates']['ac3']}")
#click.echo(f"dts={ctx.obj['bitrates']['dts']}")
se_match = re.compile(SEASON_EPISODE_INDICATOR_MATCH) #performCrop = (crop != 'none')
s_match = re.compile(SEASON_INDICATOR_MATCH)
e_match = re.compile(EPISODE_INDICATOR_MATCH)
for sourcePath in paths: #if performCrop:
#cropTokens = crop.split(',')
#if cropTokens and len(cropTokens) == 2:
#cropStart, cropLength = crop.split(',')
#else:
#cropStart = DEFAULT_CROP_START
#cropLength = DEFAULT_CROP_LENGTH
#click.echo(f"crop start={cropStart} length={cropLength}")
if not os.path.isfile(sourcePath):
click.echo(f"There is no file with path {sourcePath}, skipping ...")
continue
sourceDirectory = os.path.dirname(sourcePath)
sourceFilename = os.path.basename(sourcePath)
sourcePathTokens = sourceFilename.split('.')
if sourcePathTokens[-1] in FILE_EXTENSIONS:
sourceFileBasename = '.'.join(sourcePathTokens[:-1])
sourceFilenameExtension = sourcePathTokens[-1]
else:
sourceFileBasename = sourceFilename
sourceFilenameExtension = ''
#click.echo(f"dir={sourceDirectory} base={sourceFileBasename} ext={sourceFilenameExtension}")
click.echo(f"\nProcessing file {sourcePath}")
se_result = se_match.search(sourceFilename)
s_result = s_match.search(sourceFilename)
e_result = e_match.search(sourceFilename)
#click.echo(f"\nRunning {len(q_list)} jobs")
#streamDescriptor = getStreamDescriptor(sourcePath) #streamDescriptor = getStreamDescriptor(sourcePath)
@@ -535,18 +497,15 @@ def convert(ctx, paths, label, video_encoder, quality, preset, stereo_bitrate, a
#executeProcess(commandSequence2) #executeProcess(commandSequence2)
#app = ModesApp(ctx.obj) #click.echo('\nDONE\n')
#app.run()
#click.confirm('Warning! This file is not compliant to the defined source schema! Do you want to continue?', abort=True) #endTime = time.perf_counter()
#click.echo(f"Time elapsed {endTime - startTime}")
click.echo('\nDONE\n') app = ModesApp(ctx.obj)
app.run()
endTime = time.perf_counter() click.echo(f"app result: {app.getContext()}")
click.echo(f"Time elapsed {endTime - startTime}")
# click.echo(f"app result: {app.getContext()}")

View File

@@ -1,32 +0,0 @@
import os
from ffx.pattern_controller import PatternController
from ffx.model.show import Base
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker, Mapped, backref
filename = 'Boruto.Naruto.Next.Generations.S01E256.GerEngSub.AAC.1080p.WebDL.x264-Tanuki.mkv'
# Data 'input' variable
context = {}
# Initialize DB
homeDir = os.path.expanduser("~")
ffxVarDir = os.path.join(homeDir, '.local', 'var', 'ffx')
if not os.path.exists(ffxVarDir):
os.makedirs(ffxVarDir)
context['database_url'] = f"sqlite:///{os.path.join(ffxVarDir, 'ffx.db')}"
context['database_engine'] = create_engine(context['database_url'])
context['database_session'] = sessionmaker(bind=context['database_engine'])
Base.metadata.create_all(context['database_engine'])
pc = PatternController(context)
print(pc.matchFilename(filename))

View File

@@ -2,7 +2,10 @@
import os, sys, subprocess, json, click, time, re import os, sys, subprocess, json, click, time, re
from ffx.ffx_app import FfxApp from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
VERSION='0.1.0' VERSION='0.1.0'
@@ -53,6 +56,74 @@ SEASON_EPISODE_STREAM_LANGUAGE_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]
SUBTITLE_FILE_EXTENSION = 'vtt' SUBTITLE_FILE_EXTENSION = 'vtt'
class DashboardScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
context['dashboard'] = 'dashboard'
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Placeholder("Dashboard Screen")
yield Footer()
class WarningScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Label("Warning! This file is not compliant to the defined source schema!")
yield Footer()
class SettingsScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Settings Screen")
yield Footer()
class HelpScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Help Screen")
yield Footer()
class ModesApp(App):
BINDINGS = [
("q", "quit()", "Quit"),
# ("d", "switch_mode('dashboard')", "Dashboard"),
# ("s", "switch_mode('settings')", "Settings"),
# ("h", "switch_mode('help')", "Help"),
]
MODES = {
"warning": WarningScreen,
"dashboard": DashboardScreen,
"settings": SettingsScreen,
"help": HelpScreen,
}
def __init__(self, context = {}):
super().__init__()
self.context = context
def on_mount(self) -> None:
self.switch_mode("warning")
def getContext(self):
return self.context
def executeProcess(commandSequence): def executeProcess(commandSequence):
process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -138,8 +209,6 @@ def getStreamDescriptor(filename):
for subStream in streamData: for subStream in streamData:
if subStream['codec_type'] in ['video', 'audio', 'subtitle']:
if not 'disposition' in subStream.keys(): if not 'disposition' in subStream.keys():
subStream['disposition'] = {} subStream['disposition'] = {}
if not 'default' in subStream['disposition'].keys(): if not 'default' in subStream['disposition'].keys():
@@ -364,63 +433,6 @@ def streams(filename):
@ffx.command()
@click.pass_context
@click.argument('paths', nargs=-1)
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option('-sd', '--subtitle-directory', type=str, default='', help='Load subtitles from here')
@click.option('-sp', '--subtitle-prefix', type=str, default='', help='Subtitle filename prefix')
@click.option("-o", "--output-directory", type=str, default='')
@click.option("--dry-run", is_flag=True, default=False)
def unmux(ctx,
label,
paths,
subtitle_directory,
subtitle_prefix,
output_directory,
dry_run):
existingSourcePaths = [p for p in paths if os.path.isfile(p)]
click.echo(f"\nUnmuxing {len(existingSourcePaths)} files")
for sourcePath in existingSourcePaths:
sd = getStreamDescriptor(sourcePath)
print(f"\nFile {sourcePath}\n")
for v in sd['video']:
if v['codec_name'] == 'h264':
commandSequence = ['ffmpeg', '-i', sourcePath, '-map', '0:v:0', '-c', 'copy', '-f', 'h264']
executeProcess()
for a in sd['audio']:
print(f"A: {a}\n")
for s in sd['subtitle']:
print(f"S: {s}\n")
@ffx.command()
@click.pass_context
def shows(ctx):
app = FfxApp(ctx.obj)
app.run()
@ffx.command() @ffx.command()
@click.pass_context @click.pass_context
@@ -568,9 +580,6 @@ def convert(ctx,
for sourcePath in existingSourcePaths: for sourcePath in existingSourcePaths:
###
###
# Separate basedir, basename and extension for current source file # Separate basedir, basename and extension for current source file
sourceDirectory = os.path.dirname(sourcePath) sourceDirectory = os.path.dirname(sourcePath)
sourceFilename = os.path.basename(sourcePath) sourceFilename = os.path.basename(sourcePath)
@@ -628,8 +637,6 @@ def convert(ctx,
else: else:
targetFilenameTokens = [sourceFileBasename] targetFilenameTokens = [sourceFileBasename]
###
###
# Load source stream descriptor # Load source stream descriptor
try: try:

View File

@@ -1,66 +0,0 @@
import os, re, click
class FileProperties():
FILE_EXTENSIONS = ['mkv', 'mp4', 'avi', 'flv', 'webm']
SEASON_EPISODE_INDICATOR_MATCH = '[sS]([0-9]+)[eE]([0-9]+)'
EPISODE_INDICATOR_MATCH = '[eE]([0-9]+)'
def ___init__(self, sourcePath, ):
# Separate basedir, basename and extension for current source file
self.__sourceDirectory = os.path.dirname(sourcePath)
self.__sourceFilename = os.path.basename(sourcePath)
sourcePathTokens = self.__sourceFilename.split('.')
if sourcePathTokens[-1] in FilenameController.FILE_EXTENSIONS:
self.__sourceFileBasename = '.'.join(sourcePathTokens[:-1])
self.__sourceFilenameExtension = sourcePathTokens[-1]
else:
self.__sourceFileBasename = self.__sourceFilename
self.__sourceFilenameExtension = ''
se_match = re.compile(FilenameController.SEASON_EPISODE_INDICATOR_MATCH)
e_match = re.compile(FilenameController.EPISODE_INDICATOR_MATCH)
se_result = se_match.search(self.__sourceFilename)
e_result = e_match.search(self.__sourceFilename)
self.__season = -1
self.__episode = -1
file_index = 0
if se_result is not None:
self.__season = int(se_result.group(1))
self.__episode = int(se_result.group(2))
elif e_result is not None:
self.__episode = int(e_result.group(1))
else:
file_index += 1
matchingFileSubtitleDescriptors = sorted([d for d in availableFileSubtitleDescriptors if d['season'] == season and d['episode'] == episode], key=lambda d: d['stream']) if availableFileSubtitleDescriptors else []
print(f"season={season} episode={episode} file={file_index}")
# Assemble target filename tokens
targetFilenameTokens = []
targetFilenameExtension = DEFAULT_FILE_EXTENSION
if label:
targetFilenameTokens = [label]
if season > -1 and episode > -1:
targetFilenameTokens += [f"S{season:0{season_digits}d}E{episode:0{episode_digits}d}"]
elif episode > -1:
targetFilenameTokens += [f"E{episode:0{episode_digits}d}"]
else:
targetFilenameTokens += [f"{file_index:0{index_digits}d}"]
else:
targetFilenameTokens = [sourceFileBasename]

View File

View File

@@ -1,16 +0,0 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class DashboardScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
context['dashboard'] = 'dashboard'
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Placeholder("Dashboard Screen")
yield Footer()

View File

@@ -1,65 +0,0 @@
import os, time, sqlite3, sqlalchemy
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from ffx.model.show import Base, Show
from ffx.model.pattern import Pattern
from ffx.model.track import Track
from .shows_screen import ShowsScreen
from .warning_screen import WarningScreen
from .dashboard_screen import DashboardScreen
from .settings_screen import SettingsScreen
from .help_screen import HelpScreen
class FfxApp(App):
TITLE = "FFX"
BINDINGS = [
("q", "quit()", "Quit"),
("h", "switch_mode('help')", "Help"),
]
def __init__(self, context = {}):
super().__init__()
# Data 'input' variable
self.context = context
# Initialize DB
homeDir = os.path.expanduser("~")
ffxVarDir = os.path.join(homeDir, '.local', 'var', 'ffx')
if not os.path.exists(ffxVarDir):
os.makedirs(ffxVarDir)
self.context['database_url'] = f"sqlite:///{os.path.join(ffxVarDir, 'ffx.db')}"
self.context['database_engine'] = create_engine(self.context['database_url'])
self.context['database_session'] = sessionmaker(bind=self.context['database_engine'])
Base.metadata.create_all(self.context['database_engine'])
# isSyncronuous = False
# while not isSyncronuous:
# while True:
# try:
# with self.context['database_engine'].connect() as connection:
# connection.execute(sqlalchemy.text('PRAGMA foreign_keys=ON;'))
# #isSyncronuous = True
# break
# except sqlite3.OperationalError:
# time.sleep(0.1)
def on_mount(self) -> None:
self.push_screen(ShowsScreen())
def getContext(self):
"""Data 'output' method"""
return self.context

2
bin/ffx/file_pattern.py Normal file
View File

@@ -0,0 +1,2 @@
class FilePattern():
pass

View File

@@ -1,66 +0,0 @@
import os, re, click
class FileProperties():
FILE_EXTENSIONS = ['mkv', 'mp4', 'avi', 'flv', 'webm']
SEASON_EPISODE_INDICATOR_MATCH = '[sS]([0-9]+)[eE]([0-9]+)'
EPISODE_INDICATOR_MATCH = '[eE]([0-9]+)'
def ___init__(self, sourcePath, ):
# Separate basedir, basename and extension for current source file
self.__sourceDirectory = os.path.dirname(sourcePath)
self.__sourceFilename = os.path.basename(sourcePath)
sourcePathTokens = self.__sourceFilename.split('.')
if sourcePathTokens[-1] in FilenameController.FILE_EXTENSIONS:
self.__sourceFileBasename = '.'.join(sourcePathTokens[:-1])
self.__sourceFilenameExtension = sourcePathTokens[-1]
else:
self.__sourceFileBasename = self.__sourceFilename
self.__sourceFilenameExtension = ''
se_match = re.compile(FilenameController.SEASON_EPISODE_INDICATOR_MATCH)
e_match = re.compile(FilenameController.EPISODE_INDICATOR_MATCH)
se_result = se_match.search(self.__sourceFilename)
e_result = e_match.search(self.__sourceFilename)
self.__season = -1
self.__episode = -1
file_index = 0
if se_result is not None:
self.__season = int(se_result.group(1))
self.__episode = int(se_result.group(2))
elif e_result is not None:
self.__episode = int(e_result.group(1))
else:
file_index += 1
matchingFileSubtitleDescriptors = sorted([d for d in availableFileSubtitleDescriptors if d['season'] == season and d['episode'] == episode], key=lambda d: d['stream']) if availableFileSubtitleDescriptors else []
print(f"season={season} episode={episode} file={file_index}")
# Assemble target filename tokens
targetFilenameTokens = []
targetFilenameExtension = DEFAULT_FILE_EXTENSION
if label:
targetFilenameTokens = [label]
if season > -1 and episode > -1:
targetFilenameTokens += [f"S{season:0{season_digits}d}E{episode:0{episode_digits}d}"]
elif episode > -1:
targetFilenameTokens += [f"E{episode:0{episode_digits}d}"]
else:
targetFilenameTokens += [f"{file_index:0{index_digits}d}"]
else:
targetFilenameTokens = [sourceFileBasename]

View File

@@ -1,12 +0,0 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class HelpScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Help Screen")
yield Footer()

View File

@@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
import difflib import difflib
class IsoLanguage(Enum): class LanguageData(Enum):
AFRIKAANS = {"name": "Afrikaans", "iso639_1": "af", "iso639_2": "afr"} AFRIKAANS = {"name": "Afrikaans", "iso639_1": "af", "iso639_2": "afr"}
ALBANIAN = {"name": "Albanian", "iso639_1": "sq", "iso639_2": "alb"} ALBANIAN = {"name": "Albanian", "iso639_1": "sq", "iso639_2": "alb"}
@@ -73,41 +73,21 @@ class IsoLanguage(Enum):
VIETNAMESE = {"name": "Vietnamese", "iso639_1": "vi", "iso639_2": "vie"} VIETNAMESE = {"name": "Vietnamese", "iso639_1": "vi", "iso639_2": "vie"}
WELSH = {"name": "Welsh", "iso639_1": "cy", "iso639_2": "wel"} WELSH = {"name": "Welsh", "iso639_1": "cy", "iso639_2": "wel"}
def find(name : str):
@staticmethod closestMatches = difflib.get_close_matches(name, [l.value["name"] for l in LanguageData], n=1)
def find(label : str):
closestMatches = difflib.get_close_matches(label, [l.value["name"] for l in IsoLanguage], n=1)
if closestMatches: if closestMatches:
foundLangs = [l for l in IsoLanguage if l.value['name'] == closestMatches[0]] foundLangs = [l for l in LanguageData if l.value['name'] == closestMatches[0]]
return foundLangs[0] if foundLangs else None return foundLangs[0] if foundLangs else None
else: else:
return None return None
@staticmethod def get(lang : str):
def findThreeLetter(theeLetter : str):
foundLangs = [l for l in IsoLanguage if l.value['iso639_2'] == str(theeLetter)]
return foundLangs[0] if foundLangs else None
# def get(lang : str):
#
# selectedLangs = [l for l in IsoLanguage if l.value['iso639_2'] == lang]
#
# if selectedLangs:
# return selectedLangs[0]
# else:
# return None
def label(self):
return str(self.value['name'])
def twoLetter(self):
return str(self.value['iso639_1'])
def threeLetter(self):
return str(self.value['iso639_2'])
selectedLangs = [l for l in LanguageData if l.value['iso639_2'] == lang]
if selectedLangs:
return selectedLangs[0]
else:
return None

View File

@@ -1,2 +0,0 @@
class MediaDescriptor():
pass

View File

@@ -1,28 +0,0 @@
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker, Mapped, backref
from .show import Base
from .track import Track
class Pattern(Base):
__tablename__ = 'patterns'
# v1.x
id = Column(Integer, primary_key=True)
pattern = Column(String)
# v2.0
# id: Mapped[int] = mapped_column(Integer, primary_key=True)
# pattern: Mapped[str] = mapped_column(String, nullable=False)
# v1.x
show_id = Column(Integer, ForeignKey('shows.id', ondelete="CASCADE"))
show = relationship('Show', back_populates='patterns')
# v2.0
# show_id: Mapped[int] = mapped_column(ForeignKey("shows.id", ondelete="CASCADE"))
# show: Mapped["Show"] = relationship(back_populates="patterns")
tracks = relationship('Track', back_populates='pattern', cascade="all, delete")

View File

@@ -1,41 +0,0 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
Base = declarative_base()
class Show(Base):
"""
relationship(argument, opt1, opt2, ...)
argument is string of class or Mapped class of the target entity
backref creates a bi-directional corresponding relationship (back_populates preferred)
back_populates points to the corresponding relationship (the actual class attribute identifier)
See: https://docs.sqlalchemy.org/en/(14|20)/orm/basic_relationships.html
"""
__tablename__ = 'shows'
# v1.x
id = Column(Integer, primary_key=True)
name = Column(String)
year = Column(Integer)
# v2.0
# id: Mapped[int] = mapped_column(Integer, primary_key=True)
# name: Mapped[str] = mapped_column(String, nullable=False)
# year: Mapped[int] = mapped_column(Integer, nullable=False)
# v1.x
#patterns = relationship('Pattern', back_populates='show', cascade="all, delete", passive_deletes=True)
patterns = relationship('Pattern', back_populates='show', cascade="all, delete")
# patterns = relationship('Pattern', back_populates='show', cascade="all")
# v2.0
# patterns: Mapped[List["Pattern"]] = relationship(back_populates="show", cascade="all, delete")
index_season_digits = Column(Integer, default=2)
index_episode_digits = Column(Integer, default=2)
indicator_season_digits = Column(Integer, default=2)
indicator_episode_digits = Column(Integer, default=2)

View File

@@ -1,30 +0,0 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Enum
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
from ffx.track_type import TrackType
class Tag(Base):
"""
relationship(argument, opt1, opt2, ...)
argument is string of class or Mapped class of the target entity
backref creates a bi-directional corresponding relationship (back_populates preferred)
back_populates points to the corresponding relationship (the actual class attribute identifier)
See: https://docs.sqlalchemy.org/en/(14|20)/orm/basic_relationships.html
"""
__tablename__ = 'tags'
# v1.x
id = Column(Integer, primary_key=True)
key = Column(String)
value = Column(String)
# v1.x
track_id = Column(Integer, ForeignKey('tracks.id', ondelete="CASCADE"))
track = relationship('Track', back_populates='tags')

View File

@@ -1,49 +0,0 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
from ffx.model.tag import Tag
class Track(Base):
"""
relationship(argument, opt1, opt2, ...)
argument is string of class or Mapped class of the target entity
backref creates a bi-directional corresponding relationship (back_populates preferred)
back_populates points to the corresponding relationship (the actual class attribute identifier)
See: https://docs.sqlalchemy.org/en/(14|20)/orm/basic_relationships.html
"""
__tablename__ = 'tracks'
# v1.x
id = Column(Integer, primary_key=True, autoincrement = True)
# P=pattern_id+sub_index+track_type
track_type = Column(Integer) # TrackType
sub_index = Column(Integer)
# v1.x
pattern_id = Column(Integer, ForeignKey('patterns.id', ondelete="CASCADE"))
pattern = relationship('Pattern', back_populates='tracks')
language = Column(String) # IsoLanguage threeLetter
title = Column(String)
tags = relationship('Tag', back_populates='track', cascade="all, delete")
disposition_flags = Column(Integer)
def getDescriptor(self):
pass

View File

@@ -1,163 +0,0 @@
import click, re
from ffx.model.pattern import Pattern
class PatternController():
def __init__(self, context):
self.context = context
self.Session = self.context['database_session'] # convenience
def addPattern(self, patternDescriptor):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(patternDescriptor['show_id']), Pattern.pattern == str(patternDescriptor['pattern']))
if not q.count():
pattern = Pattern(show_id = int(patternDescriptor['show_id']),
pattern = str(patternDescriptor['pattern']))
s.add(pattern)
s.commit()
return patternDescriptor
else:
return {}
except Exception as ex:
raise click.ClickException(f"PatternController.addPattern(): {repr(ex)}")
finally:
s.close()
def updatePattern(self, patternId, patternDescriptor):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.id == int(patternId))
if q.count():
pattern = q.first()
pattern.show_id = int(patternDescriptor['show_id'])
pattern.pattern = str(patternDescriptor['pattern'])
s.commit()
return True
else:
return False
except Exception as ex:
raise click.ClickException(f"PatternController.updatePattern(): {repr(ex)}")
finally:
s.close()
def findPattern(self, patternDescriptor):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(patternDescriptor['show_id']), Pattern.pattern == str(patternDescriptor['pattern']))
if q.count():
pattern = q.first()
return int(pattern.id)
else:
return None
except Exception as ex:
raise click.ClickException(f"PatternController.findPattern(): {repr(ex)}")
finally:
s.close()
def getPatternDict(self, pattern):
patternDescriptor = {}
patternDescriptor['id'] = int(pattern.id)
patternDescriptor['pattern'] = str(pattern.pattern)
patternDescriptor['show_id'] = int(pattern.show_id)
return patternDescriptor
def getPatternDescriptor(self, patternId):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.id == int(patternId))
if q.count():
pattern = q.first()
return self.getPatternDict(pattern)
except Exception as ex:
raise click.ClickException(f"PatternController.getPatternDescriptor(): {repr(ex)}")
finally:
s.close()
def deletePattern(self, patternId):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.id == int(patternId))
if q.count():
#DAFUQ: https://stackoverflow.com/a/19245058
# q.delete()
pattern = q.first()
s.delete(pattern)
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"PatternController.deletePattern(): {repr(ex)}")
finally:
s.close()
def matchFilename(self, filename):
SEASON_PATTERN = '[sS]([0-9]+)'
EPISODE_PATTERN = '[eE]([0-9]+)'
result = {}
try:
s = self.Session()
q = s.query(Pattern)
for pattern in q.all():
match = re.search(pattern.pattern, filename)
if match:
result['pattern_id'] = pattern.id
result['show_id'] = pattern.show_id
result['indicator'] = match.group(1)
seasonMatch = re.search(SEASON_PATTERN, result['indicator'])
if seasonMatch:
result['season'] = int(seasonMatch.group(1))
episodeMatch = re.search(EPISODE_PATTERN, result['indicator'])
if episodeMatch:
result['episode'] = int(episodeMatch.group(1))
except Exception as ex:
raise click.ClickException(f"PatternController.matchFilename(): {repr(ex)}")
finally:
s.close()
return result

View File

@@ -1,113 +0,0 @@
import click
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button, Input
from textual.containers import Grid, Horizontal
from ffx.model.pattern import Pattern
from .show_controller import ShowController
from .pattern_controller import PatternController
# Screen[dict[int, str, int]]
class PatternDeleteScreen(Screen):
CSS = """
Grid {
grid-size: 2;
grid-rows: 2 auto;
grid-columns: 30 330;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, patternId = None, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database_session'] # convenience
self.__pc = PatternController(context = self.context)
self.__sc = ShowController(context = self.context)
self.pattern_id = patternId
self.pattern_obj = self.__pc.getPatternDescriptor(patternId) if patternId is not None else {}
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def on_mount(self):
if self.show_obj:
self.query_one("#showlabel", Static).update(f"{self.show_obj['id']} - {self.show_obj['name']} ({self.show_obj['year']})")
if self.pattern_obj:
self.query_one("#patternlabel", Static).update(str(self.pattern_obj['pattern']))
def compose(self):
yield Header()
with Grid():
yield Static("Are you sure to delete the following filename pattern?", id="toplabel", classes="two")
yield Static("", classes="two")
yield Static("Pattern")
yield Static("", id="patternlabel")
yield Static("", classes="two")
yield Static("from show")
yield Static("", id="showlabel")
yield Static("", classes="two")
yield Button("Delete", id="delete_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "delete_button":
if self.pattern_id is None:
raise click.ClickException('PatternDeleteScreen.on_button_pressed(): pattern id is undefined')
if self.__pc.deletePattern(self.pattern_id):
self.dismiss(self.pattern_obj)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -1,436 +0,0 @@
import click, re
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button, Input
from textual.containers import Grid, Horizontal
from ffx.model.show import Show
from ffx.model.pattern import Pattern
from .pattern_controller import PatternController
from .show_controller import ShowController
from .track_controller import TrackController
from .track_details_screen import TrackDetailsScreen
from .track_delete_screen import TrackDeleteScreen
from ffx.track_type import TrackType
from ffx.track_disposition import TrackDisposition
from textual.widgets._data_table import CellDoesNotExist
# Screen[dict[int, str, int]]
class PatternDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 12;
grid-rows: 2 2 2 2 2 6 2 2 6 2 2 2;
grid-columns: 25 25 25 25 25;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
DataTable {
min-height: 6;
}
#toplabel {
height: 1;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, patternId = None, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database_session'] # convenience
self.__pc = PatternController(context = self.context)
self.__sc = ShowController(context = self.context)
self.__tc = TrackController(context = self.context)
self.pattern_obj = self.__pc.getPatternDescriptor(patternId) if patternId is not None else {}
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def loadTracks(self, show_id):
try:
tracks = {}
tracks['audio'] = {}
tracks['subtitle'] = {}
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(show_id))
return [{'id': int(p.id), 'pattern': p.pattern} for p in q.all()]
except Exception as ex:
click.ClickException(f"loadPatterns(): {repr(ex)}")
finally:
s.close()
def updateAudioTracks(self):
self.audioStreamsTable.clear()
trackIds = self.__tc.findAllTracks(self.pattern_obj['id'])
for audioTrackId in trackIds[TrackType.AUDIO.label()]:
ad = self.__tc.getTrackDescriptor(audioTrackId)
dispoList = ad['disposition_list']
row = (ad['sub_index'],
" ",
ad['language'].label(),
ad['title'],
'Yes' if TrackDisposition.DEFAULT in dispoList else 'No',
'Yes' if TrackDisposition.FORCED in dispoList else 'No')
self.audioStreamsTable.add_row(*map(str, row))
def updateSubtitleTracks(self):
self.subtitleStreamsTable.clear()
trackIds = self.__tc.findAllTracks(self.pattern_obj['id'])
for subtitleTrackId in trackIds[TrackType.SUBTITLE.label()]:
sd = self.__tc.getTrackDescriptor(subtitleTrackId)
dispoList = sd['disposition_list']
row = (sd['sub_index'],
" ",
sd['language'].label(),
sd['title'],
'Yes' if TrackDisposition.DEFAULT in dispoList else 'No',
'Yes' if TrackDisposition.FORCED in dispoList else 'No')
self.subtitleStreamsTable.add_row(*map(str, row))
def on_mount(self):
if self.pattern_obj:
self.query_one("#pattern_input", Input).value = str(self.pattern_obj['pattern'])
if self.show_obj:
self.query_one("#showlabel", Static).update(f"{self.show_obj['id']} - {self.show_obj['name']} ({self.show_obj['year']})")
if self.pattern_obj:
self.updateAudioTracks()
self.updateSubtitleTracks()
def compose(self):
self.audioStreamsTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_audio_subid = self.audioStreamsTable.add_column("Subindex", width=20)
self.column_key_audio_layout = self.audioStreamsTable.add_column("Layout", width=20)
self.column_key_audio_language = self.audioStreamsTable.add_column("Language", width=20)
self.column_key_audio_title = self.audioStreamsTable.add_column("Title", width=30)
self.column_key_audio_default = self.audioStreamsTable.add_column("Default", width=10)
self.column_key_audio_forced = self.audioStreamsTable.add_column("Forced", width=10)
self.audioStreamsTable.cursor_type = 'row'
self.subtitleStreamsTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_subtitle_subid = self.subtitleStreamsTable.add_column("Subindex", width=20)
self.column_key_subtitle_spacer = self.subtitleStreamsTable.add_column(" ", width=20)
self.column_key_subtitle_language = self.subtitleStreamsTable.add_column("Language", width=20)
self.column_key_subtitle_title = self.subtitleStreamsTable.add_column("Title", width=30)
self.column_key_subtitle_default = self.subtitleStreamsTable.add_column("Default", width=10)
self.column_key_subtitle_forced = self.subtitleStreamsTable.add_column("Forced", width=10)
self.subtitleStreamsTable.cursor_type = 'row'
yield Header()
with Grid():
# 1
yield Static("Edit filename pattern" if self.pattern_obj else "New filename pattern", id="toplabel")
yield Input(type="text", id="pattern_input", classes="four")
# 2
yield Static("from show")
yield Static("", id="showlabel", classes="three")
yield Button("Substitute pattern", id="patternbutton")
# 3
yield Static(" ", classes="five")
# 4
yield Static(" ", classes="five")
# 5
yield Static("Audio streams")
yield Static(" ")
if self.pattern_obj:
yield Button("Add", id="button_add_audio_stream")
yield Button("Edit", id="button_edit_audio_stream")
yield Button("Delete", id="button_delete_audio_stream")
else:
yield Static("")
yield Static("")
yield Static("")
# 6
yield self.audioStreamsTable
# 7
yield Static(" ", classes="five")
# 8
yield Static("Subtitle streams")
yield Static(" ")
if self.pattern_obj:
yield Button("Add", id="button_add_subtitle_stream")
yield Button("Edit", id="button_edit_subtitle_stream")
yield Button("Delete", id="button_delete_subtitle_stream")
else:
yield Static("")
yield Static("")
yield Static("")
# 9
yield self.subtitleStreamsTable
# 10
yield Static(" ", classes="five")
# 11
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
def getPatternFromInput(self):
return str(self.query_one("#pattern_input", Input).value)
def getSelectedAudioTrackId(self):
if not self.pattern_obj:
return None
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.audioStreamsTable.coordinate_to_cell_key(self.audioStreamsTable.cursor_coordinate)
if row_key is not None:
selected_track_data = self.audioStreamsTable.get_row(row_key)
subIndex = int(selected_track_data[0])
return self.__tc.findTrack(self.pattern_obj['id'], TrackType.AUDIO, subIndex)
else:
return None
except CellDoesNotExist:
return None
def getSelectedSubtitleTrackId(self):
if not self.pattern_obj:
return None
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.subtitleStreamsTable.coordinate_to_cell_key(self.subtitleStreamsTable.cursor_coordinate)
if row_key is not None:
selected_track_data = self.subtitleStreamsTable.get_row(row_key)
subIndex = int(selected_track_data[0])
return self.__tc.findTrack(self.pattern_obj['id'], TrackType.SUBTITLE, subIndex)
else:
return None
except CellDoesNotExist:
return None
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
# Check if the button pressed is the one we are interested in
if event.button.id == "save_button":
patternDescriptor = {}
patternDescriptor['show_id'] = self.show_obj['id']
patternDescriptor['pattern'] = self.getPatternFromInput()
if self.pattern_obj:
if self.__pc.updatePattern(self.pattern_obj['id'], patternDescriptor):
self.dismiss(patternDescriptor)
else:
#TODO: Meldung
self.app.pop_screen()
else:
if self.__pc.addPattern(patternDescriptor):
self.dismiss(patternDescriptor)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()
# Save pattern when just created before adding streams
if self.pattern_obj:
#self.pattern_obj
if event.button.id == "button_add_audio_stream":
self.app.push_screen(TrackDetailsScreen(trackType = TrackType.AUDIO, patternId = self.pattern_obj['id'], subIndex = len(self.audioStreamsTable.rows)), self.handle_add_track)
selectedAudioTrackId = self.getSelectedAudioTrackId()
if selectedAudioTrackId is not None:
if event.button.id == "button_edit_audio_stream":
self.app.push_screen(TrackDetailsScreen(trackId = selectedAudioTrackId), self.handle_edit_track)
if event.button.id == "button_delete_audio_stream":
self.app.push_screen(TrackDeleteScreen(trackId = selectedAudioTrackId), self.handle_delete_track)
if event.button.id == "button_add_subtitle_stream":
self.app.push_screen(TrackDetailsScreen(trackType = TrackType.SUBTITLE, patternId = self.pattern_obj['id'], subIndex = len(self.subtitleStreamsTable.rows)), self.handle_add_track)
selectedSubtitleTrackId = self.getSelectedSubtitleTrackId()
if selectedSubtitleTrackId is not None:
if event.button.id == "button_edit_subtitle_stream":
self.app.push_screen(TrackDetailsScreen(trackId = selectedSubtitleTrackId), self.handle_edit_track)
if event.button.id == "button_delete_subtitle_stream":
self.app.push_screen(TrackDeleteScreen(trackId = selectedSubtitleTrackId), self.handle_delete_track)
if event.button.id == "patternbutton":
INDICATOR_PATTERN = '([sS][0-9]+[eE][0-9]+)'
pattern = self.query_one("#pattern_input", Input).value
patternMatch = re.search(INDICATOR_PATTERN, pattern)
if patternMatch:
self.query_one("#pattern_input", Input).value = pattern.replace(patternMatch.group(1), INDICATOR_PATTERN)
def handle_add_track(self, trackDescriptor):
dispoList = trackDescriptor['disposition_list']
if trackDescriptor['type'] == TrackType.AUDIO:
row = (trackDescriptor['sub_index'],
" ",
trackDescriptor['language'].label(),
trackDescriptor['title'],
'Yes' if TrackDisposition.DEFAULT in dispoList else 'No',
'Yes' if TrackDisposition.FORCED in dispoList else 'No')
self.audioStreamsTable.add_row(*map(str, row))
if trackDescriptor['type'] == TrackType.SUBTITLE:
row = (trackDescriptor['sub_index'],
" ",
trackDescriptor['language'].label(),
trackDescriptor['title'],
'Yes' if TrackDisposition.DEFAULT in dispoList else 'No',
'Yes' if TrackDisposition.FORCED in dispoList else 'No')
self.subtitleStreamsTable.add_row(*map(str, row))
def handle_edit_track(self, trackDescriptor):
try:
if trackDescriptor['type'] == TrackType.AUDIO:
row_key, col_key = self.audioStreamsTable.coordinate_to_cell_key(self.audioStreamsTable.cursor_coordinate)
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_language, trackDescriptor['language'].label())
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_title, trackDescriptor['title'])
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_default, 'Yes' if TrackDisposition.DEFAULT in trackDescriptor['disposition_list'] else 'No')
self.audioStreamsTable.update_cell(row_key, self.column_key_audio_forced, 'Yes' if TrackDisposition.FORCED in trackDescriptor['disposition_list'] else 'No')
if trackDescriptor['type'] == TrackType.SUBTITLE:
row_key, col_key = self.subtitleStreamsTable.coordinate_to_cell_key(self.subtitleStreamsTable.cursor_coordinate)
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_language, trackDescriptor['language'].label())
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_title, trackDescriptor['title'])
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_default, 'Yes' if TrackDisposition.DEFAULT in trackDescriptor['disposition_list'] else 'No')
self.subtitleStreamsTable.update_cell(row_key, self.column_key_subtitle_forced, 'Yes' if TrackDisposition.FORCED in trackDescriptor['disposition_list'] else 'No')
except CellDoesNotExist:
pass
def handle_delete_track(self, trackDescriptor):
try:
if trackDescriptor['type'] == TrackType.AUDIO:
self.updateAudioTracks()
if trackDescriptor['type'] == TrackType.SUBTITLE:
self.updateSubtitleTracks()
except CellDoesNotExist:
pass

View File

@@ -1,11 +0,0 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class SettingsScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Placeholder("Settings Screen")
yield Footer()

2
bin/ffx/show.py Normal file
View File

@@ -0,0 +1,2 @@
class Show():
pass

View File

@@ -1,115 +1,2 @@
import click
from ffx.model.show import Show
class ShowController(): class ShowController():
pass
def __init__(self, context):
self.context = context
self.Session = self.context['database_session'] # convenience
def getShowDesciptor(self, showId):
try:
s = self.Session()
q = s.query(Show).filter(Show.id == showId)
showDescriptor = {}
if q.count():
show = q.first()
showDescriptor['id'] = int(show.id)
showDescriptor['name'] = str(show.name)
showDescriptor['year'] = int(show.year)
showDescriptor['index_season_digits'] = int(show.index_season_digits)
showDescriptor['index_episode_digits'] = int(show.index_episode_digits)
showDescriptor['indicator_season_digits'] = int(show.indicator_season_digits)
showDescriptor['indicator_episode_digits'] = int(show.indicator_episode_digits)
return showDescriptor
except Exception as ex:
raise click.ClickException(f"ShowController.getShowDesciptor(): {repr(ex)}")
finally:
s.close()
def updateShow(self, showDescriptor):
try:
s = self.Session()
q = s.query(Show).filter(Show.id == showDescriptor['id'])
if not q.count():
show = Show(id = int(showDescriptor['id']),
name = str(showDescriptor['name']),
year = int(showDescriptor['year']),
index_season_digits = showDescriptor['index_season_digits'],
index_episode_digits = showDescriptor['index_episode_digits'],
indicator_season_digits = showDescriptor['indicator_season_digits'],
indicator_episode_digits = showDescriptor['indicator_episode_digits'])
s.add(show)
s.commit()
return True
else:
currentShow = q.first()
changed = False
if currentShow.name != str(showDescriptor['name']):
currentShow.name = str(showDescriptor['name'])
changed = True
if currentShow.year != int(showDescriptor['year']):
currentShow.year = int(showDescriptor['year'])
changed = True
if currentShow.index_season_digits != int(showDescriptor['index_season_digits']):
currentShow.index_season_digits = int(showDescriptor['index_season_digits'])
changed = True
if currentShow.index_episode_digits != int(showDescriptor['index_episode_digits']):
currentShow.index_episode_digits = int(showDescriptor['index_episode_digits'])
changed = True
if currentShow.indicator_season_digits != int(showDescriptor['indicator_season_digits']):
currentShow.indicator_season_digits = int(showDescriptor['indicator_season_digits'])
changed = True
if currentShow.indicator_episode_digits != int(showDescriptor['indicator_episode_digits']):
currentShow.indicator_episode_digits = int(showDescriptor['indicator_episode_digits'])
changed = True
if changed:
s.commit()
return changed
except Exception as ex:
raise click.ClickException(f"ShowController.updateShow(): {repr(ex)}")
finally:
s.close()
def deleteShow(self, show_id):
try:
s = self.Session()
q = s.query(Show).filter(Show.id == int(show_id))
if q.count():
#DAFUQ: https://stackoverflow.com/a/19245058
# q.delete()
show = q.first()
s.delete(show)
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"ShowController.deleteShow(): {repr(ex)}")
finally:
s.close()

View File

@@ -1,101 +0,0 @@
import click
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button, Input
from textual.containers import Grid, Horizontal
from ffx.model.show import Show
from .show_controller import ShowController
# Screen[dict[int, str, int]]
class ShowDeleteScreen(Screen):
CSS = """
Grid {
grid-size: 2;
grid-rows: 2 auto;
grid-columns: 30 auto;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database_session'] # convenience
self.__sc = ShowController(context = self.context)
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def on_mount(self):
if self.show_obj:
self.query_one("#showlabel", Static).update(f"{self.show_obj['id']} - {self.show_obj['name']} ({self.show_obj['year']})")
def compose(self):
yield Header()
with Grid():
yield Static("Are you sure to delete the following show?", id="toplabel", classes="two")
yield Static("", classes="two")
yield Static("", id="showlabel")
yield Static("")
yield Static("", classes="two")
yield Static("", classes="two")
yield Button("Delete", id="delete_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "delete_button":
if self.__sc.deleteShow(self.show_obj['id']):
self.dismiss(self.show_obj['id'])
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -1,319 +0,0 @@
import click
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button, Input
from textual.containers import Grid, Horizontal
from textual.widgets._data_table import CellDoesNotExist
from ffx.model.show import Show
from ffx.model.pattern import Pattern
from .pattern_details_screen import PatternDetailsScreen
from .pattern_delete_screen import PatternDeleteScreen
from .show_controller import ShowController
from .pattern_controller import PatternController
# Screen[dict[int, str, int]]
class ShowDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 14;
grid-rows: 2 2 2 2 2 2 2 2 2 2 2 6 2 2;
grid-columns: 30 30 30 30 30;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
DataTable {
column-span: 2;
min-height: 5;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
BINDINGS = [
("a", "add_pattern", "Add Pattern"),
("e", "edit_pattern", "Edit Pattern"),
("r", "remove_pattern", "Remove Pattern"),
]
def __init__(self, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database_session'] # convenience
self.__sc = ShowController(context = self.context)
self.__pc = PatternController(context = self.context)
self.show_obj = self.__sc.getShowDesciptor(showId) if showId is not None else {}
def loadPatterns(self, show_id):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(show_id))
return [{'id': int(p.id), 'pattern': p.pattern} for p in q.all()]
except Exception as ex:
click.ClickException(f"loadPatterns(): {repr(ex)}")
finally:
s.close()
def on_mount(self):
if self.show_obj:
self.query_one("#id_wdg", Static).update(str(self.show_obj['id']))
self.query_one("#name_input", Input).value = str(self.show_obj['name'])
self.query_one("#year_input", Input).value = str(self.show_obj['year'])
self.query_one("#index_season_digits_input", Input).value = str(self.show_obj['index_season_digits'])
self.query_one("#index_episode_digits_input", Input).value = str(self.show_obj['index_episode_digits'])
self.query_one("#indicator_season_digits_input", Input).value = str(self.show_obj['indicator_season_digits'])
self.query_one("#indicator_episode_digits_input", Input).value = str(self.show_obj['indicator_episode_digits'])
for pattern in self.loadPatterns(int(self.show_obj['id'])):
row = (pattern['pattern'],)
self.patternTable.add_row(*map(str, row))
else:
self.query_one("#index_season_digits_input", Input).value = "2"
self.query_one("#index_episode_digits_input", Input).value = "2"
self.query_one("#indicator_season_digits_input", Input).value = "2"
self.query_one("#indicator_episode_digits_input", Input).value = "2"
def getSelectedPatternDescriptor(self):
selectedPattern = {}
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.patternTable.coordinate_to_cell_key(self.patternTable.cursor_coordinate)
if row_key is not None:
selected_row_data = self.patternTable.get_row(row_key)
selectedPattern['show_id'] = self.show_obj['id']
selectedPattern['pattern'] = str(selected_row_data[0])
except CellDoesNotExist:
pass
return selectedPattern
def action_add_pattern(self):
if self.show_obj:
self.app.push_screen(PatternDetailsScreen(showId = self.show_obj['id']), self.handle_add_pattern) # <-
def handle_add_pattern(self, screenResult):
pattern = (screenResult['pattern'],)
self.patternTable.add_row(*map(str, pattern))
def action_edit_pattern(self):
selectedPatternDescriptor = self.getSelectedPatternDescriptor()
if selectedPatternDescriptor:
selectedPatternId = self.__pc.findPattern(selectedPatternDescriptor)
if selectedPatternId is None:
raise click.ClickException(f"ShowDetailsScreen.action_edit_pattern(): Pattern to remove has no id")
self.app.push_screen(PatternDetailsScreen(patternId = selectedPatternId, showId = self.show_obj['id']), self.handle_edit_pattern) # <-
def handle_edit_pattern(self, screenResult):
try:
row_key, col_key = self.patternTable.coordinate_to_cell_key(self.patternTable.cursor_coordinate)
self.patternTable.update_cell(row_key, self.column_key_pattern, screenResult['pattern'])
except CellDoesNotExist:
pass
def action_remove_pattern(self):
selectedPatternDescriptor = self.getSelectedPatternDescriptor()
if selectedPatternDescriptor:
selectedPatternId = self.__pc.findPattern(selectedPatternDescriptor)
if selectedPatternId is None:
raise click.ClickException(f"ShowDetailsScreen.action_remove_pattern(): Pattern to remove has no id")
self.app.push_screen(PatternDeleteScreen(patternId = selectedPatternId, showId = self.show_obj['id']), self.handle_remove_pattern)
def handle_remove_pattern(self, screenResult):
try:
row_key, col_key = self.patternTable.coordinate_to_cell_key(self.patternTable.cursor_coordinate)
self.patternTable.remove_row(row_key)
except CellDoesNotExist:
pass
def compose(self):
# Create the DataTable widget
self.patternTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_pattern = self.patternTable.add_column("Pattern", width=150)
#self.column_key_name = self.patternTable.add_column("Name", width=50)
#self.column_key_year = self.patternTable.add_column("Year", width=10)
self.patternTable.cursor_type = 'row'
yield Header()
with Grid():
# 1
yield Static("Show" if self.show_obj else "New Show", id="toplabel", classes="five")
# 2
yield Static("ID")
if self.show_obj:
yield Static("", id="id_wdg", classes="four")
else:
yield Input(type="integer", id="id_wdg", classes="four")
# 3
yield Static("Name")
yield Input(type="text", id="name_input", classes="four")
# 4
yield Static("Year")
yield Input(type="integer", id="year_input", classes="four")
#5
yield Static(" ", classes="five")
#6
yield Static("Index Season Digits")
yield Input(type="integer", id="index_season_digits_input", classes="four")
#7
yield Static("Index Episode Digits")
yield Input(type="integer", id="index_episode_digits_input", classes="four")
#8
yield Static("Indicator Season Digits")
yield Input(type="integer", id="indicator_season_digits_input", classes="four")
#9
yield Static("Indicator Edisode Digits")
yield Input(type="integer", id="indicator_episode_digits_input", classes="four")
# 10
yield Static(" ", classes="five")
# 11
yield Static("File patterns", classes="five")
# 12
yield self.patternTable
# 13
yield Static(" ", classes="five")
# 14
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
def getShowDescriptorFromInput(self):
showDescriptor = {}
if self.show_obj:
showDescriptor['id'] = int(self.show_obj['id'])
else:
showDescriptor['id'] = int(self.query_one("#id_wdg", Input).value)
showDescriptor['name'] = str(self.query_one("#name_input", Input).value)
showDescriptor['year'] = int(self.query_one("#year_input", Input).value)
showDescriptor['index_season_digits'] = int(self.query_one("#index_season_digits_input", Input).value)
showDescriptor['index_episode_digits'] = int(self.query_one("#index_episode_digits_input", Input).value)
showDescriptor['indicator_season_digits'] = int(self.query_one("#indicator_season_digits_input", Input).value)
showDescriptor['indicator_episode_digits'] = int(self.query_one("#indicator_episode_digits_input", Input).value)
return showDescriptor
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
# Check if the button pressed is the one we are interested in
if event.button.id == "save_button":
showDescriptor = self.getShowDescriptorFromInput()
if self.__sc.updateShow(showDescriptor):
self.dismiss(showDescriptor)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -1,172 +0,0 @@
import click
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button
from textual.containers import Grid, Horizontal
from ffx.model.show import Show
from .show_details_screen import ShowDetailsScreen
from .show_delete_screen import ShowDeleteScreen
from .help_screen import HelpScreen
from textual.widgets._data_table import CellDoesNotExist
class ShowsScreen(Screen):
CSS = """
Grid {
grid-size: 1;
grid-rows: 2 auto;
height: 100%;
width: 100%;
padding: 1;
}
#top {
height: 1;
}
#two {
column-span: 2;
row-span: 2;
tint: magenta 40%;
}
.box {
height: 100%;
border: solid green;
}
"""
BINDINGS = [
("e", "edit_show", "Edit Show"),
("n", "new_show", "New Show"),
("d", "delete_show", "Delete Show"),
]
def __init__(self):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database_session'] # convenience
def getSelectedShowId(self):
try:
# Fetch the currently selected row when 'Enter' is pressed
#selected_row_index = self.table.cursor_row
row_key, col_key = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
if row_key is not None:
selected_row_data = self.table.get_row(row_key)
return selected_row_data[0]
except CellDoesNotExist:
return None
def action_new_show(self):
self.app.push_screen(ShowDetailsScreen(), self.handle_new_screen)
def handle_new_screen(self, screenResult):
show = (screenResult['id'], screenResult['name'], screenResult['year'])
self.table.add_row(*map(str, show))
def action_edit_show(self):
selectedShowId = self.getSelectedShowId()
if selectedShowId is not None:
self.app.push_screen(ShowDetailsScreen(showId = selectedShowId), self.handle_edit_screen)
def handle_edit_screen(self, screenResult):
try:
row_key, col_key = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
self.table.update_cell(row_key, self.column_key_name, screenResult['name'])
self.table.update_cell(row_key, self.column_key_year, screenResult['year'])
except CellDoesNotExist:
pass
def action_delete_show(self):
selectedShowId = self.getSelectedShowId()
if selectedShowId is not None:
self.app.push_screen(ShowDeleteScreen(showId = selectedShowId), self.handle_delete_show)
def handle_delete_show(self, screenResult):
try:
row_key, col_key = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
self.table.remove_row(row_key)
except CellDoesNotExist:
pass
def loadShows(self):
try:
s = self.Session()
q = s.query(Show)
return [(int(s.id), s.name, s.year) for s in q.all()]
except Exception as ex:
raise click.ClickException(f"ShowsScreen.loadShows(): {repr(ex)}")
finally:
s.close()
def on_mount(self) -> None:
for show in self.loadShows():
self.table.add_row(*map(str, show)) # Convert each element to a string before adding
def compose(self):
# Create the DataTable widget
self.table = DataTable()
# Define the columns with headers
self.column_key_id = self.table.add_column("ID", width=10)
self.column_key_name = self.table.add_column("Name", width=50)
self.column_key_year = self.table.add_column("Year", width=10)
self.table.cursor_type = 'row'
yield Header()
with Grid():
yield Static("Shows")
yield self.table
yield Footer()

6
bin/ffx/stream_type.py Normal file
View File

@@ -0,0 +1,6 @@
from enum import Enum
class StreamType(Enum):
VIDEO = 1
AUDIO = 2
SUBTITLE = 3

View File

@@ -1,98 +0,0 @@
import os, click, requests
class TmdbController():
DEFAULT_LANGUAGE = 'de-DE'
def __init__(self):
try:
self.__tmdbApiKey = os.environ['TMDB_API_KEY']
except KeyError:
click.ClickException('TMDB api key is not available, please set environment variable TMDB_API_KEY')
self.tmdbLanguage = TmdbController.DEFAULT_LANGUAGE
def queryTmdb(self, showId, season, episode):
"""
First level keys in the response object:
air_date str 'YYY-MM-DD'
crew []
episode_number int
guest_stars []
name str
overview str
id int
production_code
runtime int
season_number int
still_path str '/filename.jpg'
vote_average float
vote_count int
"""
urlParams = f"?language={self.tmdbLanguage}&api_key={self.__tmdbApiKey}"
tmdbUrl = f"https://api.themoviedb.org/3/tv/{showId}/season/{season}/episode/{episode}{urlParams}"
return requests.get(tmdbUrl).json()
def getEpisodeFilename(self,
showName,
episodeName,
season,
episode,
extension,
indexSeasonDigits = 2,
indexEpisodeDigits = 2,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 2):
"""
One Piece:
indexSeasonDigits = 0,
indexEpisodeDigits = 4,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 4
Three-Body:
indexSeasonDigits = 0,
indexEpisodeDigits = 2,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 2
Dragonball:
indexSeasonDigits = 0,
indexEpisodeDigits = 3,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 3
Boruto:
indexSeasonDigits = 0,
indexEpisodeDigits = 4,
indicatorSeasonDigits = 2,
indicatorEpisodeDigits = 4
"""
filenameTokens = [str(showName), ' - ']
if indexSeasonDigits:
filenameTokens += ['{num:{fill}{width}}'.format(num=season, fill='0', width=indexSeasonDigits)]
if indexEpisodeDigits:
filenameTokens += ['{num:{fill}{width}}'.format(num=episode, fill='0', width=indexEpisodeDigits)]
if indexSeasonDigits or indexEpisodeDigits:
filenameTokens += [' ']
filenameTokens += [episodeName]
if indicatorSeasonDigits or indicatorEpisodeDigits:
filenameTokens += [' - ']
if indicatorSeasonDigits:
filenameTokens += ['S{num:{fill}{width}}'.format(num=season, fill='0', width=indicatorSeasonDigits)]
if indicatorEpisodeDigits:
filenameTokens += ['E{num:{fill}{width}}'.format(num=episode, fill='0', width=indicatorEpisodeDigits)]
filenameTokens += ['.', extension]
return ''.join(filenameTokens)

View File

@@ -1,182 +0,0 @@
import click
from ffx.model.track import Track
from .track_type import TrackType
from .track_disposition import TrackDisposition
from .iso_language import IsoLanguage
from .track_type import TrackType
class TrackController():
def __init__(self, context):
self.context = context
self.Session = self.context['database_session'] # convenience
def addTrack(self, trackDescriptor):
try:
s = self.Session()
track = Track(pattern_id = int(trackDescriptor['pattern_id']),
track_type = int(trackDescriptor['type'].value),
sub_index = int(trackDescriptor['sub_index']),
language = str(trackDescriptor['language'].threeLetter()),
title = str(trackDescriptor['title']),
disposition_flags = int(TrackDisposition.toFlags(trackDescriptor['disposition_list'])))
s.add(track)
s.commit()
except Exception as ex:
raise click.ClickException(f"TrackController.addTrack(): {repr(ex)}")
finally:
s.close()
def updateTrack(self, trackId, trackDescriptor):
try:
s = self.Session()
q = s.query(Track).filter(Track.id == int(trackId))
if q.count():
track = q.first()
track.sub_index = int(trackDescriptor['sub_index'])
track.language = str(trackDescriptor['language'].threeLetter())
track.title = str(trackDescriptor['title'])
track.disposition_flags = int(TrackDisposition.toFlags(trackDescriptor['disposition_list']))
s.commit()
return True
else:
return False
except Exception as ex:
raise click.ClickException(f"TrackController.addTrack(): {repr(ex)}")
finally:
s.close()
def findAllTracks(self, patternId):
try:
s = self.Session()
trackDescriptors = {}
trackDescriptors[TrackType.AUDIO.label()] = []
trackDescriptors[TrackType.SUBTITLE.label()] = []
q_audio = s.query(Track).filter(Track.pattern_id == int(patternId), Track.track_type == TrackType.AUDIO.value)
for audioTrack in q_audio.all():
trackDescriptors[TrackType.AUDIO.label()].append(audioTrack.id)
q_subtitle = s.query(Track).filter(Track.pattern_id == int(patternId), Track.track_type == TrackType.SUBTITLE.value)
for subtitleTrack in q_subtitle.all():
trackDescriptors[TrackType.SUBTITLE.label()].append(subtitleTrack.id)
return trackDescriptors
except Exception as ex:
raise click.ClickException(f"TrackController.findAllTracks(): {repr(ex)}")
finally:
s.close()
def findTrack(self, patternId, trackType : TrackType, subIndex):
try:
s = self.Session()
q = s.query(Track).filter(Track.pattern_id == int(patternId), Track.track_type == trackType.value, Track.sub_index == int(subIndex))
if q.count():
track = q.first()
return int(track.id)
else:
return None
except Exception as ex:
raise click.ClickException(f"TrackController.findTrack(): {repr(ex)}")
finally:
s.close()
def getTrackDict(self, track):
trackDescriptor = {}
trackDescriptor['id'] = int(track.id)
trackDescriptor['pattern_id'] = int(track.pattern_id)
trackDescriptor['type'] = TrackType(track.track_type)
trackDescriptor['sub_index'] = int(track.sub_index)
trackDescriptor['language'] = IsoLanguage.findThreeLetter(track.language)
trackDescriptor['title'] = str(track.title)
trackDescriptor['disposition_list'] = TrackDisposition.toList(track.disposition_flags)
return trackDescriptor
def getTrackDescriptor(self, trackId):
try:
s = self.Session()
q = s.query(Track).filter(Track.id == int(trackId))
if q.count():
track = q.first()
return self.getTrackDict(track)
else:
return {}
except Exception as ex:
raise click.ClickException(f"TrackController.getTrackDescriptor(): {repr(ex)}")
finally:
s.close()
def deleteTrack(self, trackId):
try:
s = self.Session()
q = s.query(Track).filter(Track.id == int(trackId))
if q.count():
trackDescriptor = self.getTrackDict(q.first())
q_siblings = s.query(Track).filter(Track.pattern_id == int(trackDescriptor['pattern_id']), Track.track_type == trackDescriptor['type'].value).order_by(Track.sub_index)
subIndex = 0
for track in q_siblings.all():
if track.sub_index == trackDescriptor['sub_index']:
s.delete(track)
else:
track.sub_index = subIndex
subIndex += 1
s.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"TrackController.deleteTrack(): {repr(ex)}")
finally:
s.close()

View File

@@ -1,138 +0,0 @@
import click
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button, Input
from textual.containers import Grid, Horizontal
from ffx.model.pattern import Pattern
# from .show_controller import ShowController
# from .pattern_controller import PatternController
from .track_controller import TrackController
from .track_type import TrackType
# Screen[dict[int, str, int]]
class TrackDeleteScreen(Screen):
CSS = """
Grid {
grid-size: 4 9;
grid-rows: 2 2 2 2 2 2 2 2 2;
grid-columns: 30 30 30 30;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, trackId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database_session'] # convenience
if trackId is None:
raise click.ClickException('TrackDeleteScreen.init(): trackId is required to be set')
self.__tc = TrackController(context = self.context)
self.track_obj = self.__tc.getTrackDescriptor(trackId)
def on_mount(self):
self.query_one("#subindexlabel", Static).update(str(self.track_obj['sub_index']))
self.query_one("#patternlabel", Static).update(str(self.track_obj['pattern_id']))
self.query_one("#languagelabel", Static).update(str(self.track_obj['language'].label()))
self.query_one("#titlelabel", Static).update(str(str(self.track_obj['title'])))
def compose(self):
yield Header()
with Grid():
#1
yield Static(f"Are you sure to delete the following {self.track_obj['type'].label()} track?", id="toplabel", classes="four")
#2
yield Static("sub index")
yield Static(" ", id="subindexlabel", classes="three")
#3
yield Static("from pattern")
yield Static(" ", id="patternlabel", classes="three")
#4
yield Static(" ", classes="four")
#5
yield Static("Language")
yield Static(" ", id="languagelabel", classes="three")
#6
yield Static("Title")
yield Static(" ", id="titlelabel", classes="three")
#7
yield Static(" ", classes="four")
#8
yield Static(" ", classes="four")
#9
yield Button("Delete", id="delete_button")
yield Button("Cancel", id="cancel_button")
yield Footer()
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "delete_button":
trackId = self.__tc.findTrack(self.track_obj['pattern_id'], self.track_obj['type'], self.track_obj['sub_index'])
if trackId is not None:
if self.__tc.deleteTrack(trackId):
self.dismiss(self.track_obj)
else:
#TODO: Meldung
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -1,291 +0,0 @@
import click, time
from textual import events
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label, ListView, ListItem, Static, DataTable, Button, Input, Checkbox, SelectionList, Select
from textual.containers import Grid, Horizontal
from ffx.model.show import Show
from ffx.model.pattern import Pattern
from .track_controller import TrackController
from .pattern_controller import PatternController
# from .show_controller import ShowController
from .track_type import TrackType
from .iso_language import IsoLanguage
from .track_disposition import TrackDisposition
# Screen[dict[int, str, int]]
class TrackDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 20;
grid-rows: 2 2 2 2 2 3 2 2 2 2 2 6 2 2 6 2 2 2 2 6;
grid-columns: 25 25 25 25 225;
height: 100%;
width: 100%;
padding: 1;
}
Input {
border: none;
}
Button {
border: none;
}
SelectionList {
border: none;
min-height: 6;
}
Select {
border: none;
}
DataTable {
min-height: 6;
}
#toplabel {
height: 1;
}
.two {
column-span: 2;
}
.three {
column-span: 3;
}
.four {
column-span: 4;
}
.five {
column-span: 5;
}
.box {
height: 100%;
border: solid green;
}
"""
def __init__(self, trackId = None, patternId = None, trackType : TrackType = None, subIndex = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database_session'] # convenience
self.__tc = TrackController(context = self.context)
self.__pc = PatternController(context = self.context)
self.track_obj = self.__tc.getTrackDescriptor(trackId) if trackId is not None else {}
if self.track_obj:
self.trackType = self.track_obj['type']
self.subIndex = self.track_obj['sub_index']
self.pattern_obj = self.__pc.getPatternDescriptor(self.track_obj['pattern_id'])
self.track_obj['is_new'] = False
else:
self.trackType = trackType
self.subIndex = subIndex
self.pattern_obj = self.__pc.getPatternDescriptor(patternId) if patternId is not None else {}
self.track_obj['is_new'] = True
if self.trackType is None:
raise click.ClickException('Track type is required to be set')
if self.subIndex is None:
raise click.ClickException('Sub index for track is required to be set')
def on_mount(self):
if self.pattern_obj:
self.query_one("#patternlabel", Static).update(str(self.pattern_obj['pattern']))
if self.subIndex is not None:
self.query_one("#subindexlabel", Static).update(str(self.subIndex))
for d in TrackDisposition:
dispositionIsSet = (self.track_obj
and 'disposition_list' in self.track_obj.keys()
and d in self.track_obj['disposition_list'])
disposition = (d.label(), d.index(), dispositionIsSet)
self.query_one("#dispositions_selection_list", SelectionList).add_option(disposition)
if 'language' in self.track_obj.keys():
self.query_one("#language_select", Select).value = self.track_obj['language'].label()
if 'title' in self.track_obj.keys():
self.query_one("#title_input", Input).value = str(self.track_obj['title'])
def compose(self):
self.trackTagsTable = DataTable(classes="five")
# Define the columns with headers
self.column_key_track_tag_key = self.trackTagsTable.add_column("Key", width=10)
self.column_key_track_tag_value = self.trackTagsTable.add_column("Value", width=30)
self.trackTagsTable.cursor_type = 'row'
languages = [l.label() for l in IsoLanguage]
yield Header()
with Grid():
# 1
yield Static(f"New {self.trackType.label()} stream" if self.track_obj['is_new'] else f"Edit {self.trackType.label()} stream", id="toplabel", classes="five")
# 2
yield Static("for pattern")
yield Static("", id="patternlabel", classes="four")
# 3
yield Static("sub index")
yield Static("", id="subindexlabel", classes="four")
# 4
yield Static(" ", classes="five")
# 5
yield Static(" ", classes="five")
# 6
yield Static("Language")
yield Select.from_values(languages, classes="four", id="language_select")
# 7
yield Static(" ", classes="five")
# 8
yield Static("Title")
yield Input(id="title_input", classes="four")
# 9
yield Static(" ", classes="five")
# 10
yield Static(" ", classes="five")
# 11
yield Static("Stream tags")
yield Static(" ", classes="two")
yield Button("Add", id="button_add_stream_tag")
yield Button("Delete", id="button_delete_stream_tag")
# 12
yield self.trackTagsTable
# 13
yield Static(" ", classes="five")
# 14
yield Static("Stream dispositions", classes="five")
# 15
yield SelectionList[int](
classes="five",
id = "dispositions_selection_list"
)
# 16
yield Static(" ", classes="five")
# 17
yield Static(" ", classes="five")
# 18
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
# 19
yield Static(" ", classes="five")
# 20
yield Static(" ", classes="five", id="messagestatic")
yield Footer(id="footer")
def getTrackDescriptorFromInput(self):
trackDescriptor = {}
trackDescriptor['pattern_id'] = int(self.pattern_obj['id'])
trackDescriptor['type'] = TrackType(self.trackType)
trackDescriptor['sub_index'] = self.subIndex
trackDescriptor['language'] = IsoLanguage.find(str(self.query_one("#language_select", Select).value))
trackDescriptor['title'] = str(self.query_one("#title_input", Input).value)
disposition_flags = sum([2**f for f in self.query_one("#dispositions_selection_list", SelectionList).selected])
trackDescriptor['disposition_list'] = TrackDisposition.toList(disposition_flags)
return trackDescriptor
# Event handler for button press
def on_button_pressed(self, event: Button.Pressed) -> None:
# Check if the button pressed is the one we are interested in
if event.button.id == "save_button":
trackDescriptor = self.getTrackDescriptorFromInput()
# Check for multiple default/forced disposition flags
trackIdList = self.__tc.findAllTracks(self.pattern_obj['id'])[self.trackType.label()]
descriptorList = [d for d in (self.__tc.getTrackDescriptor(t) for t in trackIdList)
if d['type'] == self.trackType
and d['sub_index'] != self.subIndex]
numDefaultTracks = [d for d in descriptorList if TrackDisposition.DEFAULT in d['disposition_list']]
numForcedTracks = [d for d in descriptorList if TrackDisposition.FORCED in d['disposition_list']]
doubleDefaultOrForced = ((TrackDisposition.DEFAULT in trackDescriptor['disposition_list'] and numDefaultTracks)
or (TrackDisposition.FORCED in trackDescriptor['disposition_list'] and numForcedTracks))
if doubleDefaultOrForced:
self.query_one("#messagestatic", Static).update("Cannot add another stream with disposition flag 'debug' or 'forced' set")
else:
self.query_one("#messagestatic", Static).update(" ")
if self.track_obj['is_new']:
self.__tc.addTrack(trackDescriptor)
self.dismiss(trackDescriptor)
else:
trackId = self.__tc.findTrack(self.pattern_obj['id'], self.trackType, self.subIndex)
if self.__tc.updateTrack(trackId, trackDescriptor):
self.dismiss(trackDescriptor)
else:
self.app.pop_screen()
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -1,49 +0,0 @@
from enum import Enum
import difflib
class TrackDisposition(Enum):
DEFAULT = {"name": "default", "index": 0}
FORCED = {"name": "forced", "index": 1}
DUB = {"name": "dub", "index": 2}
ORIGINAL = {"name": "original", "index": 3}
COMMENT = {"name": "comment", "index": 4}
LYRICS = {"name": "lyrics", "index": 5}
KARAOKE = {"name": "karaoke", "index": 6}
HEARING_IMPAIRED = {"name": "hearing_impaired", "index": 7}
VISUAL_IMPAIRED = {"name": "visual_impaired", "index": 8}
CLEAN_EFFECTS = {"name": "clean_effects", "index": 9}
ATTACHED_PIC = {"name": "attached_pic", "index": 10}
TIMED_THUMBNAILS = {"name": "timed_thumbnails", "index": 11}
NON_DIEGETICS = {"name": "non_diegetic", "index": 12}
CAPTIONS = {"name": "captions", "index": 13}
DESCRIPTIONS = {"name": "descriptions", "index": 14}
METADATA = {"name": "metadata", "index": 15}
DEPENDENT = {"name": "dependent", "index": 16}
STILL_IMAGE = {"name": "still_image", "index": 17}
def label(self):
return str(self.value['name'])
def index(self):
return int(self.value['index'])
@staticmethod
def toFlags(dispositionList):
"""Flags stored in integer bits (2**index)"""
flags = 0
for d in dispositionList:
flags += 2 ** d.index()
return flags
@staticmethod
def toList(flags):
dispositionList = []
for d in TrackDisposition:
if flags & int(2 ** d.index()):
dispositionList += [d]
return dispositionList

View File

@@ -1,19 +0,0 @@
from enum import Enum
class TrackType(Enum):
VIDEO = 1
AUDIO = 2
SUBTITLE = 3
def label(self):
"""Returns the stream type as string"""
labels = {
TrackType.VIDEO: "video",
TrackType.AUDIO: "audio",
TrackType.SUBTITLE: "subtitle"
}
return labels.get(self, "undefined")

View File

@@ -1,11 +0,0 @@
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Placeholder, Label
class WarningScreen(Screen):
def __init__(self):
super().__init__()
context = self.app.getContext()
def compose(self) -> ComposeResult:
yield Label("Warning! This file is not compliant to the defined source schema!")
yield Footer()