3 Commits

Author SHA1 Message Date
Javanaut
0728ece4b8 Fix h265 subtrack unmux 2026-04-15 00:03:17 +02:00
Javanaut
02e375fbf2 nnn 2026-04-14 19:08:29 +02:00
Javanaut
14e6ce8458 Fix logging 2026-04-14 10:04:39 +02:00
13 changed files with 570 additions and 72 deletions

View File

@@ -69,3 +69,9 @@
## Delete When ## Delete When
- Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance. - Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance.
## Missing Timestamps
Detect ffmpeg warning "imestamps are unset in a packet for stream 0. This is deprecated and will stop working in the future. Fix your code to set the timestamps properly" and try autofix by -fflags +genpts -> Warning if fails -> Error. Check if flags collide with anything.

View File

@@ -252,9 +252,15 @@ def buildRenameTargetFilename(
@click.pass_context @click.pass_context
@click.option('--language', 'app_language', type=str, default='', help='Set application language') @click.option('--language', 'app_language', type=str, default='', help='Set application language')
@click.option('--database-file', type=str, default='', help='Path to database file') @click.option('--database-file', type=str, default='', help='Path to database file')
@click.option(
'--debug',
is_flag=True,
default=False,
help='Enable debug-only TUI diagnostics such as the log pane',
)
@click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output') @click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output')
@click.option("--dry-run", is_flag=True, default=False) @click.option("--dry-run", is_flag=True, default=False)
def ffx(ctx, app_language, database_file, verbose, dry_run): def ffx(ctx, app_language, database_file, debug, verbose, dry_run):
"""FFX""" """FFX"""
ctx.obj = {} ctx.obj = {}
@@ -274,6 +280,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
) )
set_current_language(resolvedLanguage) set_current_language(resolvedLanguage)
ctx.obj['language'] = resolvedLanguage ctx.obj['language'] = resolvedLanguage
ctx.obj['debug'] = bool(debug)
if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS: if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS:
ctx.obj['dry_run'] = dry_run ctx.obj['dry_run'] = dry_run
@@ -287,6 +294,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
ctx.obj['dry_run'] = dry_run ctx.obj['dry_run'] = dry_run
ctx.obj['verbosity'] = verbose ctx.obj['verbosity'] = verbose
ctx.obj['debug'] = bool(debug)
ctx.obj['language'] = resolve_application_language( ctx.obj['language'] = resolve_application_language(
cli_language=app_language, cli_language=app_language,
config_language=ctx.obj['config'].getLanguage(), config_language=ctx.obj['config'].getLanguage(),
@@ -391,6 +399,20 @@ def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
ctx.exit(completed.returncode) ctx.exit(completed.returncode)
def runTuiApp(ctx) -> None:
from ffx.ffx_app import FfxApp
from ffx.logging_utils import set_ffx_console_logging_enabled
logger = ctx.obj.get('logger')
set_ffx_console_logging_enabled(logger, enabled=False)
try:
app = FfxApp(ctx.obj)
app.run()
finally:
set_ffx_console_logging_enabled(logger, enabled=True)
@ffx.command(name='setup') @ffx.command(name='setup')
@click.pass_context @click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness') @click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness')
@@ -527,14 +549,11 @@ def inspect(ctx, shift, filenames):
if len(filenames) != 1: if len(filenames) != 1:
raise click.ClickException("Inspect without --shift requires exactly one filename.") raise click.ClickException("Inspect without --shift requires exactly one filename.")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'inspect' ctx.obj['command'] = 'inspect'
ctx.obj['arguments'] = {} ctx.obj['arguments'] = {}
ctx.obj['arguments']['filename'] = filenames[0] ctx.obj['arguments']['filename'] = filenames[0]
app = FfxApp(ctx.obj) runTuiApp(ctx)
app.run()
@ffx.command() @ffx.command()
@@ -544,8 +563,6 @@ def edit(ctx, filename):
if not os.path.isfile(filename): if not os.path.isfile(filename):
raise click.ClickException(f"File not found: {filename}") raise click.ClickException(f"File not found: {filename}")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'edit' ctx.obj['command'] = 'edit'
ctx.obj['arguments'] = {'filename': filename} ctx.obj['arguments'] = {'filename': filename}
ctx.obj['use_pattern'] = False ctx.obj['use_pattern'] = False
@@ -554,8 +571,7 @@ def edit(ctx, filename):
ctx.obj['apply_metadata_normalization'] = True ctx.obj['apply_metadata_normalization'] = True
ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {}) ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {})
app = FfxApp(ctx.obj) runTuiApp(ctx)
app.run()
@ffx.command() @ffx.command()
@@ -615,21 +631,24 @@ def rename(ctx, paths, prefix, season, suffix, dry_run):
def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''): def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''):
from ffx.track_codec import TrackCodec
from ffx.track_type import TrackType
# executable and input file # executable and input file
commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath] commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath]
trackType = trackDescriptor.getType() trackType = trackDescriptor.getType()
trackCodec = trackDescriptor.getCodec()
targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix
# mapping # mapping
commandTokens += ['-map', commandTokens += ['-map', f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}"]
f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}",
'-c',
'copy']
trackCodec = trackDescriptor.getCodec() if trackType == TrackType.VIDEO and trackCodec == TrackCodec.H265:
commandTokens += ['-c:v', 'copy', '-bsf:v', 'hevc_mp4toannexb']
else:
commandTokens += ['-c', 'copy']
# output format # output format
codecFormat = trackCodec.format() codecFormat = trackCodec.format()
@@ -837,12 +856,8 @@ def cropdetect(ctx,
@click.pass_context @click.pass_context
def shows(ctx): def shows(ctx):
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'shows' ctx.obj['command'] = 'shows'
runTuiApp(ctx)
app = FfxApp(ctx.obj)
app.run()
def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor): def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):

View File

@@ -4,7 +4,7 @@ from .i18n import set_current_language, t
from .shows_screen import ShowsScreen from .shows_screen import ShowsScreen
from .inspect_details_screen import InspectDetailsScreen from .inspect_details_screen import InspectDetailsScreen
from .media_edit_screen import MediaEditScreen from .media_edit_screen import MediaEditScreen
from .screen_support import toggle_screen_log_pane from .screen_support import configure_screen_log_handler, set_screen_log_pane_enabled
class FfxApp(App): class FfxApp(App):
@@ -14,7 +14,6 @@ class FfxApp(App):
BINDINGS = [ BINDINGS = [
("q", "quit()", t("Quit")), ("q", "quit()", t("Quit")),
("h", "switch_mode('help')", t("Help")), ("h", "switch_mode('help')", t("Help")),
("l", "toggle_log_pane", t("Log")),
] ]
@@ -24,6 +23,13 @@ class FfxApp(App):
# Data 'input' variable # Data 'input' variable
self.context = context self.context = context
set_current_language(self.context.get("language")) set_current_language(self.context.get("language"))
debug_mode = bool(self.context.get("debug", False))
set_screen_log_pane_enabled(debug_mode)
configure_screen_log_handler(
self.context.get("logger"),
self,
enabled=debug_mode,
)
def on_mount(self) -> None: def on_mount(self) -> None:
@@ -43,6 +49,3 @@ class FfxApp(App):
def getContext(self): def getContext(self):
"""Data 'output' method""" """Data 'output' method"""
return self.context return self.context
def action_toggle_log_pane(self) -> None:
toggle_screen_log_pane(self.screen)

View File

@@ -6,12 +6,23 @@ from .configuration_controller import ConfigurationController
from .logging_utils import get_ffx_logger from .logging_utils import get_ffx_logger
from .show_descriptor import ShowDescriptor from .show_descriptor import ShowDescriptor
from enum import Enum
class EmptyStringUndefined(Undefined): class EmptyStringUndefined(Undefined):
def __str__(self): def __str__(self):
return '' return ''
class LogLevel(Enum):
DEBUG = 'debug'
INFO = 'info'
WARNING = 'warning'
ERROR = 'error'
CRITICAL = 'critical'
DIFF_ADDED_KEY = 'added' DIFF_ADDED_KEY = 'added'
DIFF_REMOVED_KEY = 'removed' DIFF_REMOVED_KEY = 'removed'
DIFF_CHANGED_KEY = 'changed' DIFF_CHANGED_KEY = 'changed'
@@ -119,7 +130,7 @@ def setDiff(a : set, b : set) -> set:
def permutateList(inputList: list, permutation: list): def permutateList(inputList: list, permutation: list):
# 0,1,2: ABC # 0,1,2: ABC
# 0,2,1: ACB # 0,2,1: ACBffmpeg:
# 1,2,0: BCA # 1,2,0: BCA
pass pass

View File

@@ -5,6 +5,7 @@ import os
FFX_LOGGER_NAME = "FFX" FFX_LOGGER_NAME = "FFX"
CONSOLE_HANDLER_NAME = "ffx-console" CONSOLE_HANDLER_NAME = "ffx-console"
FILE_HANDLER_NAME = "ffx-file" FILE_HANDLER_NAME = "ffx-file"
MUTED_CONSOLE_LEVEL = logging.CRITICAL + 1
def get_ffx_logger(name: str = FFX_LOGGER_NAME) -> logging.Logger: def get_ffx_logger(name: str = FFX_LOGGER_NAME) -> logging.Logger:
@@ -66,3 +67,31 @@ def configure_ffx_logger(
) )
return logger return logger
def set_ffx_console_logging_enabled(
logger: logging.Logger | None,
*,
enabled: bool,
):
if logger is None:
return None
console_handler = next(
(handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME),
None,
)
if console_handler is None:
return None
if enabled:
saved_level = getattr(console_handler, "_ffx_saved_level", None)
if saved_level is not None:
console_handler.setLevel(saved_level)
delattr(console_handler, "_ffx_saved_level")
return console_handler
if not hasattr(console_handler, "_ffx_saved_level"):
console_handler._ffx_saved_level = console_handler.level
console_handler.setLevel(MUTED_CONSOLE_LEVEL)
return console_handler

View File

@@ -12,11 +12,13 @@ from ffx.track_descriptor import TrackDescriptor
from .i18n import t from .i18n import t
from .confirm_screen import ConfirmScreen from .confirm_screen import ConfirmScreen
from .media_workflow_screen_base import MediaWorkflowScreenBase from .media_workflow_screen_base import MediaWorkflowScreenBase
from .screen_support import build_screen_log_pane, localized_column_width, write_screen_log from .screen_support import build_screen_log_pane, localized_column_width
from .tag_delete_screen import TagDeleteScreen from .tag_delete_screen import TagDeleteScreen
from .tag_details_screen import TagDetailsScreen from .tag_details_screen import TagDetailsScreen
from .track_details_screen import TrackDetailsScreen from .track_details_screen import TrackDetailsScreen
from .helper import LogLevel
class MediaEditScreen(MediaWorkflowScreenBase): class MediaEditScreen(MediaWorkflowScreenBase):
@@ -207,9 +209,24 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if self._messageText: if self._messageText:
self.notify(self._messageText) self.notify(self._messageText)
def _notify_from_worker(self, message: str) -> None:
self.app.call_from_thread(write_screen_log, self, str(message)) def workerLoggingHandler(self,
self.app.call_from_thread(self.notify, str(message)) message: str,
level: LogLevel = LogLevel.INFO) -> None:
if level == LogLevel.DEBUG:
self.context["logger"].debug(str(message))
elif level == LogLevel.INFO:
self.context["logger"].info(str(message))
elif level == LogLevel.WARNING:
self.context["logger"].warning(str(message))
elif level == LogLevel.ERROR:
self.context["logger"].error(str(message))
elif level == LogLevel.CRITICAL:
self.context["logger"].critical(str(message))
else:
raise Exception(f"Undefined Logging Level (msg={message})")
def _report_apply_timings(self, applyResult: dict, reloadSeconds: float = 0.0) -> None: def _report_apply_timings(self, applyResult: dict, reloadSeconds: float = 0.0) -> None:
timings = dict(applyResult.get("timings", {})) timings = dict(applyResult.get("timings", {}))
@@ -226,10 +243,6 @@ class MediaEditScreen(MediaWorkflowScreenBase):
+ f"total={totalSeconds:.2f}s" + f"total={totalSeconds:.2f}s"
) )
self.context["logger"].info(timingSummary) self.context["logger"].info(timingSummary)
write_screen_log(self, timingSummary)
if int(self.context.get("verbosity", 0) or 0) > 0:
self.notify(timingSummary)
def updateToggleButtons(self): def updateToggleButtons(self):
self._set_toggle_button_state( self._set_toggle_button_state(
@@ -402,9 +415,8 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self.setMessage(t("Apply already running.")) self.setMessage(t("Apply already running."))
return return
write_screen_log( self.context["logger"].info(
self, t("Starting metadata apply for {filename}.", filename=self._mediaFilename)
t("Starting metadata apply for {filename}.", filename=self._mediaFilename),
) )
self._applyChangesWorker = self.run_apply_changes_worker() self._applyChangesWorker = self.run_apply_changes_worker()
@@ -420,7 +432,7 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self._mediaFilename, self._mediaFilename,
self._baselineMediaDescriptor, self._baselineMediaDescriptor,
self._sourceMediaDescriptor, self._sourceMediaDescriptor,
notify=self._notify_from_worker, loggingHandler = self.workerLoggingHandler,
) )
def on_worker_state_changed(self, event: Worker.StateChanged) -> None: def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
@@ -435,7 +447,6 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self._mediaFilename, self._mediaFilename,
exc_info=(type(error), error, error.__traceback__), exc_info=(type(error), error, error.__traceback__),
) )
write_screen_log(self, t("Apply failed: {error}", error=error))
self.setMessage(t("Apply failed: {error}", error=error)) self.setMessage(t("Apply failed: {error}", error=error))
self._applyChangesWorker = None self._applyChangesWorker = None
return return
@@ -447,8 +458,7 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if applyResult.get("dry_run", False): if applyResult.get("dry_run", False):
self._report_apply_timings(applyResult, reloadSeconds=0.0) self._report_apply_timings(applyResult, reloadSeconds=0.0)
write_screen_log( self.context["logger"].info(
self,
t( t(
"Dry-run prepared temporary output {target_path}.", "Dry-run prepared temporary output {target_path}.",
target_path=applyResult["target_path"], target_path=applyResult["target_path"],
@@ -464,12 +474,12 @@ class MediaEditScreen(MediaWorkflowScreenBase):
return return
reloadStart = monotonic() reloadStart = monotonic()
write_screen_log(self, t("Reloading file after metadata write.")) self.context["logger"].info(t("Reloading file after metadata write."))
self.reloadProperties(reset_draft=True) self.reloadProperties(reset_draft=True)
self.refreshAfterDraftChange() self.refreshAfterDraftChange()
reloadSeconds = monotonic() - reloadStart reloadSeconds = monotonic() - reloadStart
self._report_apply_timings(applyResult, reloadSeconds=reloadSeconds) self._report_apply_timings(applyResult, reloadSeconds=reloadSeconds)
write_screen_log(self, t("Changes applied and file reloaded.")) self.context["logger"].info(t("Changes applied and file reloaded."))
self.setMessage(t("Changes applied and file reloaded.")) self.setMessage(t("Changes applied and file reloaded."))
self._applyChangesWorker = None self._applyChangesWorker = None

View File

@@ -16,6 +16,8 @@ from .media_descriptor_change_set import MediaDescriptorChangeSet
from .process import executeProcess, formatCommandSequence from .process import executeProcess, formatCommandSequence
from .video_encoder import VideoEncoder from .video_encoder import VideoEncoder
from .helper import LogLevel
def create_temporary_output_path(source_path: str) -> str: def create_temporary_output_path(source_path: str) -> str:
sourceDirectory = os.path.dirname(os.path.abspath(source_path)) or "." sourceDirectory = os.path.dirname(os.path.abspath(source_path)) or "."
@@ -75,22 +77,22 @@ def notify_ffmpeg_invocation(
context: dict, context: dict,
command_sequence: list[str], command_sequence: list[str],
*, *,
notify=None, loggingHandler = None,
dry_run: bool = False, dry_run: bool = False,
) -> None: ) -> None:
notify_callback = notify or context.get("notify_callback") loggingCallback = loggingHandler or context.get("logging_handler")
if not callable(notify_callback): if not callable(loggingCallback):
return return
verbosity = int(context.get("verbosity", 0) or 0) verbosity = int(context.get("verbosity", 0) or 0)
if verbosity > 0: if verbosity > 0:
if dry_run: if dry_run:
notify_callback(f"ffmpeg dry-run: {formatCommandSequence(command_sequence)}") loggingCallback(f"ffmpeg dry-run: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
else: else:
notify_callback(f"ffmpeg: {formatCommandSequence(command_sequence)}") loggingCallback(f"ffmpeg: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
return return
notify_callback("ffmpeg dry-run prepared.") if dry_run else notify_callback( loggingCallback("ffmpeg dry-run prepared.") if dry_run else loggingCallback(
"ffmpeg metadata write started." "ffmpeg metadata write started."
) )
@@ -101,7 +103,7 @@ def apply_metadata_edits(
baseline_descriptor: MediaDescriptor, baseline_descriptor: MediaDescriptor,
draft_descriptor: MediaDescriptor, draft_descriptor: MediaDescriptor,
*, *,
notify=None, loggingHandler = None,
) -> dict[str, object]: ) -> dict[str, object]:
temporaryOutputPath = create_temporary_output_path(source_path) temporaryOutputPath = create_temporary_output_path(source_path)
@@ -126,7 +128,7 @@ def apply_metadata_edits(
notify_ffmpeg_invocation( notify_ffmpeg_invocation(
editContext, editContext,
commandSequence, commandSequence,
notify=notify, loggingHandler = loggingHandler,
dry_run=True, dry_run=True,
) )
@@ -142,7 +144,9 @@ def apply_metadata_edits(
}, },
} }
notify_ffmpeg_invocation(editContext, commandSequence, notify=notify) notify_ffmpeg_invocation(editContext,
commandSequence,
loggingHandler = loggingHandler)
ffmpegStart = monotonic() ffmpegStart = monotonic()
_out, err, rc = executeProcess(commandSequence, context=editContext) _out, err, rc = executeProcess(commandSequence, context=editContext)

View File

@@ -1,12 +1,15 @@
from __future__ import annotations from __future__ import annotations
import logging
import weakref
from collections.abc import Mapping from collections.abc import Mapping
from dataclasses import dataclass from dataclasses import dataclass
from rich.cells import cell_len from rich.cells import cell_len
from rich.measure import measure_renderables from rich.measure import measure_renderables
from rich.text import Text from rich.text import Text
from textual.widgets import Collapsible, RichLog from textual import events
from textual.widgets import Collapsible, RichLog, Static
from .helper import formatRichColor from .helper import formatRichColor
from .i18n import t from .i18n import t
@@ -20,6 +23,152 @@ from .track_controller import TrackController
SCREEN_LOG_PANE_ID = "screen_log_pane" SCREEN_LOG_PANE_ID = "screen_log_pane"
SCREEN_LOG_VIEW_ID = "screen_log_view" SCREEN_LOG_VIEW_ID = "screen_log_view"
SCREEN_LOG_RESIZE_HANDLE_ID = "screen_log_resize_handle"
SCREEN_LOG_HANDLER_NAME = "ffx-screen-log"
SCREEN_LOG_DEFAULT_HEIGHT = 8
SCREEN_LOG_MIN_HEIGHT = 4
SCREEN_LOG_COMPONENT_WIDTH = 16
SCREEN_LOG_LEVEL_WIDTH = 8
_SCREEN_LOG_PANE_ENABLED = False
class ScreenLogHandler(logging.Handler):
"""Mirror logger output into the active screen log pane when available."""
def __init__(self, app) -> None:
super().__init__(level=logging.DEBUG)
self.set_name(SCREEN_LOG_HANDLER_NAME)
self.set_app(app)
def set_app(self, app) -> None:
self._app_ref = weakref.ref(app) if app is not None else lambda: None
def emit(self, record: logging.LogRecord) -> None:
app = self._app_ref()
if app is None:
return
try:
message = str(self.format(record)).strip()
except Exception:
self.handleError(record)
return
if not message:
return
try:
app.call_from_thread(write_screen_log, app.screen, message)
except RuntimeError:
write_screen_log(app.screen, message)
except Exception:
self.handleError(record)
class ScreenLogResizeHandle(Static):
DEFAULT_CSS = """
ScreenLogResizeHandle {
width: 100%;
height: 1;
content-align: center middle;
color: $text-muted;
background: $panel-lighten-1;
}
ScreenLogResizeHandle:hover {
color: $text;
background: $panel-lighten-2;
}
"""
def __init__(self) -> None:
super().__init__(" drag to resize ", id=SCREEN_LOG_RESIZE_HANDLE_ID)
self._drag_active = False
self._drag_origin_screen_y = 0
self._drag_origin_height = SCREEN_LOG_DEFAULT_HEIGHT
def _get_log_pane(self):
return self.parent.parent if self.parent is not None else None
def on_mouse_down(self, event: events.MouseDown) -> None:
if event.button != 1:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
self._drag_active = True
self._drag_origin_screen_y = event.screen_y
self._drag_origin_height = log_pane.get_log_height()
self.capture_mouse()
event.stop()
def on_mouse_move(self, event: events.MouseMove) -> None:
if not self._drag_active:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
next_height = self._drag_origin_height + (
self._drag_origin_screen_y - event.screen_y
)
log_pane.set_log_height(next_height)
event.stop()
def on_mouse_up(self, event: events.MouseUp) -> None:
if not self._drag_active:
return
self._drag_active = False
self.release_mouse()
event.stop()
class ResizableScreenLogPane(Collapsible):
def __init__(self) -> None:
self._log_view = RichLog(
id=SCREEN_LOG_VIEW_ID,
wrap=True,
markup=False,
highlight=False,
auto_scroll=True,
)
self._log_height = SCREEN_LOG_DEFAULT_HEIGHT
self._apply_log_height()
super().__init__(
ScreenLogResizeHandle(),
self._log_view,
title=t("Log"),
collapsed=True,
id=SCREEN_LOG_PANE_ID,
)
self.styles.width = "100%"
def _apply_log_height(self) -> None:
self._log_view.styles.height = self._log_height
self._log_view.styles.width = "100%"
def get_log_height(self) -> int:
return int(self._log_height)
def set_log_height(self, height: int) -> None:
next_height = max(SCREEN_LOG_MIN_HEIGHT, int(height))
try:
available_height = int(self.app.size.height) - 8
except Exception:
available_height = next_height
if available_height > 0:
next_height = min(next_height, available_height)
self._log_height = next_height
self._apply_log_height()
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -52,6 +201,48 @@ def build_screen_bootstrap(context: dict) -> ScreenBootstrap:
) )
def set_screen_log_pane_enabled(enabled: bool) -> None:
global _SCREEN_LOG_PANE_ENABLED
_SCREEN_LOG_PANE_ENABLED = bool(enabled)
def is_screen_log_pane_enabled() -> bool:
return bool(_SCREEN_LOG_PANE_ENABLED)
def configure_screen_log_handler(logger, app, *, enabled: bool):
if logger is None:
return None
screen_log_handler = next(
(handler for handler in logger.handlers if handler.get_name() == SCREEN_LOG_HANDLER_NAME),
None,
)
if not enabled:
if screen_log_handler is not None:
logger.removeHandler(screen_log_handler)
screen_log_handler.close()
return None
if screen_log_handler is None:
screen_log_handler = ScreenLogHandler(app)
logger.addHandler(screen_log_handler)
elif isinstance(screen_log_handler, ScreenLogHandler):
screen_log_handler.set_app(app)
screen_log_handler.setLevel(logging.DEBUG)
screen_log_handler.setFormatter(
logging.Formatter(
f"%(name)-{SCREEN_LOG_COMPONENT_WIDTH}s "
+ f"%(levelname)-{SCREEN_LOG_LEVEL_WIDTH}s "
+ "%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
return screen_log_handler
def build_screen_controllers( def build_screen_controllers(
context: dict, context: dict,
*, *,
@@ -149,27 +340,15 @@ def update_table_column_label(table, column_key, label) -> None:
table.refresh() table.refresh()
def build_screen_log_pane() -> Collapsible: def build_screen_log_pane() -> ResizableScreenLogPane | Static:
"""Create a shared collapsible log pane for screen-local diagnostics.""" """Create a shared collapsible log pane for screen-local diagnostics."""
logView = RichLog( if not is_screen_log_pane_enabled():
id=SCREEN_LOG_VIEW_ID, hidden = Static("", id=f"{SCREEN_LOG_PANE_ID}_disabled")
wrap=True, hidden.display = False
markup=False, return hidden
highlight=False,
auto_scroll=True,
)
logView.styles.height = 8
logView.styles.width = "100%"
logPane = Collapsible( return ResizableScreenLogPane()
logView,
title=t("Log"),
collapsed=True,
id=SCREEN_LOG_PANE_ID,
)
logPane.styles.width = "100%"
return logPane
def toggle_screen_log_pane(screen) -> bool: def toggle_screen_log_pane(screen) -> bool:

View File

@@ -168,6 +168,40 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"], result["modules"],
) )
def test_root_debug_flag_parses_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.ffx.make_context(
"ffx",
["--debug", "help"],
resilient_parsing=True,
)
print(json.dumps({{
"debug": context.params["debug"],
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["debug"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_convert_cut_option_supports_flag_duration_and_start_duration_forms(self): def test_convert_cut_option_supports_flag_duration_and_start_duration_forms(self):
result = self.run_python( result = self.run_python(
textwrap.dedent( textwrap.dedent(

View File

@@ -0,0 +1,91 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class UnmuxSequenceTests(unittest.TestCase):
def test_h265_video_unmux_uses_annex_b_bitstream_filter(self):
track_descriptor = TrackDescriptor(
index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H265,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mp4",
"episode_0_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mp4",
"-map",
"0:v:0",
"-c:v",
"copy",
"-bsf:v",
"hevc_mp4toannexb",
"-f",
"h265",
"episode_0_eng.h265",
],
sequence,
)
def test_non_h265_unmux_keeps_generic_copy_behavior(self):
track_descriptor = TrackDescriptor(
index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.SRT,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mkv",
"episode_1_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mkv",
"-map",
"0:s:0",
"-c",
"copy",
"-f",
"srt",
"episode_1_eng.srt",
],
sequence,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -16,8 +16,10 @@ if str(SRC_ROOT) not in sys.path:
from ffx.logging_utils import ( # noqa: E402 from ffx.logging_utils import ( # noqa: E402
CONSOLE_HANDLER_NAME, CONSOLE_HANDLER_NAME,
FILE_HANDLER_NAME, FILE_HANDLER_NAME,
MUTED_CONSOLE_LEVEL,
configure_ffx_logger, configure_ffx_logger,
get_ffx_logger, get_ffx_logger,
set_ffx_console_logging_enabled,
) )
@@ -81,6 +83,33 @@ class LoggingUtilsTests(unittest.TestCase):
self.cleanup_logger(logger_name) self.cleanup_logger(logger_name)
def test_set_ffx_console_logging_enabled_mutes_and_restores_console_handler(self):
logger_name = "ffx-test-console-mute"
self.cleanup_logger(logger_name)
with tempfile.TemporaryDirectory() as tempdir:
log_path = Path(tempdir) / "ffx.log"
logger = configure_ffx_logger(
str(log_path),
logging.DEBUG,
logging.INFO,
name=logger_name,
)
console_handler = next(
handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME
)
self.assertEqual(logging.INFO, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=False)
self.assertEqual(MUTED_CONSOLE_LEVEL, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=True)
self.assertEqual(logging.INFO, console_handler.level)
self.cleanup_logger(logger_name)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -170,7 +170,7 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv", "/tmp/example.mkv",
baseline_descriptor, baseline_descriptor,
draft_descriptor, draft_descriptor,
notify=notifications.append, loggingHandler = notifications.append,
) )
mocked_execute.assert_not_called() mocked_execute.assert_not_called()
@@ -216,7 +216,7 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv", "/tmp/example.mkv",
baseline_descriptor, baseline_descriptor,
draft_descriptor, draft_descriptor,
notify=notifications.append, loggingHandler = notifications.append,
) )
self.assertEqual(1, len(notifications)) self.assertEqual(1, len(notifications))

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
import logging
import sys import sys
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
@@ -57,9 +58,38 @@ class FakeScreen:
self.app = FakeApp(screen_stack) self.app = FakeApp(screen_stack)
class FakeRichLog:
def __init__(self):
self.messages = []
def write(self, message):
self.messages.append(message)
class FakeScreenWithLog:
def __init__(self):
self.log_view = FakeRichLog()
def query_one(self, selector, _widget_type=None):
if selector == f"#{screen_support.SCREEN_LOG_VIEW_ID}":
return self.log_view
raise LookupError(selector)
class FakeThreadedApp:
def __init__(self, screen):
self.screen = screen
self.calls = []
def call_from_thread(self, func, *args):
self.calls.append((func, args))
return func(*args)
class ScreenSupportTests(unittest.TestCase): class ScreenSupportTests(unittest.TestCase):
def tearDown(self): def tearDown(self):
set_current_language("de") set_current_language("de")
screen_support.set_screen_log_pane_enabled(False)
def make_context(self): def make_context(self):
return { return {
@@ -168,6 +198,63 @@ class ScreenSupportTests(unittest.TestCase):
self.assertGreater(len(translated), 8) self.assertGreater(len(translated), 8)
self.assertEqual(len(translated) + 2, screen_support.localized_column_width(translated, 8)) self.assertEqual(len(translated) + 2, screen_support.localized_column_width(translated, 8))
def test_build_screen_log_pane_is_hidden_when_debug_mode_is_disabled(self):
screen_support.set_screen_log_pane_enabled(False)
log_pane = screen_support.build_screen_log_pane()
self.assertFalse(log_pane.display)
def test_build_screen_log_pane_is_collapsed_when_debug_mode_is_enabled(self):
screen_support.set_screen_log_pane_enabled(True)
log_pane = screen_support.build_screen_log_pane()
self.assertIsInstance(log_pane, screen_support.ResizableScreenLogPane)
self.assertEqual(screen_support.SCREEN_LOG_PANE_ID, log_pane.id)
self.assertTrue(log_pane.collapsed)
def test_resizable_screen_log_pane_clamps_height_to_minimum(self):
log_pane = screen_support.ResizableScreenLogPane()
log_pane.set_log_height(1)
self.assertEqual(screen_support.SCREEN_LOG_MIN_HEIGHT, log_pane.get_log_height())
def test_configure_screen_log_handler_routes_logger_messages_to_active_screen(self):
logger_name = "ffx-test-screen-log-handler"
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
logger.propagate = False
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
screen = FakeScreenWithLog()
app = FakeThreadedApp(screen)
try:
handler = screen_support.configure_screen_log_handler(
logger,
app,
enabled=True,
)
self.assertIsNotNone(handler)
logger.info("hello pane")
self.assertEqual(1, len(screen.log_view.messages))
self.assertRegex(
screen.log_view.messages[0],
r"^ffx-test-screen-log-handler\s+INFO\s+\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \| hello pane$",
)
finally:
screen_support.configure_screen_log_handler(logger, app, enabled=False)
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()