Release v0.3.1

This commit is contained in:
Javanaut
2026-04-16 19:44:07 +02:00
35 changed files with 892 additions and 125 deletions

View File

@@ -99,6 +99,13 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History ## Version History
### 0.3.1
- debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows
- `--cut` again works as a combined flag/option: omitted disables cutting, bare `--cut` applies the default `60,180`, and explicit duration or `START,DURATION` values stay supported
- H.265 unmux commands no longer force an invalid `-f h265` output format, keeping ffmpeg copy extraction aligned with the required Annex B bitstream filter
- H.264 encoding now falls back from `libx264` to `libopenh264` with a warning when needed, and the test fixtures use the same encoder fallback so the suite remains portable across ffmpeg builds
### 0.3.0 ### 0.3.0
- inspect and edit screens now refresh nested track and pattern changes more reliably, with inspect-mode tables aligned to the target pattern view shown in the differences pane - inspect and edit screens now refresh nested track and pattern changes more reliably, with inspect-mode tables aligned to the target pattern view shown in the differences pane

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "ffx" name = "ffx"
description = "FFX recoding and metadata managing tool" description = "FFX recoding and metadata managing tool"
version = "0.3.0" version = "0.3.1"
license = {file = "LICENSE.md"} license = {file = "LICENSE.md"}
dependencies = [ dependencies = [
"requests", "requests",

View File

@@ -252,9 +252,15 @@ def buildRenameTargetFilename(
@click.pass_context @click.pass_context
@click.option('--language', 'app_language', type=str, default='', help='Set application language') @click.option('--language', 'app_language', type=str, default='', help='Set application language')
@click.option('--database-file', type=str, default='', help='Path to database file') @click.option('--database-file', type=str, default='', help='Path to database file')
@click.option(
'--debug',
is_flag=True,
default=False,
help='Enable debug-only TUI diagnostics such as the log pane',
)
@click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output') @click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output')
@click.option("--dry-run", is_flag=True, default=False) @click.option("--dry-run", is_flag=True, default=False)
def ffx(ctx, app_language, database_file, verbose, dry_run): def ffx(ctx, app_language, database_file, debug, verbose, dry_run):
"""FFX""" """FFX"""
ctx.obj = {} ctx.obj = {}
@@ -274,6 +280,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
) )
set_current_language(resolvedLanguage) set_current_language(resolvedLanguage)
ctx.obj['language'] = resolvedLanguage ctx.obj['language'] = resolvedLanguage
ctx.obj['debug'] = bool(debug)
if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS: if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS:
ctx.obj['dry_run'] = dry_run ctx.obj['dry_run'] = dry_run
@@ -287,6 +294,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
ctx.obj['dry_run'] = dry_run ctx.obj['dry_run'] = dry_run
ctx.obj['verbosity'] = verbose ctx.obj['verbosity'] = verbose
ctx.obj['debug'] = bool(debug)
ctx.obj['language'] = resolve_application_language( ctx.obj['language'] = resolve_application_language(
cli_language=app_language, cli_language=app_language,
config_language=ctx.obj['config'].getLanguage(), config_language=ctx.obj['config'].getLanguage(),
@@ -391,6 +399,20 @@ def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
ctx.exit(completed.returncode) ctx.exit(completed.returncode)
def runTuiApp(ctx) -> None:
from ffx.ffx_app import FfxApp
from ffx.logging_utils import set_ffx_console_logging_enabled
logger = ctx.obj.get('logger')
set_ffx_console_logging_enabled(logger, enabled=False)
try:
app = FfxApp(ctx.obj)
app.run()
finally:
set_ffx_console_logging_enabled(logger, enabled=True)
@ffx.command(name='setup') @ffx.command(name='setup')
@click.pass_context @click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness') @click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness')
@@ -527,14 +549,11 @@ def inspect(ctx, shift, filenames):
if len(filenames) != 1: if len(filenames) != 1:
raise click.ClickException("Inspect without --shift requires exactly one filename.") raise click.ClickException("Inspect without --shift requires exactly one filename.")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'inspect' ctx.obj['command'] = 'inspect'
ctx.obj['arguments'] = {} ctx.obj['arguments'] = {}
ctx.obj['arguments']['filename'] = filenames[0] ctx.obj['arguments']['filename'] = filenames[0]
app = FfxApp(ctx.obj) runTuiApp(ctx)
app.run()
@ffx.command() @ffx.command()
@@ -544,8 +563,6 @@ def edit(ctx, filename):
if not os.path.isfile(filename): if not os.path.isfile(filename):
raise click.ClickException(f"File not found: {filename}") raise click.ClickException(f"File not found: {filename}")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'edit' ctx.obj['command'] = 'edit'
ctx.obj['arguments'] = {'filename': filename} ctx.obj['arguments'] = {'filename': filename}
ctx.obj['use_pattern'] = False ctx.obj['use_pattern'] = False
@@ -554,8 +571,7 @@ def edit(ctx, filename):
ctx.obj['apply_metadata_normalization'] = True ctx.obj['apply_metadata_normalization'] = True
ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {}) ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {})
app = FfxApp(ctx.obj) runTuiApp(ctx)
app.run()
@ffx.command() @ffx.command()
@@ -615,21 +631,24 @@ def rename(ctx, paths, prefix, season, suffix, dry_run):
def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''): def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''):
from ffx.track_codec import TrackCodec
from ffx.track_type import TrackType
# executable and input file # executable and input file
commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath] commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath]
trackType = trackDescriptor.getType() trackType = trackDescriptor.getType()
trackCodec = trackDescriptor.getCodec()
targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix
# mapping # mapping
commandTokens += ['-map', commandTokens += ['-map', f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}"]
f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}",
'-c',
'copy']
trackCodec = trackDescriptor.getCodec() if trackType == TrackType.VIDEO and trackCodec == TrackCodec.H265:
commandTokens += ['-c:v', 'copy', '-bsf:v', 'hevc_mp4toannexb']
else:
commandTokens += ['-c', 'copy']
# output format # output format
codecFormat = trackCodec.format() codecFormat = trackCodec.format()
@@ -837,12 +856,8 @@ def cropdetect(ctx,
@click.pass_context @click.pass_context
def shows(ctx): def shows(ctx):
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'shows' ctx.obj['command'] = 'shows'
runTuiApp(ctx)
app = FfxApp(ctx.obj)
app.run()
def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor): def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
@@ -943,7 +958,6 @@ def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
metavar="DURATION|START,DURATION", metavar="DURATION|START,DURATION",
is_flag=False, is_flag=False,
flag_value=DEFAULT_CUT_OPTION_VALUE, flag_value=DEFAULT_CUT_OPTION_VALUE,
default=None,
callback=normalizeCutOption, callback=normalizeCutOption,
help=CUT_OPTION_HELP, help=CUT_OPTION_HELP,
) )

View File

@@ -62,6 +62,13 @@ class ConfirmScreen(Screen):
yield build_screen_log_pane() yield build_screen_log_pane()
yield Footer() yield Footer()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def on_button_pressed(self, event: Button.Pressed) -> None: def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "confirm_button": if event.button.id == "confirm_button":
self.dismiss(True) self.dismiss(True)

View File

@@ -1,4 +1,4 @@
VERSION='0.3.0' VERSION='0.3.1'
DATABASE_VERSION = 3 DATABASE_VERSION = 3
DEFAULT_QUALITY = 32 DEFAULT_QUALITY = 32

View File

@@ -4,7 +4,7 @@ from .i18n import set_current_language, t
from .shows_screen import ShowsScreen from .shows_screen import ShowsScreen
from .inspect_details_screen import InspectDetailsScreen from .inspect_details_screen import InspectDetailsScreen
from .media_edit_screen import MediaEditScreen from .media_edit_screen import MediaEditScreen
from .screen_support import toggle_screen_log_pane from .screen_support import configure_screen_log_handler, set_screen_log_pane_enabled
class FfxApp(App): class FfxApp(App):
@@ -14,7 +14,6 @@ class FfxApp(App):
BINDINGS = [ BINDINGS = [
("q", "quit()", t("Quit")), ("q", "quit()", t("Quit")),
("h", "switch_mode('help')", t("Help")), ("h", "switch_mode('help')", t("Help")),
("l", "toggle_log_pane", t("Log")),
] ]
@@ -24,6 +23,13 @@ class FfxApp(App):
# Data 'input' variable # Data 'input' variable
self.context = context self.context = context
set_current_language(self.context.get("language")) set_current_language(self.context.get("language"))
debug_mode = bool(self.context.get("debug", False))
set_screen_log_pane_enabled(debug_mode)
configure_screen_log_handler(
self.context.get("logger"),
self,
enabled=debug_mode,
)
def on_mount(self) -> None: def on_mount(self) -> None:
@@ -43,6 +49,3 @@ class FfxApp(App):
def getContext(self): def getContext(self):
"""Data 'output' method""" """Data 'output' method"""
return self.context return self.context
def action_toggle_log_pane(self) -> None:
toggle_screen_log_pane(self.screen)

View File

@@ -1,4 +1,5 @@
import os, click import os, click, subprocess
from functools import lru_cache
from logging import Logger from logging import Logger
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
@@ -61,6 +62,41 @@ class FfxController():
sourceMediaDescriptor) sourceMediaDescriptor)
self.__logger: Logger = context['logger'] self.__logger: Logger = context['logger']
self.__warnedH264Fallback = False
@staticmethod
@lru_cache(maxsize=None)
def isFfmpegEncoderAvailable(encoderName: str) -> bool:
completed = subprocess.run(
["ffmpeg", "-encoders"],
capture_output=True,
text=True,
check=False,
)
if completed.returncode != 0:
return False
resolvedEncoderName = str(encoderName).strip()
for line in completed.stdout.splitlines():
if not line.startswith(" "):
continue
tokens = line.split(maxsplit=2)
if len(tokens) >= 2 and tokens[1] == resolvedEncoderName:
return True
return False
@classmethod
def getSupportedSoftwareH264Encoder(cls) -> str | None:
if cls.isFfmpegEncoderAvailable("libx264"):
return "libx264"
if cls.isFfmpegEncoderAvailable("libopenh264"):
return "libopenh264"
return None
def executeCommandSequence(self, commandSequence): def executeCommandSequence(self, commandSequence):
@@ -79,11 +115,28 @@ class FfxController():
# -c:v libx264 -preset slow -crf 17 # -c:v libx264 -preset slow -crf 17
def generateH264Tokens(self, quality, subIndex : int = 0): def generateH264Tokens(self, quality, subIndex : int = 0):
h264Encoder = self.getSupportedSoftwareH264Encoder()
if h264Encoder == "libx264":
return [f"-c:v:{int(subIndex)}", 'libx264', return [f"-c:v:{int(subIndex)}", 'libx264',
"-preset", "slow", "-preset", "slow",
'-crf', str(quality)] '-crf', str(quality)]
if h264Encoder == "libopenh264":
if not self.__warnedH264Fallback:
self.__logger.warning(
"libx264 encoder unavailable; falling back to libopenh264 for H.264 encoding."
)
self.__warnedH264Fallback = True
return [f"-c:v:{int(subIndex)}", 'libopenh264',
'-pix_fmt', 'yuv420p']
raise click.ClickException(
"H.264 encoding requested but no supported software H.264 encoder is available. "
+ "Tried libx264 and libopenh264."
)
# -c:v:0 libvpx-vp9 -row-mt 1 -crf 32 -pass 1 -speed 4 -frame-parallel 0 -g 9999 -aq-mode 0 # -c:v:0 libvpx-vp9 -row-mt 1 -crf 32 -pass 1 -speed 4 -frame-parallel 0 -g 9999 -aq-mode 0
def generateVP9Pass1Tokens(self, quality, subIndex : int = 0): def generateVP9Pass1Tokens(self, quality, subIndex : int = 0):

View File

@@ -20,5 +20,12 @@ class HelpScreen(Screen):
yield build_screen_log_pane() yield build_screen_log_pane()
yield Footer() yield Footer()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def action_back(self): def action_back(self):
go_back_or_exit(self) go_back_or_exit(self)

View File

@@ -6,12 +6,23 @@ from .configuration_controller import ConfigurationController
from .logging_utils import get_ffx_logger from .logging_utils import get_ffx_logger
from .show_descriptor import ShowDescriptor from .show_descriptor import ShowDescriptor
from enum import Enum
class EmptyStringUndefined(Undefined): class EmptyStringUndefined(Undefined):
def __str__(self): def __str__(self):
return '' return ''
class LogLevel(Enum):
DEBUG = 'debug'
INFO = 'info'
WARNING = 'warning'
ERROR = 'error'
CRITICAL = 'critical'
DIFF_ADDED_KEY = 'added' DIFF_ADDED_KEY = 'added'
DIFF_REMOVED_KEY = 'removed' DIFF_REMOVED_KEY = 'removed'
DIFF_CHANGED_KEY = 'changed' DIFF_CHANGED_KEY = 'changed'
@@ -119,7 +130,7 @@ def setDiff(a : set, b : set) -> set:
def permutateList(inputList: list, permutation: list): def permutateList(inputList: list, permutation: list):
# 0,1,2: ABC # 0,1,2: ABC
# 0,2,1: ACB # 0,2,1: ACBffmpeg:
# 1,2,0: BCA # 1,2,0: BCA
pass pass

View File

@@ -39,8 +39,8 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
CSS = f""" CSS = f"""
Grid {{ Grid {{
grid-size: 6 11; grid-size: 6 8;
grid-rows: 9 2 2 2 2 8 2 2 2 8 8; grid-rows: 9 2 2 2 2 10 2 10;
grid-columns: {GRID_COLUMN_LABEL_MIN} {GRID_COLUMN_2} {GRID_COLUMN_3} {GRID_COLUMN_4} {GRID_COLUMN_5} {GRID_COLUMN_6}; grid-columns: {GRID_COLUMN_LABEL_MIN} {GRID_COLUMN_2} {GRID_COLUMN_3} {GRID_COLUMN_4} {GRID_COLUMN_5} {GRID_COLUMN_6};
height: 100%; height: 100%;
width: 100%; width: 100%;
@@ -88,6 +88,10 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
#differences-table {{ #differences-table {{
row-span: 10; row-span: 10;
}} }}
.yellow {{
tint: yellow 40%;
}}
""" """
@classmethod @classmethod
@@ -157,6 +161,7 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
yield Static(" ") yield Static(" ")
yield self.differencesTable yield self.differencesTable
# Row 2 # Row 2
yield Static(" ", classes="five") yield Static(" ", classes="five")
@@ -165,29 +170,26 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
yield Button(t("Substitute"), id="pattern_button") yield Button(t("Substitute"), id="pattern_button")
yield Static(" ", classes="three") yield Static(" ", classes="three")
# Row 4 # Row 4
yield Static(t("Pattern")) yield Static(t("Pattern"))
yield Input(type="text", id="pattern_input", classes="three") yield Input(type="text", id="pattern_input", classes="three")
yield Static(" ") yield Static(" ")
# Row 5 # Row 5
yield Static(" ", classes="five") yield Static(" ", classes="five")
# Row 6 # Row 6
yield Static(t("Media Tags")) yield Static(t("Media Tags"))
yield self.mediaTagsTable yield self.mediaTagsTable
yield Static(" ", classes="two") yield Static(" ")
# Row 7 # Row 7
yield Static(" ", classes="five") yield Static(" ", classes="five")
# Row 8 # Row 8
yield Static(" ")
yield Button(t("Set Default"), id="select_default_button")
yield Button(t("Set Forced"), id="select_forced_button")
yield Static(" ", classes="two")
# Row 9
yield Static(t("Streams")) yield Static(t("Streams"))
yield self.tracksTable yield self.tracksTable
yield Static(" ") yield Static(" ")
@@ -314,6 +316,10 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
self._update_show_header_labels() self._update_show_header_labels()
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self._update_grid_layout() self._update_grid_layout()
if self._currentPattern is None: if self._currentPattern is None:

View File

@@ -5,6 +5,7 @@ import os
FFX_LOGGER_NAME = "FFX" FFX_LOGGER_NAME = "FFX"
CONSOLE_HANDLER_NAME = "ffx-console" CONSOLE_HANDLER_NAME = "ffx-console"
FILE_HANDLER_NAME = "ffx-file" FILE_HANDLER_NAME = "ffx-file"
MUTED_CONSOLE_LEVEL = logging.CRITICAL + 1
def get_ffx_logger(name: str = FFX_LOGGER_NAME) -> logging.Logger: def get_ffx_logger(name: str = FFX_LOGGER_NAME) -> logging.Logger:
@@ -66,3 +67,31 @@ def configure_ffx_logger(
) )
return logger return logger
def set_ffx_console_logging_enabled(
logger: logging.Logger | None,
*,
enabled: bool,
):
if logger is None:
return None
console_handler = next(
(handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME),
None,
)
if console_handler is None:
return None
if enabled:
saved_level = getattr(console_handler, "_ffx_saved_level", None)
if saved_level is not None:
console_handler.setLevel(saved_level)
delattr(console_handler, "_ffx_saved_level")
return console_handler
if not hasattr(console_handler, "_ffx_saved_level"):
console_handler._ffx_saved_level = console_handler.level
console_handler.setLevel(MUTED_CONSOLE_LEVEL)
return console_handler

View File

@@ -12,11 +12,13 @@ from ffx.track_descriptor import TrackDescriptor
from .i18n import t from .i18n import t
from .confirm_screen import ConfirmScreen from .confirm_screen import ConfirmScreen
from .media_workflow_screen_base import MediaWorkflowScreenBase from .media_workflow_screen_base import MediaWorkflowScreenBase
from .screen_support import build_screen_log_pane, localized_column_width, write_screen_log from .screen_support import build_screen_log_pane, localized_column_width
from .tag_delete_screen import TagDeleteScreen from .tag_delete_screen import TagDeleteScreen
from .tag_details_screen import TagDetailsScreen from .tag_details_screen import TagDetailsScreen
from .track_details_screen import TrackDetailsScreen from .track_details_screen import TrackDetailsScreen
from .helper import LogLevel
class MediaEditScreen(MediaWorkflowScreenBase): class MediaEditScreen(MediaWorkflowScreenBase):
@@ -176,6 +178,10 @@ class MediaEditScreen(MediaWorkflowScreenBase):
yield Footer() yield Footer()
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self._update_grid_layout() self._update_grid_layout()
self.updateMediaTags() self.updateMediaTags()
self.updateTracks() self.updateTracks()
@@ -207,9 +213,24 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if self._messageText: if self._messageText:
self.notify(self._messageText) self.notify(self._messageText)
def _notify_from_worker(self, message: str) -> None:
self.app.call_from_thread(write_screen_log, self, str(message)) def workerLoggingHandler(self,
self.app.call_from_thread(self.notify, str(message)) message: str,
level: LogLevel = LogLevel.INFO) -> None:
if level == LogLevel.DEBUG:
self.context["logger"].debug(str(message))
elif level == LogLevel.INFO:
self.context["logger"].info(str(message))
elif level == LogLevel.WARNING:
self.context["logger"].warning(str(message))
elif level == LogLevel.ERROR:
self.context["logger"].error(str(message))
elif level == LogLevel.CRITICAL:
self.context["logger"].critical(str(message))
else:
raise Exception(f"Undefined Logging Level (msg={message})")
def _report_apply_timings(self, applyResult: dict, reloadSeconds: float = 0.0) -> None: def _report_apply_timings(self, applyResult: dict, reloadSeconds: float = 0.0) -> None:
timings = dict(applyResult.get("timings", {})) timings = dict(applyResult.get("timings", {}))
@@ -226,10 +247,6 @@ class MediaEditScreen(MediaWorkflowScreenBase):
+ f"total={totalSeconds:.2f}s" + f"total={totalSeconds:.2f}s"
) )
self.context["logger"].info(timingSummary) self.context["logger"].info(timingSummary)
write_screen_log(self, timingSummary)
if int(self.context.get("verbosity", 0) or 0) > 0:
self.notify(timingSummary)
def updateToggleButtons(self): def updateToggleButtons(self):
self._set_toggle_button_state( self._set_toggle_button_state(
@@ -402,9 +419,8 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self.setMessage(t("Apply already running.")) self.setMessage(t("Apply already running."))
return return
write_screen_log( self.context["logger"].info(
self, t("Starting metadata apply for {filename}.", filename=self._mediaFilename)
t("Starting metadata apply for {filename}.", filename=self._mediaFilename),
) )
self._applyChangesWorker = self.run_apply_changes_worker() self._applyChangesWorker = self.run_apply_changes_worker()
@@ -420,7 +436,7 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self._mediaFilename, self._mediaFilename,
self._baselineMediaDescriptor, self._baselineMediaDescriptor,
self._sourceMediaDescriptor, self._sourceMediaDescriptor,
notify=self._notify_from_worker, loggingHandler = self.workerLoggingHandler,
) )
def on_worker_state_changed(self, event: Worker.StateChanged) -> None: def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
@@ -435,7 +451,6 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self._mediaFilename, self._mediaFilename,
exc_info=(type(error), error, error.__traceback__), exc_info=(type(error), error, error.__traceback__),
) )
write_screen_log(self, t("Apply failed: {error}", error=error))
self.setMessage(t("Apply failed: {error}", error=error)) self.setMessage(t("Apply failed: {error}", error=error))
self._applyChangesWorker = None self._applyChangesWorker = None
return return
@@ -447,8 +462,7 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if applyResult.get("dry_run", False): if applyResult.get("dry_run", False):
self._report_apply_timings(applyResult, reloadSeconds=0.0) self._report_apply_timings(applyResult, reloadSeconds=0.0)
write_screen_log( self.context["logger"].info(
self,
t( t(
"Dry-run prepared temporary output {target_path}.", "Dry-run prepared temporary output {target_path}.",
target_path=applyResult["target_path"], target_path=applyResult["target_path"],
@@ -464,12 +478,12 @@ class MediaEditScreen(MediaWorkflowScreenBase):
return return
reloadStart = monotonic() reloadStart = monotonic()
write_screen_log(self, t("Reloading file after metadata write.")) self.context["logger"].info(t("Reloading file after metadata write."))
self.reloadProperties(reset_draft=True) self.reloadProperties(reset_draft=True)
self.refreshAfterDraftChange() self.refreshAfterDraftChange()
reloadSeconds = monotonic() - reloadStart reloadSeconds = monotonic() - reloadStart
self._report_apply_timings(applyResult, reloadSeconds=reloadSeconds) self._report_apply_timings(applyResult, reloadSeconds=reloadSeconds)
write_screen_log(self, t("Changes applied and file reloaded.")) self.context["logger"].info(t("Changes applied and file reloaded."))
self.setMessage(t("Changes applied and file reloaded.")) self.setMessage(t("Changes applied and file reloaded."))
self._applyChangesWorker = None self._applyChangesWorker = None

View File

@@ -16,6 +16,8 @@ from .media_descriptor_change_set import MediaDescriptorChangeSet
from .process import executeProcess, formatCommandSequence from .process import executeProcess, formatCommandSequence
from .video_encoder import VideoEncoder from .video_encoder import VideoEncoder
from .helper import LogLevel
def create_temporary_output_path(source_path: str) -> str: def create_temporary_output_path(source_path: str) -> str:
sourceDirectory = os.path.dirname(os.path.abspath(source_path)) or "." sourceDirectory = os.path.dirname(os.path.abspath(source_path)) or "."
@@ -75,22 +77,22 @@ def notify_ffmpeg_invocation(
context: dict, context: dict,
command_sequence: list[str], command_sequence: list[str],
*, *,
notify=None, loggingHandler = None,
dry_run: bool = False, dry_run: bool = False,
) -> None: ) -> None:
notify_callback = notify or context.get("notify_callback") loggingCallback = loggingHandler or context.get("logging_handler")
if not callable(notify_callback): if not callable(loggingCallback):
return return
verbosity = int(context.get("verbosity", 0) or 0) verbosity = int(context.get("verbosity", 0) or 0)
if verbosity > 0: if verbosity > 0:
if dry_run: if dry_run:
notify_callback(f"ffmpeg dry-run: {formatCommandSequence(command_sequence)}") loggingCallback(f"ffmpeg dry-run: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
else: else:
notify_callback(f"ffmpeg: {formatCommandSequence(command_sequence)}") loggingCallback(f"ffmpeg: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
return return
notify_callback("ffmpeg dry-run prepared.") if dry_run else notify_callback( loggingCallback("ffmpeg dry-run prepared.") if dry_run else loggingCallback(
"ffmpeg metadata write started." "ffmpeg metadata write started."
) )
@@ -101,7 +103,7 @@ def apply_metadata_edits(
baseline_descriptor: MediaDescriptor, baseline_descriptor: MediaDescriptor,
draft_descriptor: MediaDescriptor, draft_descriptor: MediaDescriptor,
*, *,
notify=None, loggingHandler = None,
) -> dict[str, object]: ) -> dict[str, object]:
temporaryOutputPath = create_temporary_output_path(source_path) temporaryOutputPath = create_temporary_output_path(source_path)
@@ -126,7 +128,7 @@ def apply_metadata_edits(
notify_ffmpeg_invocation( notify_ffmpeg_invocation(
editContext, editContext,
commandSequence, commandSequence,
notify=notify, loggingHandler = loggingHandler,
dry_run=True, dry_run=True,
) )
@@ -142,7 +144,9 @@ def apply_metadata_edits(
}, },
} }
notify_ffmpeg_invocation(editContext, commandSequence, notify=notify) notify_ffmpeg_invocation(editContext,
commandSequence,
loggingHandler = loggingHandler)
ffmpegStart = monotonic() ffmpegStart = monotonic()
_out, err, rc = executeProcess(commandSequence, context=editContext) _out, err, rc = executeProcess(commandSequence, context=editContext)

View File

@@ -68,6 +68,10 @@ class PatternDeleteScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__showDescriptor: if self.__showDescriptor:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})") self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")
if not self.__pattern is None: if not self.__pattern is None:

View File

@@ -326,6 +326,9 @@ class PatternDetailsScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if not self.__showDescriptor is None: if not self.__showDescriptor is None:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})") self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")

View File

@@ -1,12 +1,15 @@
from __future__ import annotations from __future__ import annotations
import logging
import weakref
from collections.abc import Mapping from collections.abc import Mapping
from dataclasses import dataclass from dataclasses import dataclass
from rich.cells import cell_len from rich.cells import cell_len
from rich.measure import measure_renderables from rich.measure import measure_renderables
from rich.text import Text from rich.text import Text
from textual.widgets import Collapsible, RichLog from textual import events
from textual.widgets import Collapsible, RichLog, Static
from .helper import formatRichColor from .helper import formatRichColor
from .i18n import t from .i18n import t
@@ -20,6 +23,152 @@ from .track_controller import TrackController
SCREEN_LOG_PANE_ID = "screen_log_pane" SCREEN_LOG_PANE_ID = "screen_log_pane"
SCREEN_LOG_VIEW_ID = "screen_log_view" SCREEN_LOG_VIEW_ID = "screen_log_view"
SCREEN_LOG_RESIZE_HANDLE_ID = "screen_log_resize_handle"
SCREEN_LOG_HANDLER_NAME = "ffx-screen-log"
SCREEN_LOG_DEFAULT_HEIGHT = 8
SCREEN_LOG_MIN_HEIGHT = 4
SCREEN_LOG_COMPONENT_WIDTH = 16
SCREEN_LOG_LEVEL_WIDTH = 8
_SCREEN_LOG_PANE_ENABLED = False
class ScreenLogHandler(logging.Handler):
"""Mirror logger output into the active screen log pane when available."""
def __init__(self, app) -> None:
super().__init__(level=logging.DEBUG)
self.set_name(SCREEN_LOG_HANDLER_NAME)
self.set_app(app)
def set_app(self, app) -> None:
self._app_ref = weakref.ref(app) if app is not None else lambda: None
def emit(self, record: logging.LogRecord) -> None:
app = self._app_ref()
if app is None:
return
try:
message = str(self.format(record)).strip()
except Exception:
self.handleError(record)
return
if not message:
return
try:
app.call_from_thread(write_screen_log, app.screen, message)
except RuntimeError:
write_screen_log(app.screen, message)
except Exception:
self.handleError(record)
class ScreenLogResizeHandle(Static):
DEFAULT_CSS = """
ScreenLogResizeHandle {
width: 100%;
height: 1;
content-align: center middle;
color: $text-muted;
background: $panel-lighten-1;
}
ScreenLogResizeHandle:hover {
color: $text;
background: $panel-lighten-2;
}
"""
def __init__(self) -> None:
super().__init__(" drag to resize ", id=SCREEN_LOG_RESIZE_HANDLE_ID)
self._drag_active = False
self._drag_origin_screen_y = 0
self._drag_origin_height = SCREEN_LOG_DEFAULT_HEIGHT
def _get_log_pane(self):
return self.parent.parent if self.parent is not None else None
def on_mouse_down(self, event: events.MouseDown) -> None:
if event.button != 1:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
self._drag_active = True
self._drag_origin_screen_y = event.screen_y
self._drag_origin_height = log_pane.get_log_height()
self.capture_mouse()
event.stop()
def on_mouse_move(self, event: events.MouseMove) -> None:
if not self._drag_active:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
next_height = self._drag_origin_height + (
self._drag_origin_screen_y - event.screen_y
)
log_pane.set_log_height(next_height)
event.stop()
def on_mouse_up(self, event: events.MouseUp) -> None:
if not self._drag_active:
return
self._drag_active = False
self.release_mouse()
event.stop()
class ResizableScreenLogPane(Collapsible):
def __init__(self) -> None:
self._log_view = RichLog(
id=SCREEN_LOG_VIEW_ID,
wrap=True,
markup=False,
highlight=False,
auto_scroll=True,
)
self._log_height = SCREEN_LOG_DEFAULT_HEIGHT
self._apply_log_height()
super().__init__(
ScreenLogResizeHandle(),
self._log_view,
title=t("Log"),
collapsed=True,
id=SCREEN_LOG_PANE_ID,
)
self.styles.width = "100%"
def _apply_log_height(self) -> None:
self._log_view.styles.height = self._log_height
self._log_view.styles.width = "100%"
def get_log_height(self) -> int:
return int(self._log_height)
def set_log_height(self, height: int) -> None:
next_height = max(SCREEN_LOG_MIN_HEIGHT, int(height))
try:
available_height = int(self.app.size.height) - 8
except Exception:
available_height = next_height
if available_height > 0:
next_height = min(next_height, available_height)
self._log_height = next_height
self._apply_log_height()
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -52,6 +201,48 @@ def build_screen_bootstrap(context: dict) -> ScreenBootstrap:
) )
def set_screen_log_pane_enabled(enabled: bool) -> None:
global _SCREEN_LOG_PANE_ENABLED
_SCREEN_LOG_PANE_ENABLED = bool(enabled)
def is_screen_log_pane_enabled() -> bool:
return bool(_SCREEN_LOG_PANE_ENABLED)
def configure_screen_log_handler(logger, app, *, enabled: bool):
if logger is None:
return None
screen_log_handler = next(
(handler for handler in logger.handlers if handler.get_name() == SCREEN_LOG_HANDLER_NAME),
None,
)
if not enabled:
if screen_log_handler is not None:
logger.removeHandler(screen_log_handler)
screen_log_handler.close()
return None
if screen_log_handler is None:
screen_log_handler = ScreenLogHandler(app)
logger.addHandler(screen_log_handler)
elif isinstance(screen_log_handler, ScreenLogHandler):
screen_log_handler.set_app(app)
screen_log_handler.setLevel(logging.DEBUG)
screen_log_handler.setFormatter(
logging.Formatter(
f"%(name)-{SCREEN_LOG_COMPONENT_WIDTH}s "
+ f"%(levelname)-{SCREEN_LOG_LEVEL_WIDTH}s "
+ "%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
return screen_log_handler
def build_screen_controllers( def build_screen_controllers(
context: dict, context: dict,
*, *,
@@ -149,27 +340,15 @@ def update_table_column_label(table, column_key, label) -> None:
table.refresh() table.refresh()
def build_screen_log_pane() -> Collapsible: def build_screen_log_pane() -> ResizableScreenLogPane | Static:
"""Create a shared collapsible log pane for screen-local diagnostics.""" """Create a shared collapsible log pane for screen-local diagnostics."""
logView = RichLog( if not is_screen_log_pane_enabled():
id=SCREEN_LOG_VIEW_ID, hidden = Static("", id=f"{SCREEN_LOG_PANE_ID}_disabled")
wrap=True, hidden.display = False
markup=False, return hidden
highlight=False,
auto_scroll=True,
)
logView.styles.height = 8
logView.styles.width = "100%"
logPane = Collapsible( return ResizableScreenLogPane()
logView,
title=t("Log"),
collapsed=True,
id=SCREEN_LOG_PANE_ID,
)
logPane.styles.width = "100%"
return logPane
def toggle_screen_log_pane(screen) -> bool: def toggle_screen_log_pane(screen) -> bool:

View File

@@ -20,5 +20,12 @@ class SettingsScreen(Screen):
yield build_screen_log_pane() yield build_screen_log_pane()
yield Footer() yield Footer()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def action_back(self): def action_back(self):
go_back_or_exit(self) go_back_or_exit(self)

View File

@@ -67,6 +67,9 @@ class ShiftedSeasonDeleteScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
shiftedSeason: ShiftedSeason = self.__ssc.getShiftedSeason(self.__shiftedSeasonId) shiftedSeason: ShiftedSeason = self.__ssc.getShiftedSeason(self.__shiftedSeasonId)
ownerLabel = ( ownerLabel = (

View File

@@ -109,6 +109,9 @@ class ShiftedSeasonDetailsScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__shiftedSeasonId is not None: if self.__shiftedSeasonId is not None:
shiftedSeason: ShiftedSeason = self.__ssc.getShiftedSeason(self.__shiftedSeasonId) shiftedSeason: ShiftedSeason = self.__ssc.getShiftedSeason(self.__shiftedSeasonId)

View File

@@ -109,5 +109,12 @@ class ShowDeleteScreen(Screen):
if event.button.id == "cancel_button": if event.button.id == "cancel_button":
self.app.pop_screen() self.app.pop_screen()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def action_back(self): def action_back(self):
go_back_or_exit(self) go_back_or_exit(self)

View File

@@ -175,6 +175,9 @@ class ShowDetailsScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__showDescriptor is not None: if self.__showDescriptor is not None:
showId = int(self.__showDescriptor.getId()) showId = int(self.__showDescriptor.getId())

View File

@@ -244,6 +244,10 @@ class ShowsScreen(Screen):
def on_mount(self) -> None: def on_mount(self) -> None:
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
for show in self.__sc.getAllShows(): for show in self.__sc.getAllShows():
self._add_show_row(show.getDescriptor(self.context)) self._add_show_row(show.getDescriptor(self.context))

View File

@@ -64,6 +64,9 @@ class TagDeleteScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self.query_one("#keylabel", Static).update(str(self.__key)) self.query_one("#keylabel", Static).update(str(self.__key))
self.query_one("#valuelabel", Static).update(str(self.__value)) self.query_one("#valuelabel", Static).update(str(self.__value))

View File

@@ -87,6 +87,9 @@ class TagDetailsScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__key is not None: if self.__key is not None:
self.query_one("#key_input", Input).value = str(self.__key) self.query_one("#key_input", Input).value = str(self.__key)

View File

@@ -4,7 +4,7 @@ from enum import Enum
class TrackCodec(Enum): class TrackCodec(Enum):
VP9 = {'identifier': 'vp9', 'format': 'ivf', 'extension': 'ivf' , 'label': 'VP9'} VP9 = {'identifier': 'vp9', 'format': 'ivf', 'extension': 'ivf' , 'label': 'VP9'}
H265 = {'identifier': 'hevc', 'format': 'h265', 'extension': 'h265' ,'label': 'H.265'} H265 = {'identifier': 'hevc', 'format': None, 'extension': 'h265' ,'label': 'H.265'}
H264 = {'identifier': 'h264', 'format': 'h264', 'extension': 'h264' ,'label': 'H.264'} H264 = {'identifier': 'h264', 'format': 'h264', 'extension': 'h264' ,'label': 'H.264'}
MPEG4 = {'identifier': 'mpeg4', 'format': 'm4v', 'extension': 'm4v' ,'label': 'MPEG-4'} MPEG4 = {'identifier': 'mpeg4', 'format': 'm4v', 'extension': 'm4v' ,'label': 'MPEG-4'}
MPEG2 = {'identifier': 'mpeg2video', 'format': 'mpeg2video', 'extension': 'mpg' ,'label': 'MPEG-2'} MPEG2 = {'identifier': 'mpeg2video', 'format': 'mpeg2video', 'extension': 'mpg' ,'label': 'MPEG-2'}

View File

@@ -67,6 +67,9 @@ class TrackDeleteScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self.query_one("#subindexlabel", Static).update(str(self.__trackDescriptor.getSubIndex())) self.query_one("#subindexlabel", Static).update(str(self.__trackDescriptor.getSubIndex()))
self.query_one("#patternlabel", Static).update(str(self.__trackDescriptor.getPatternId())) self.query_one("#patternlabel", Static).update(str(self.__trackDescriptor.getPatternId()))
self.query_one("#languagelabel", Static).update(str(self.__trackDescriptor.getLanguage().label())) self.query_one("#languagelabel", Static).update(str(self.__trackDescriptor.getLanguage().label()))

View File

@@ -236,6 +236,9 @@ class TrackDetailsScreen(Screen):
def on_mount(self): def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self.query_one("#index_label", Static).update( self.query_one("#index_label", Static).update(
str(self.__index) if self.__index is not None else "-" str(self.__index) if self.__index is not None else "-"
) )

View File

@@ -7,6 +7,7 @@ import os
from pathlib import Path from pathlib import Path
import subprocess import subprocess
import sys import sys
from functools import lru_cache
from typing import Mapping from typing import Mapping
@@ -95,8 +96,69 @@ def write_vtt(path: Path, lines: tuple[str, ...]) -> Path:
return path return path
def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrackSpec], duration_seconds: int = 1) -> Path: @lru_cache(maxsize=None)
def _ffmpeg_encoder_is_available(encoder_name: str) -> bool:
completed = subprocess.run(
["ffmpeg", "-encoders"],
capture_output=True,
text=True,
)
if completed.returncode != 0:
return False
encoder_label = str(encoder_name).strip()
for line in completed.stdout.splitlines():
if not line.startswith(" "):
continue
tokens = line.split(maxsplit=2)
if len(tokens) >= 2 and tokens[1] == encoder_label:
return True
return False
def _resolve_fixture_video_encoder(
video_encoder: str,
video_encoder_options: tuple[str, ...],
) -> tuple[str, tuple[str, ...]]:
if video_encoder != "libx264":
return video_encoder, video_encoder_options
if _ffmpeg_encoder_is_available("libx264"):
return video_encoder, video_encoder_options
if _ffmpeg_encoder_is_available("libopenh264"):
# Keep fixture generation software-based when libx264 is missing.
return "libopenh264", ("-pix_fmt", "yuv420p")
return video_encoder, video_encoder_options
def create_source_fixture(
workdir: Path,
filename: str,
tracks: list[SourceTrackSpec],
duration_seconds: int = 1,
*,
video_encoder: str = "libx264",
video_encoder_options: tuple[str, ...] = (
"-preset",
"ultrafast",
"-crf",
"35",
"-pix_fmt",
"yuv420p",
),
audio_encoder: str = "aac",
audio_encoder_options: tuple[str, ...] = ("-b:a", "48k"),
subtitle_encoder: str = "webvtt",
) -> Path:
output_path = workdir / filename output_path = workdir / filename
video_encoder, video_encoder_options = _resolve_fixture_video_encoder(
video_encoder,
video_encoder_options,
)
has_video = any(track.track_type == TrackType.VIDEO for track in tracks) has_video = any(track.track_type == TrackType.VIDEO for track in tracks)
has_audio = any(track.track_type == TrackType.AUDIO for track in tracks) has_audio = any(track.track_type == TrackType.AUDIO for track in tracks)
@@ -189,21 +251,16 @@ def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrack
command += map_tokens command += map_tokens
command += metadata_tokens command += metadata_tokens
command += disposition_tokens command += disposition_tokens
if has_video:
command += ["-c:v", video_encoder] + list(video_encoder_options)
if has_audio:
command += ["-c:a", audio_encoder] + list(audio_encoder_options)
if subtitle_input_indices:
command += ["-c:s", subtitle_encoder]
command += [ command += [
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-crf",
"35",
"-pix_fmt",
"yuv420p",
"-c:a",
"aac",
"-b:a",
"48k",
"-c:s",
"webvtt",
"-t", "-t",
str(duration_seconds), str(duration_seconds),
"-shortest", "-shortest",

View File

@@ -168,6 +168,40 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"], result["modules"],
) )
def test_root_debug_flag_parses_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.ffx.make_context(
"ffx",
["--debug", "help"],
resilient_parsing=True,
)
print(json.dumps({{
"debug": context.params["debug"],
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["debug"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_convert_cut_option_supports_flag_duration_and_start_duration_forms(self): def test_convert_cut_option_supports_flag_duration_and_start_duration_forms(self):
result = self.run_python( result = self.run_python(
textwrap.dedent( textwrap.dedent(

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class UnmuxSequenceTests(unittest.TestCase):
def test_h265_video_unmux_uses_annex_b_bitstream_filter_without_forced_format(self):
track_descriptor = TrackDescriptor(
index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H265,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mp4",
"episode_0_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mp4",
"-map",
"0:v:0",
"-c:v",
"copy",
"-bsf:v",
"hevc_mp4toannexb",
"episode_0_eng.h265",
],
sequence,
)
def test_non_h265_unmux_keeps_generic_copy_behavior(self):
track_descriptor = TrackDescriptor(
index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.SRT,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mkv",
"episode_1_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mkv",
"-map",
"0:s:0",
"-c",
"copy",
"-f",
"srt",
"episode_1_eng.srt",
],
sequence,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import click
from pathlib import Path from pathlib import Path
import sys import sys
import unittest import unittest
@@ -32,6 +33,9 @@ class StaticConfig:
class FfxControllerTests(unittest.TestCase): class FfxControllerTests(unittest.TestCase):
def tearDown(self):
FfxController.isFfmpegEncoderAvailable.cache_clear()
def make_context(self, video_encoder: VideoEncoder) -> dict: def make_context(self, video_encoder: VideoEncoder) -> dict:
return { return {
"logger": get_ffx_logger(), "logger": get_ffx_logger(),
@@ -192,6 +196,62 @@ class FfxControllerTests(unittest.TestCase):
self.assertIn("ENCODING_QUALITY=19", commands[0]) self.assertIn("ENCODING_QUALITY=19", commands[0])
mocked_info.assert_any_call("Setting quality 19 from pattern") mocked_info.assert_any_call("Setting quality 19 from pattern")
def test_generate_h264_tokens_prefers_libx264_when_available(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value="libx264",
):
tokens = controller.generateH264Tokens(23)
self.assertEqual(
["-c:v:0", "libx264", "-preset", "slow", "-crf", "23"],
tokens,
)
def test_generate_h264_tokens_falls_back_to_libopenh264_and_logs_warning(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with (
patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value="libopenh264",
),
patch.object(context["logger"], "warning") as mocked_warning,
):
tokens = controller.generateH264Tokens(23)
self.assertEqual(
["-c:v:0", "libopenh264", "-pix_fmt", "yuv420p"],
tokens,
)
mocked_warning.assert_called_once_with(
"libx264 encoder unavailable; falling back to libopenh264 for H.264 encoding."
)
def test_generate_h264_tokens_raises_when_no_supported_software_encoder_exists(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value=None,
):
with self.assertRaisesRegex(
click.ClickException,
"no supported software H.264 encoder is available",
):
controller.generateH264Tokens(23)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
import sys import sys
import tempfile
import unittest import unittest
@@ -16,6 +17,7 @@ from ffx.i18n import set_current_language # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402 from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402 from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_type import TrackType # noqa: E402 from ffx.track_type import TrackType # noqa: E402
from tests.support.ffx_bundle import SourceTrackSpec, create_source_fixture # noqa: E402
class StaticConfig: class StaticConfig:
@@ -39,10 +41,26 @@ class FilePropertiesAssetProbeTests(unittest.TestCase):
} }
set_current_language("de") set_current_language("de")
media_path = ( with tempfile.TemporaryDirectory() as tmpdir:
Path(__file__).resolve().parents[1] media_path = create_source_fixture(
/ "assets" Path(tmpdir),
/ "Boruto; Naruto Next Generations (2017) - 0069 Super-Chochos Liebestaumel - S01E0069.webm" "fixture.webm",
[
SourceTrackSpec(TrackType.VIDEO, identity="video-0"),
SourceTrackSpec(TrackType.AUDIO, identity="audio-1", language="eng"),
SourceTrackSpec(
TrackType.SUBTITLE,
identity="subtitle-2",
language="eng",
subtitle_lines=("Lorem ipsum dolor sit amet.",),
),
],
duration_seconds=3,
video_encoder="libvpx-vp9",
video_encoder_options=("-b:v", "0", "-crf", "45"),
audio_encoder="libopus",
audio_encoder_options=("-b:a", "48k"),
subtitle_encoder="webvtt",
) )
file_properties = FileProperties(context, str(media_path)) file_properties = FileProperties(context, str(media_path))

View File

@@ -16,8 +16,10 @@ if str(SRC_ROOT) not in sys.path:
from ffx.logging_utils import ( # noqa: E402 from ffx.logging_utils import ( # noqa: E402
CONSOLE_HANDLER_NAME, CONSOLE_HANDLER_NAME,
FILE_HANDLER_NAME, FILE_HANDLER_NAME,
MUTED_CONSOLE_LEVEL,
configure_ffx_logger, configure_ffx_logger,
get_ffx_logger, get_ffx_logger,
set_ffx_console_logging_enabled,
) )
@@ -81,6 +83,33 @@ class LoggingUtilsTests(unittest.TestCase):
self.cleanup_logger(logger_name) self.cleanup_logger(logger_name)
def test_set_ffx_console_logging_enabled_mutes_and_restores_console_handler(self):
logger_name = "ffx-test-console-mute"
self.cleanup_logger(logger_name)
with tempfile.TemporaryDirectory() as tempdir:
log_path = Path(tempdir) / "ffx.log"
logger = configure_ffx_logger(
str(log_path),
logging.DEBUG,
logging.INFO,
name=logger_name,
)
console_handler = next(
handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME
)
self.assertEqual(logging.INFO, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=False)
self.assertEqual(MUTED_CONSOLE_LEVEL, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=True)
self.assertEqual(logging.INFO, console_handler.level)
self.cleanup_logger(logger_name)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -15,6 +15,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.logging_utils import get_ffx_logger # noqa: E402 from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.helper import LogLevel # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402 from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.metadata_editor import ( # noqa: E402 from ffx.metadata_editor import ( # noqa: E402
apply_metadata_edits, apply_metadata_edits,
@@ -33,6 +34,16 @@ class StaticConfig:
return {} return {}
class NotificationCollector:
def __init__(self) -> None:
self.messages: list[str] = []
self.levels: list[LogLevel | None] = []
def __call__(self, message: str, level: LogLevel | None = None) -> None:
self.messages.append(message)
self.levels.append(level)
def make_context(*, dry_run: bool = False) -> dict: def make_context(*, dry_run: bool = False) -> dict:
return { return {
"logger": get_ffx_logger(), "logger": get_ffx_logger(),
@@ -151,7 +162,7 @@ class MetadataEditorTests(unittest.TestCase):
context = make_context(dry_run=True) context = make_context(dry_run=True)
baseline_descriptor = make_descriptor() baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context) draft_descriptor = baseline_descriptor.clone(context=context)
notifications = [] notifications = NotificationCollector()
expected_command = build_metadata_edit_command( expected_command = build_metadata_edit_command(
build_metadata_edit_context(context), build_metadata_edit_context(context),
"/tmp/example.mkv", "/tmp/example.mkv",
@@ -170,12 +181,13 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv", "/tmp/example.mkv",
baseline_descriptor, baseline_descriptor,
draft_descriptor, draft_descriptor,
notify=notifications.append, loggingHandler = notifications,
) )
mocked_execute.assert_not_called() mocked_execute.assert_not_called()
mocked_replace.assert_not_called() mocked_replace.assert_not_called()
self.assertEqual(["ffmpeg dry-run prepared."], notifications) self.assertEqual(["ffmpeg dry-run prepared."], notifications.messages)
self.assertEqual([None], notifications.levels)
self.assertEqual( self.assertEqual(
{ {
"applied": False, "applied": False,
@@ -204,7 +216,7 @@ class MetadataEditorTests(unittest.TestCase):
context["verbosity"] = 1 context["verbosity"] = 1
baseline_descriptor = make_descriptor() baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context) draft_descriptor = baseline_descriptor.clone(context=context)
notifications = [] notifications = NotificationCollector()
with ( with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"), patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
@@ -216,11 +228,12 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv", "/tmp/example.mkv",
baseline_descriptor, baseline_descriptor,
draft_descriptor, draft_descriptor,
notify=notifications.append, loggingHandler = notifications,
) )
self.assertEqual(1, len(notifications)) self.assertEqual(1, len(notifications.messages))
self.assertTrue(notifications[0].startswith("ffmpeg: ffmpeg ")) self.assertTrue(notifications.messages[0].startswith("ffmpeg: ffmpeg "))
self.assertEqual([LogLevel.DEBUG], notifications.levels)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
import logging
import sys import sys
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
@@ -57,9 +58,38 @@ class FakeScreen:
self.app = FakeApp(screen_stack) self.app = FakeApp(screen_stack)
class FakeRichLog:
def __init__(self):
self.messages = []
def write(self, message):
self.messages.append(message)
class FakeScreenWithLog:
def __init__(self):
self.log_view = FakeRichLog()
def query_one(self, selector, _widget_type=None):
if selector == f"#{screen_support.SCREEN_LOG_VIEW_ID}":
return self.log_view
raise LookupError(selector)
class FakeThreadedApp:
def __init__(self, screen):
self.screen = screen
self.calls = []
def call_from_thread(self, func, *args):
self.calls.append((func, args))
return func(*args)
class ScreenSupportTests(unittest.TestCase): class ScreenSupportTests(unittest.TestCase):
def tearDown(self): def tearDown(self):
set_current_language("de") set_current_language("de")
screen_support.set_screen_log_pane_enabled(False)
def make_context(self): def make_context(self):
return { return {
@@ -168,6 +198,63 @@ class ScreenSupportTests(unittest.TestCase):
self.assertGreater(len(translated), 8) self.assertGreater(len(translated), 8)
self.assertEqual(len(translated) + 2, screen_support.localized_column_width(translated, 8)) self.assertEqual(len(translated) + 2, screen_support.localized_column_width(translated, 8))
def test_build_screen_log_pane_is_hidden_when_debug_mode_is_disabled(self):
screen_support.set_screen_log_pane_enabled(False)
log_pane = screen_support.build_screen_log_pane()
self.assertFalse(log_pane.display)
def test_build_screen_log_pane_is_collapsed_when_debug_mode_is_enabled(self):
screen_support.set_screen_log_pane_enabled(True)
log_pane = screen_support.build_screen_log_pane()
self.assertIsInstance(log_pane, screen_support.ResizableScreenLogPane)
self.assertEqual(screen_support.SCREEN_LOG_PANE_ID, log_pane.id)
self.assertTrue(log_pane.collapsed)
def test_resizable_screen_log_pane_clamps_height_to_minimum(self):
log_pane = screen_support.ResizableScreenLogPane()
log_pane.set_log_height(1)
self.assertEqual(screen_support.SCREEN_LOG_MIN_HEIGHT, log_pane.get_log_height())
def test_configure_screen_log_handler_routes_logger_messages_to_active_screen(self):
logger_name = "ffx-test-screen-log-handler"
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
logger.propagate = False
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
screen = FakeScreenWithLog()
app = FakeThreadedApp(screen)
try:
handler = screen_support.configure_screen_log_handler(
logger,
app,
enabled=True,
)
self.assertIsNotNone(handler)
logger.info("hello pane")
self.assertEqual(1, len(screen.log_view.messages))
self.assertRegex(
screen.log_view.messages[0],
r"^ffx-test-screen-log-handler\s+INFO\s+\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \| hello pane$",
)
finally:
screen_support.configure_screen_log_handler(logger, app, enabled=False)
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()