Misc Opts

This commit is contained in:
Javanaut
2026-04-11 16:52:58 +02:00
parent 609f93b783
commit 9611930949
16 changed files with 516 additions and 178 deletions

View File

@@ -15,6 +15,8 @@ if __package__ in (None, ''):
from ffx.constants import (
DEFAULT_AC3_BANDWIDTH,
DEFAULT_CROPDETECT_DURATION_SECONDS,
DEFAULT_CROPDETECT_SEEK_SECONDS,
DEFAULT_CONTAINER_EXTENSION,
DEFAULT_CONTAINER_FORMAT,
DEFAULT_DTS_BANDWIDTH,
@@ -29,12 +31,20 @@ if TYPE_CHECKING:
from ffx.media_descriptor import MediaDescriptor
from ffx.track_descriptor import TrackDescriptor
LIGHTWEIGHT_COMMANDS = {None, 'version', 'help', 'configure_workstation', 'upgrade'}
LIGHTWEIGHT_COMMANDS = {None, 'version', 'help', 'setup', 'configure_workstation', 'upgrade'}
CPU_OPTION_HELP = (
"Limit CPU for started processes. Use an absolute cpulimit value such as 200 "
+ "(about 2 cores), or use a percentage such as 25% for a share of present cores. "
+ "Omit to disable; 0 also disables."
)
CROPDETECT_SEEK_OPTION_HELP = (
"Start crop detection this many seconds into the input. "
+ "Useful for skipping logos, intros, or black frames."
)
CROPDETECT_DURATION_OPTION_HELP = (
"Analyze this many seconds for crop detection. "
+ "Shorter windows are faster; longer windows are usually steadier."
)
def normalizeNicenessOption(ctx, param, value):
@@ -111,7 +121,9 @@ def version():
@ffx.command()
def help():
click.echo(f"ffx {VERSION}\n")
click.echo(f"Usage: ffx [input file] [output file] [vp9|av1] [q=[nn[,nn,...]]] [p=nn] [a=nnn[k]] [ac3=nnn[k]] [dts=nnn[k]] [crop]")
click.echo("Maintenance commands: setup, configure_workstation, upgrade")
click.echo("Media commands: shows, inspect, convert, unmux, cropdetect")
click.echo("Use 'ffx --help' or 'ffx <command> --help' for full command help.")
def getRepoRootPath():
@@ -123,6 +135,10 @@ def getConfigureWorkstationScriptPath():
return os.path.join(getRepoRootPath(), 'tools', 'configure_workstation.sh')
def getSetupScriptPath():
return os.path.join(getRepoRootPath(), 'tools', 'setup.sh')
def getBundleVenvDirectory():
return os.path.join(os.path.expanduser('~'), '.local', 'share', 'ffx.venv')
@@ -153,23 +169,11 @@ def getTrackedGitChanges(repoPath):
return [line for line in completed.stdout.splitlines() if line.strip()]
@ffx.command(name='configure_workstation')
@click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify workstation-configuration readiness')
@click.argument('configure_args', nargs=-1, type=click.UNPROCESSED)
def configure_workstation(ctx, check, configure_args):
"""Prepare workstation dependencies and local config after bundle install."""
configureScriptPath = getConfigureWorkstationScriptPath()
def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
if not os.path.isfile(scriptPath):
raise click.ClickException(f"{missingDescription} not found at {scriptPath}")
if not os.path.isfile(configureScriptPath):
raise click.ClickException(f"Workstation configuration script not found at {configureScriptPath}")
commandSequence = ['bash', configureScriptPath]
if check:
commandSequence.append('--check')
commandSequence += list(configure_args)
commandSequence = ['bash', scriptPath] + list(commandArgs)
if ctx.obj.get('dry_run', False):
click.echo(' '.join(commandSequence))
@@ -179,6 +183,44 @@ def configure_workstation(ctx, check, configure_args):
ctx.exit(completed.returncode)
@ffx.command(name='setup')
@click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness')
@click.option('--with-tests', is_flag=True, default=False, help='Also install or verify Python test packages in the bundle venv')
@click.argument('setup_args', nargs=-1, type=click.UNPROCESSED)
def setup(ctx, check, with_tests, setup_args):
"""Prepare or repair the FFX bundle virtualenv and shell alias."""
commandArgs = []
if check:
commandArgs.append('--check')
if with_tests:
commandArgs.append('--with-tests')
commandArgs += list(setup_args)
runScriptWrapper(ctx, getSetupScriptPath(), "Bundle setup script", commandArgs)
@ffx.command(name='configure_workstation')
@click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify workstation-configuration readiness')
@click.argument('configure_args', nargs=-1, type=click.UNPROCESSED)
def configure_workstation(ctx, check, configure_args):
"""Prepare workstation dependencies and local config after bundle install."""
commandArgs = []
if check:
commandArgs.append('--check')
commandArgs += list(configure_args)
runScriptWrapper(
ctx,
getConfigureWorkstationScriptPath(),
"Workstation configuration script",
commandArgs,
)
@ffx.command(name='upgrade')
@click.pass_context
@click.option('--branch', type=str, default='', help='Checkout this branch before pulling')
@@ -389,10 +431,26 @@ def unmux(ctx,
show_default='disabled',
help=CPU_OPTION_HELP,
)
@click.option(
'--crop-seek',
type=click.IntRange(min=0),
default=DEFAULT_CROPDETECT_SEEK_SECONDS,
show_default=True,
help=CROPDETECT_SEEK_OPTION_HELP,
)
@click.option(
'--crop-duration',
type=click.IntRange(min=1),
default=DEFAULT_CROPDETECT_DURATION_SECONDS,
show_default=True,
help=CROPDETECT_DURATION_OPTION_HELP,
)
def cropdetect(ctx,
paths,
nice,
cpu):
cpu,
crop_seek,
crop_duration):
from ffx.file_properties import FileProperties
existingSourcePaths = [p for p in paths if os.path.isfile(p)]
@@ -402,6 +460,10 @@ def cropdetect(ctx,
ctx.obj['resource_limits']['niceness'] = nice
ctx.obj['resource_limits']['cpu_limit'] = cpu
ctx.obj['resource_limits']['cpu_percent'] = cpu
ctx.obj['cropdetect'] = {
'seek_seconds': crop_seek,
'duration_seconds': crop_duration,
}
for sourcePath in existingSourcePaths:
@@ -409,7 +471,7 @@ def cropdetect(ctx,
try:
fp = FileProperties(ctx.obj, sourcePath)
cropParams = fp.findCropParams()
cropParams = fp.findCropArguments()
click.echo(cropParams)
@@ -506,6 +568,20 @@ def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
@click.option('--rearrange-streams', type=str, default="", help='Rearrange output streams order. Use format comma separated integers')
@click.option("--crop", is_flag=False, flag_value="auto", default="none")
@click.option(
'--crop-seek',
type=click.IntRange(min=0),
default=DEFAULT_CROPDETECT_SEEK_SECONDS,
show_default=True,
help='When --crop auto is used, start crop detection this many seconds into the input.',
)
@click.option(
'--crop-duration',
type=click.IntRange(min=1),
default=DEFAULT_CROPDETECT_DURATION_SECONDS,
show_default=True,
help='When --crop auto is used, analyze this many seconds for crop detection.',
)
@click.option("--cut", is_flag=False, flag_value="default", default="none")
@click.option("--output-directory", type=str, default='')
@@ -578,6 +654,8 @@ def convert(ctx,
rearrange_streams,
crop,
crop_seek,
crop_duration,
cut,
output_directory,
@@ -652,6 +730,10 @@ def convert(ctx,
context['resource_limits']['niceness'] = nice
context['resource_limits']['cpu_limit'] = cpu
context['resource_limits']['cpu_percent'] = cpu
context['cropdetect'] = {
'seek_seconds': crop_seek,
'duration_seconds': crop_duration,
}
context['import_subtitles'] = (subtitle_directory and subtitle_prefix)

View File

@@ -16,6 +16,9 @@ DEFAULT_AC3_BANDWIDTH = "256"
DEFAULT_DTS_BANDWIDTH = "320"
DEFAULT_7_1_BANDWIDTH = "384"
DEFAULT_CROPDETECT_SEEK_SECONDS = 60
DEFAULT_CROPDETECT_DURATION_SECONDS = 180
DEFAULT_cut_start = 60
DEFAULT_cut_length = 180

View File

@@ -1,5 +1,11 @@
import os, re, json
from .constants import (
DEFAULT_CROPDETECT_DURATION_SECONDS,
DEFAULT_CROPDETECT_SEEK_SECONDS,
FFMPEG_COMMAND_TOKENS,
FFMPEG_NULL_OUTPUT_TOKENS,
)
from .media_descriptor import MediaDescriptor
from .pattern_controller import PatternController
@@ -11,6 +17,7 @@ from ffx.model.pattern import Pattern
class FileProperties():
_cropdetect_cache: dict[tuple[str, int, int, int, int], dict[str, str]] = {}
FILE_EXTENSIONS = ['mkv', 'mp4', 'avi', 'flv', 'webm']
FFPROBE_COMMAND_TOKENS = ["ffprobe", "-hide_banner", "-show_format", "-show_streams", "-of", "json"]
@@ -81,6 +88,34 @@ class FileProperties():
self.__ffprobeData = None
def _getCropdetectWindow(self):
cropdetectContext = self.context.get('cropdetect', {})
seekSeconds = int(cropdetectContext.get('seek_seconds', DEFAULT_CROPDETECT_SEEK_SECONDS))
durationSeconds = int(cropdetectContext.get('duration_seconds', DEFAULT_CROPDETECT_DURATION_SECONDS))
if seekSeconds < 0:
raise ValueError("Crop detection seek seconds must be zero or greater.")
if durationSeconds <= 0:
raise ValueError("Crop detection duration seconds must be greater than zero.")
return seekSeconds, durationSeconds
def _getCropdetectCacheKey(self):
sourceStat = os.stat(self.__sourcePath)
seekSeconds, durationSeconds = self._getCropdetectWindow()
return (
os.path.abspath(self.__sourcePath),
sourceStat.st_mtime_ns,
sourceStat.st_size,
seekSeconds,
durationSeconds,
)
@classmethod
def _clear_cropdetect_cache(cls):
cls._cropdetect_cache.clear()
def _getFfprobeData(self):
if self.__ffprobeData is not None:
@@ -172,16 +207,25 @@ class FileProperties():
def findCropArguments(self):
""""""
# ffmpeg -i <input.file> -vf cropdetect -f null -
ffprobeOutput, ffprobeError, returnCode = executeProcess(["ffmpeg", "-i",
self.__sourcePath,
"-vf", "cropdetect",
"-ss", "60",
"-t", "180",
"-f", "null", "-"
])
cacheKey = self._getCropdetectCacheKey()
cachedCropArguments = FileProperties._cropdetect_cache.get(cacheKey)
if cachedCropArguments is not None:
self.__logger.debug(
"FileProperties.findCropArguments(): Reusing cached cropdetect result for %s",
self.__sourcePath,
)
return dict(cachedCropArguments)
errorLines = ffprobeError.split('\n')
seekSeconds, durationSeconds = self._getCropdetectWindow()
cropdetectCommand = (
list(FFMPEG_COMMAND_TOKENS)
+ ["-ss", str(seekSeconds), "-i", self.__sourcePath, "-t", str(durationSeconds), "-vf", "cropdetect"]
+ list(FFMPEG_NULL_OUTPUT_TOKENS)
)
_ffmpegOutput, ffmpegError, returnCode = executeProcess(cropdetectCommand, context=self.context)
errorLines = ffmpegError.split('\n')
crops = {}
for el in errorLines:
@@ -194,21 +238,26 @@ class FileProperties():
crops[cropParam] = crops.get(cropParam, 0) + 1
if crops:
cropHistogram = sorted(crops, reverse=True)
cropString = cropHistogram[0]
cropString = max(crops.items(), key=lambda item: (item[1], item[0]))[0]
cropTokens = cropString.split('=')
cropValueTokens = cropTokens[1]
cropValues = cropValueTokens.split(':')
return {
cropArguments = {
CropFilter.OUTPUT_WIDTH_KEY: cropValues[0],
CropFilter.OUTPUT_HEIGHT_KEY: cropValues[1],
CropFilter.OFFSET_X_KEY: cropValues[2],
CropFilter.OFFSET_Y_KEY: cropValues[3]
}
else:
return {}
FileProperties._cropdetect_cache[cacheKey] = dict(cropArguments)
return cropArguments
if returnCode != 0:
raise Exception(f"ffmpeg cropdetect returned with error {returnCode}")
FileProperties._cropdetect_cache[cacheKey] = {}
return {}
def getMediaDescriptor(self):

View File

@@ -6,13 +6,9 @@ from textual.containers import Grid
from ffx.audio_layout import AudioLayout
from .pattern_controller import PatternController
from .show_controller import ShowController
from .track_controller import TrackController
from .tag_controller import TagController
from .show_details_screen import ShowDetailsScreen
from .pattern_details_screen import PatternDetailsScreen
from .screen_support import build_screen_bootstrap, build_screen_controllers
from ffx.track_type import TrackType
from ffx.track_codec import TrackCodec
@@ -135,29 +131,23 @@ class MediaDetailsScreen(Screen):
def __init__(self):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
bootstrap = build_screen_bootstrap(self.app.getContext())
self.context = bootstrap.context
self.__removeGlobalKeys = bootstrap.remove_global_keys
self.__ignoreGlobalKeys = bootstrap.ignore_global_keys
self.__configurationData = self.context['config'].getData()
metadataConfiguration = self.__configurationData['metadata'] if 'metadata' in self.__configurationData.keys() else {}
self.__signatureTags = metadataConfiguration['signature'] if 'signature' in metadataConfiguration.keys() else {}
self.__removeGlobalKeys = metadataConfiguration['remove'] if 'remove' in metadataConfiguration.keys() else []
self.__ignoreGlobalKeys = metadataConfiguration['ignore'] if 'ignore' in metadataConfiguration.keys() else []
self.__removeTrackKeys = (metadataConfiguration['streams']['remove']
if 'streams' in metadataConfiguration.keys()
and 'remove' in metadataConfiguration['streams'].keys() else [])
self.__ignoreTrackKeys = (metadataConfiguration['streams']['ignore']
if 'streams' in metadataConfiguration.keys()
and 'ignore' in metadataConfiguration['streams'].keys() else [])
self.__pc = PatternController(context = self.context)
self.__sc = ShowController(context = self.context)
self.__tc = TrackController(context = self.context)
self.__tac = TagController(context = self.context)
controllers = build_screen_controllers(
self.context,
pattern=True,
show=True,
track=True,
tag=True,
)
self.__pc = controllers['pattern']
self.__sc = controllers['show']
self.__tc = controllers['track']
self.__tac = controllers['tag']
if not 'command' in self.context.keys() or self.context['command'] != 'inspect':
raise click.ClickException(f"MediaDetailsScreen.__init__(): Can only perform command 'inspect'")

View File

@@ -305,6 +305,29 @@ class PatternController:
if session is not None:
session.close()
def getPatternsForShow(self, showId: int) -> list[Pattern]:
if type(showId) is not int:
raise ValueError(
"PatternController.getPatternsForShow(): Argument showId is required to be of type int"
)
session = None
try:
session = self.Session()
return (
session.query(Pattern)
.filter(Pattern.show_id == int(showId))
.order_by(Pattern.id)
.all()
)
except Exception as ex:
raise click.ClickException(f"PatternController.getPatternsForShow(): {repr(ex)}")
finally:
if session is not None:
session.close()
def getPattern(self, patternId: int):
if type(patternId) is not int:

View File

@@ -7,16 +7,12 @@ from textual.containers import Grid
from ffx.model.pattern import Pattern
from .pattern_controller import PatternController
from .show_controller import ShowController
from .track_controller import TrackController
from .tag_controller import TagController
from .track_details_screen import TrackDetailsScreen
from .track_delete_screen import TrackDeleteScreen
from .tag_details_screen import TagDetailsScreen
from .tag_delete_screen import TagDeleteScreen
from .screen_support import build_screen_bootstrap, build_screen_controllers
from ffx.track_type import TrackType
@@ -107,27 +103,23 @@ class PatternDetailsScreen(Screen):
def __init__(self, patternId = None, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
bootstrap = build_screen_bootstrap(self.app.getContext())
self.context = bootstrap.context
self.__configurationData = self.context['config'].getData()
self.__removeGlobalKeys = bootstrap.remove_global_keys
self.__ignoreGlobalKeys = bootstrap.ignore_global_keys
metadataConfiguration = self.__configurationData['metadata'] if 'metadata' in self.__configurationData.keys() else {}
self.__signatureTags = metadataConfiguration['signature'] if 'signature' in metadataConfiguration.keys() else {}
self.__removeGlobalKeys = metadataConfiguration['remove'] if 'remove' in metadataConfiguration.keys() else []
self.__ignoreGlobalKeys = metadataConfiguration['ignore'] if 'ignore' in metadataConfiguration.keys() else []
self.__removeTrackKeys = (metadataConfiguration['streams']['remove']
if 'streams' in metadataConfiguration.keys()
and 'remove' in metadataConfiguration['streams'].keys() else [])
self.__ignoreTrackKeys = (metadataConfiguration['streams']['ignore']
if 'streams' in metadataConfiguration.keys()
and 'ignore' in metadataConfiguration['streams'].keys() else [])
self.__pc = PatternController(context = self.context)
self.__sc = ShowController(context = self.context)
self.__tc = TrackController(context = self.context)
self.__tac = TagController(context = self.context)
controllers = build_screen_controllers(
self.context,
pattern=True,
show=True,
track=True,
tag=True,
)
self.__pc = controllers['pattern']
self.__sc = controllers['show']
self.__tc = controllers['track']
self.__tac = controllers['tag']
self.__pattern : Pattern = self.__pc.getPattern(patternId) if patternId is not None else None
self.__showDescriptor = self.__sc.getShowDescriptor(showId) if showId is not None else None
@@ -135,26 +127,6 @@ class PatternDetailsScreen(Screen):
self.__draftTags : dict[str, str] = {}
#TODO: per controller
def loadTracks(self, show_id):
try:
tracks = {}
tracks['audio'] = {}
tracks['subtitle'] = {}
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(show_id))
return [{'id': int(p.id), 'pattern': p.pattern} for p in q.all()]
except Exception as ex:
raise click.ClickException(f"loadTracks(): {repr(ex)}")
finally:
s.close()
def updateTracks(self):
self.tracksTable.clear()

65
src/ffx/screen_support.py Normal file
View File

@@ -0,0 +1,65 @@
from __future__ import annotations
from dataclasses import dataclass
from .pattern_controller import PatternController
from .show_controller import ShowController
from .shifted_season_controller import ShiftedSeasonController
from .tag_controller import TagController
from .tmdb_controller import TmdbController
from .track_controller import TrackController
@dataclass(frozen=True)
class ScreenBootstrap:
context: dict
configuration_data: dict
signature_tags: dict
remove_global_keys: list
ignore_global_keys: list
remove_track_keys: list
ignore_track_keys: list
def build_screen_bootstrap(context: dict) -> ScreenBootstrap:
configurationData = context['config'].getData()
metadataConfiguration = configurationData.get('metadata', {})
streamMetadataConfiguration = metadataConfiguration.get('streams', {})
return ScreenBootstrap(
context=context,
configuration_data=configurationData,
signature_tags=metadataConfiguration.get('signature', {}),
remove_global_keys=metadataConfiguration.get('remove', []),
ignore_global_keys=metadataConfiguration.get('ignore', []),
remove_track_keys=streamMetadataConfiguration.get('remove', []),
ignore_track_keys=streamMetadataConfiguration.get('ignore', []),
)
def build_screen_controllers(
context: dict,
*,
pattern: bool = False,
show: bool = False,
track: bool = False,
tag: bool = False,
tmdb: bool = False,
shifted_season: bool = False,
) -> dict[str, object]:
controllers = {}
if pattern:
controllers['pattern'] = PatternController(context=context)
if show:
controllers['show'] = ShowController(context=context)
if track:
controllers['track'] = TrackController(context=context)
if tag:
controllers['tag'] = TagController(context=context)
if tmdb:
controllers['tmdb'] = TmdbController()
if shifted_season:
controllers['shifted_season'] = ShiftedSeasonController(context=context)
return controllers

View File

@@ -5,16 +5,9 @@ from textual.widgets import Header, Footer, Static, Button, DataTable, Input
from textual.containers import Grid
from textual.widgets._data_table import CellDoesNotExist
from ffx.model.pattern import Pattern
from .pattern_details_screen import PatternDetailsScreen
from .pattern_delete_screen import PatternDeleteScreen
from .show_controller import ShowController
from .pattern_controller import PatternController
from .tmdb_controller import TmdbController
from .shifted_season_controller import ShiftedSeasonController
from .show_descriptor import ShowDescriptor
from .shifted_season_details_screen import ShiftedSeasonDetailsScreen
@@ -23,6 +16,7 @@ from .shifted_season_delete_screen import ShiftedSeasonDeleteScreen
from ffx.model.shifted_season import ShiftedSeason
from .helper import filterFilename
from .screen_support import build_screen_bootstrap, build_screen_controllers
# Screen[dict[int, str, int]]
@@ -94,31 +88,24 @@ class ShowDetailsScreen(Screen):
def __init__(self, showId = None):
super().__init__()
self.context = self.app.getContext()
self.Session = self.context['database']['session'] # convenience
self.__sc = ShowController(context = self.context)
self.__pc = PatternController(context = self.context)
self.__tc = TmdbController()
self.__ssc = ShiftedSeasonController(context = self.context)
bootstrap = build_screen_bootstrap(self.app.getContext())
self.context = bootstrap.context
controllers = build_screen_controllers(
self.context,
pattern=True,
show=True,
tmdb=True,
shifted_season=True,
)
self.__sc = controllers['show']
self.__pc = controllers['pattern']
self.__tc = controllers['tmdb']
self.__ssc = controllers['shifted_season']
self.__showDescriptor = self.__sc.getShowDescriptor(showId) if showId is not None else None
def loadPatterns(self, show_id : int):
try:
s = self.Session()
q = s.query(Pattern).filter(Pattern.show_id == int(show_id))
return [{'id': int(p.id), 'pattern': str(p.pattern)} for p in q.all()]
except Exception as ex:
raise click.ClickException(f"ShowDetailsScreen.loadPatterns(): {repr(ex)}")
finally:
s.close()
def updateShiftedSeasons(self):
@@ -166,10 +153,8 @@ class ShowDetailsScreen(Screen):
#raise click.ClickException(f"show_id {showId}")
patternList = self.loadPatterns(showId)
# raise click.ClickException(f"patternList {patternList}")
for pattern in patternList:
row = (pattern['pattern'],)
for pattern in self.__pc.getPatternsForShow(showId):
row = (pattern.getPattern(),)
self.patternTable.add_row(*map(str, row))
self.updateShiftedSeasons()
@@ -489,4 +474,4 @@ class ShowDetailsScreen(Screen):
self.updateShiftedSeasons()
def handle_delete_shifted_season(self, screenResult):
self.updateShiftedSeasons()
self.updateShiftedSeasons()