10 Commits

Author SHA1 Message Date
Javanaut
6dfbe1022a Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev 2026-04-15 15:50:27 +02:00
Javanaut
d3d2de8a0d adds scratchpad points 2026-04-15 15:50:24 +02:00
Javanaut
0728ece4b8 Fix h265 subtrack unmux 2026-04-15 00:03:17 +02:00
Javanaut
02e375fbf2 nnn 2026-04-14 19:08:29 +02:00
Javanaut
14e6ce8458 Fix logging 2026-04-14 10:04:39 +02:00
Javanaut
d921629947 v0.3.0 2026-04-14 00:55:42 +02:00
Javanaut
65490e2a7f ff 2026-04-14 00:44:43 +02:00
Javanaut
6c5b518e4d ffn 2026-04-14 00:26:16 +02:00
Javanaut
e3c18f22d4 Adds UI tweaks nightly 2026-04-13 23:11:14 +02:00
Javanaut
57185c7f10 Adds missing codecs 2026-04-13 20:15:10 +02:00
41 changed files with 1556 additions and 110 deletions

1
.gitignore vendored
View File

@@ -20,5 +20,6 @@ venv/
*.mkv *.mkv
*.webm *.webm
*.mp4
ffmpeg2pass-0.log ffmpeg2pass-0.log
*.sup *.sup

View File

@@ -99,6 +99,13 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History ## Version History
### 0.3.0
- inspect and edit screens now refresh nested track and pattern changes more reliably, with inspect-mode tables aligned to the target pattern view shown in the differences pane
- metadata editing got a follow-up polish pass with clearer ffmpeg notifications, a shared in-screen log pane, safer apply/reload handling, and expanded cleanup and normalization coverage
- track and asset probing recognize additional codecs, and the modern test suite now covers more metadata-editor, change-set, screen-state, and asset-probe behavior
- Textual now requires version `8.0` or newer to match the UI APIs used by the current screens
### 0.2.6 ### 0.2.6
- DB-free `ffx edit` workflow for in-place metadata editing via temporary-file rewrite - DB-free `ffx edit` workflow for in-place metadata editing via temporary-file rewrite

View File

@@ -69,3 +69,13 @@
## Delete When ## Delete When
- Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance. - Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance.
## Missing Timestamps
Detect ffmpeg warning "Timestamps are unset in a packet for stream 0. This is deprecated and will stop working in the future. Fix your code to set the timestamps properly" and try autofix by -fflags +genpts -> Warning if fails -> Error. Check if flags collide with anything.
## .265 export
-map 0:v -c:v copy -bsf:v hevc_mp4toannexb out.h265

View File

@@ -1,13 +1,13 @@
[project] [project]
name = "ffx" name = "ffx"
description = "FFX recoding and metadata managing tool" description = "FFX recoding and metadata managing tool"
version = "0.2.6" version = "0.3.0"
license = {file = "LICENSE.md"} license = {file = "LICENSE.md"}
dependencies = [ dependencies = [
"requests", "requests",
"jinja2", "jinja2",
"click", "click",
"textual", "textual>=8.0",
"sqlalchemy", "sqlalchemy",
] ]
readme = {file = "README.md", content-type = "text/markdown"} readme = {file = "README.md", content-type = "text/markdown"}

View File

@@ -104,6 +104,11 @@ The main missing pieces are:
- `METADATA_EDITOR-0013`: The command shall write changes through an ffmpeg - `METADATA_EDITOR-0013`: The command shall write changes through an ffmpeg
stream-copy remux workflow only. No transcoding shall be performed as part of stream-copy remux workflow only. No transcoding shall be performed as part of
`ffx edit`. `ffx edit`.
- `METADATA_EDITOR-0013A`: The ffmpeg invocation used by `ffx edit` shall map
all source streams with `-map 0` and shall copy all mapped streams with a
single `-c copy`. It shall not emit conversion-style per-stream `-map` or
`-c:*` options that could drop, reorder, or transcode streams during a
metadata-only edit.
- `METADATA_EDITOR-0014`: Because ffmpeg cannot rewrite the source file in - `METADATA_EDITOR-0014`: Because ffmpeg cannot rewrite the source file in
place, `ffx edit` shall write to a temporary output file on the same place, `ffx edit` shall write to a temporary output file on the same
filesystem as the source file and shall replace the original path only after filesystem as the source file and shall replace the original path only after
@@ -130,13 +135,19 @@ The main missing pieces are:
configuration shall be surfaced clearly in the UI and in the planned-changes configuration shall be surfaced clearly in the UI and in the planned-changes
view. If those rules are applied during save, the operator shall be able to view. If those rules are applied during save, the operator shall be able to
tell that the file will be cleaned in addition to any manual edits. tell that the file will be cleaned in addition to any manual edits.
- `METADATA_EDITOR-0022`: The command shall provide an invocation-level way to - `METADATA_EDITOR-0022`: Edit mode shall provide an in-screen operator toggle
disable config-driven cleanup when the operator wants a pure manual metadata for config-driven cleanup so a user can switch between pure manual metadata
edit without automatic tag removal. edits and metadata edits plus configured tag cleanup without leaving the
editor.
- `METADATA_EDITOR-0023`: The existing global `--dry-run` behavior shall apply - `METADATA_EDITOR-0023`: The existing global `--dry-run` behavior shall apply
to `ffx edit`. In dry-run mode the command shall not replace the original to `ffx edit`. In dry-run mode the command shall not replace the original
file and shall expose the planned write operation clearly enough for the user file and shall expose the planned write operation clearly enough for the user
to understand what would happen. to understand what would happen.
- `METADATA_EDITOR-0024`: Every ffmpeg invocation performed by `ffx edit`
shall be surfaced to the operator as a notification in the edit UI.
- `METADATA_EDITOR-0025`: When application verbosity is greater than zero, the
notification for an `ffx edit` ffmpeg invocation shall include the concrete
ffmpeg command line.
## Acceptance ## Acceptance
@@ -154,6 +165,8 @@ The main missing pieces are:
- The planned-changes view reflects manual edits relative to the original file - The planned-changes view reflects manual edits relative to the original file
and, when enabled, any configured cleanup removals. and, when enabled, any configured cleanup removals.
- No rendered Rich or Textual color markup appears in the saved file metadata. - No rendered Rich or Textual color markup appears in the saved file metadata.
- Saving metadata with files that contain PGS subtitle tracks or other
non-text subtitle codecs preserves those streams instead of dropping them.
- If ffmpeg fails while saving, the original file remains present and readable - If ffmpeg fails while saving, the original file remains present and readable
at the original path. at the original path.
- In dry-run mode, the original file remains untouched. - In dry-run mode, the original file remains untouched.

View File

@@ -98,7 +98,7 @@
- Intended for local execution, not server deployment. - Intended for local execution, not server deployment.
- Stores default state in `~/.local/etc/ffx.json`, `~/.local/var/ffx/ffx.db`, and `~/.local/var/log/ffx.log`. - Stores default state in `~/.local/etc/ffx.json`, `~/.local/var/ffx/ffx.db`, and `~/.local/var/log/ffx.log`.
- Timeline constraints: - Timeline constraints:
- The current implemented scope reflects a compact alpha release stream up to version `0.2.6`. - The current implemented scope reflects a compact alpha release stream up to version `0.3.0`.
- Team capacity assumptions: - Team capacity assumptions:
- Maintained as a small codebase where simple patterns and direct controller logic are preferred over framework-heavy abstractions. - Maintained as a small codebase where simple patterns and direct controller logic are preferred over framework-heavy abstractions.
- Third-party dependencies: - Third-party dependencies:

View File

@@ -252,9 +252,15 @@ def buildRenameTargetFilename(
@click.pass_context @click.pass_context
@click.option('--language', 'app_language', type=str, default='', help='Set application language') @click.option('--language', 'app_language', type=str, default='', help='Set application language')
@click.option('--database-file', type=str, default='', help='Path to database file') @click.option('--database-file', type=str, default='', help='Path to database file')
@click.option(
'--debug',
is_flag=True,
default=False,
help='Enable debug-only TUI diagnostics such as the log pane',
)
@click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output') @click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output')
@click.option("--dry-run", is_flag=True, default=False) @click.option("--dry-run", is_flag=True, default=False)
def ffx(ctx, app_language, database_file, verbose, dry_run): def ffx(ctx, app_language, database_file, debug, verbose, dry_run):
"""FFX""" """FFX"""
ctx.obj = {} ctx.obj = {}
@@ -274,6 +280,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
) )
set_current_language(resolvedLanguage) set_current_language(resolvedLanguage)
ctx.obj['language'] = resolvedLanguage ctx.obj['language'] = resolvedLanguage
ctx.obj['debug'] = bool(debug)
if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS: if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS:
ctx.obj['dry_run'] = dry_run ctx.obj['dry_run'] = dry_run
@@ -287,6 +294,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
ctx.obj['dry_run'] = dry_run ctx.obj['dry_run'] = dry_run
ctx.obj['verbosity'] = verbose ctx.obj['verbosity'] = verbose
ctx.obj['debug'] = bool(debug)
ctx.obj['language'] = resolve_application_language( ctx.obj['language'] = resolve_application_language(
cli_language=app_language, cli_language=app_language,
config_language=ctx.obj['config'].getLanguage(), config_language=ctx.obj['config'].getLanguage(),
@@ -391,6 +399,20 @@ def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
ctx.exit(completed.returncode) ctx.exit(completed.returncode)
def runTuiApp(ctx) -> None:
from ffx.ffx_app import FfxApp
from ffx.logging_utils import set_ffx_console_logging_enabled
logger = ctx.obj.get('logger')
set_ffx_console_logging_enabled(logger, enabled=False)
try:
app = FfxApp(ctx.obj)
app.run()
finally:
set_ffx_console_logging_enabled(logger, enabled=True)
@ffx.command(name='setup') @ffx.command(name='setup')
@click.pass_context @click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness') @click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness')
@@ -527,14 +549,11 @@ def inspect(ctx, shift, filenames):
if len(filenames) != 1: if len(filenames) != 1:
raise click.ClickException("Inspect without --shift requires exactly one filename.") raise click.ClickException("Inspect without --shift requires exactly one filename.")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'inspect' ctx.obj['command'] = 'inspect'
ctx.obj['arguments'] = {} ctx.obj['arguments'] = {}
ctx.obj['arguments']['filename'] = filenames[0] ctx.obj['arguments']['filename'] = filenames[0]
app = FfxApp(ctx.obj) runTuiApp(ctx)
app.run()
@ffx.command() @ffx.command()
@@ -544,8 +563,6 @@ def edit(ctx, filename):
if not os.path.isfile(filename): if not os.path.isfile(filename):
raise click.ClickException(f"File not found: {filename}") raise click.ClickException(f"File not found: {filename}")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'edit' ctx.obj['command'] = 'edit'
ctx.obj['arguments'] = {'filename': filename} ctx.obj['arguments'] = {'filename': filename}
ctx.obj['use_pattern'] = False ctx.obj['use_pattern'] = False
@@ -554,8 +571,7 @@ def edit(ctx, filename):
ctx.obj['apply_metadata_normalization'] = True ctx.obj['apply_metadata_normalization'] = True
ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {}) ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {})
app = FfxApp(ctx.obj) runTuiApp(ctx)
app.run()
@ffx.command() @ffx.command()
@@ -615,21 +631,24 @@ def rename(ctx, paths, prefix, season, suffix, dry_run):
def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''): def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''):
from ffx.track_codec import TrackCodec
from ffx.track_type import TrackType
# executable and input file # executable and input file
commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath] commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath]
trackType = trackDescriptor.getType() trackType = trackDescriptor.getType()
trackCodec = trackDescriptor.getCodec()
targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix
# mapping # mapping
commandTokens += ['-map', commandTokens += ['-map', f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}"]
f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}",
'-c',
'copy']
trackCodec = trackDescriptor.getCodec() if trackType == TrackType.VIDEO and trackCodec == TrackCodec.H265:
commandTokens += ['-c:v', 'copy', '-bsf:v', 'hevc_mp4toannexb']
else:
commandTokens += ['-c', 'copy']
# output format # output format
codecFormat = trackCodec.format() codecFormat = trackCodec.format()
@@ -837,12 +856,8 @@ def cropdetect(ctx,
@click.pass_context @click.pass_context
def shows(ctx): def shows(ctx):
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'shows' ctx.obj['command'] = 'shows'
runTuiApp(ctx)
app = FfxApp(ctx.obj)
app.run()
def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor): def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):

View File

@@ -3,6 +3,7 @@ from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Static from textual.widgets import Button, Footer, Header, Static
from .i18n import t from .i18n import t
from .screen_support import build_screen_log_pane
class ConfirmScreen(Screen): class ConfirmScreen(Screen):
@@ -58,6 +59,7 @@ class ConfirmScreen(Screen):
yield Button(self.__confirmLabel, id="confirm_button") yield Button(self.__confirmLabel, id="confirm_button")
yield Button(self.__cancelLabel, id="cancel_button") yield Button(self.__cancelLabel, id="cancel_button")
yield build_screen_log_pane()
yield Footer() yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None: def on_button_pressed(self, event: Button.Pressed) -> None:

View File

@@ -1,4 +1,4 @@
VERSION='0.2.6' VERSION='0.3.0'
DATABASE_VERSION = 3 DATABASE_VERSION = 3
DEFAULT_QUALITY = 32 DEFAULT_QUALITY = 32

View File

@@ -4,6 +4,7 @@ from .i18n import set_current_language, t
from .shows_screen import ShowsScreen from .shows_screen import ShowsScreen
from .inspect_details_screen import InspectDetailsScreen from .inspect_details_screen import InspectDetailsScreen
from .media_edit_screen import MediaEditScreen from .media_edit_screen import MediaEditScreen
from .screen_support import configure_screen_log_handler, set_screen_log_pane_enabled
class FfxApp(App): class FfxApp(App):
@@ -22,6 +23,13 @@ class FfxApp(App):
# Data 'input' variable # Data 'input' variable
self.context = context self.context = context
set_current_language(self.context.get("language")) set_current_language(self.context.get("language"))
debug_mode = bool(self.context.get("debug", False))
set_screen_log_pane_enabled(debug_mode)
configure_screen_log_handler(
self.context.get("logger"),
self,
enabled=debug_mode,
)
def on_mount(self) -> None: def on_mount(self) -> None:

View File

@@ -3,7 +3,7 @@ from textual.screen import Screen
from textual.widgets import Footer, Placeholder from textual.widgets import Footer, Placeholder
from .i18n import t from .i18n import t
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
class HelpScreen(Screen): class HelpScreen(Screen):
BINDINGS = [ BINDINGS = [
@@ -17,6 +17,7 @@ class HelpScreen(Screen):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
# Row 1 # Row 1
yield Placeholder(t("Help Screen")) yield Placeholder(t("Help Screen"))
yield build_screen_log_pane()
yield Footer() yield Footer()
def action_back(self): def action_back(self):

View File

@@ -6,12 +6,23 @@ from .configuration_controller import ConfigurationController
from .logging_utils import get_ffx_logger from .logging_utils import get_ffx_logger
from .show_descriptor import ShowDescriptor from .show_descriptor import ShowDescriptor
from enum import Enum
class EmptyStringUndefined(Undefined): class EmptyStringUndefined(Undefined):
def __str__(self): def __str__(self):
return '' return ''
class LogLevel(Enum):
DEBUG = 'debug'
INFO = 'info'
WARNING = 'warning'
ERROR = 'error'
CRITICAL = 'critical'
DIFF_ADDED_KEY = 'added' DIFF_ADDED_KEY = 'added'
DIFF_REMOVED_KEY = 'removed' DIFF_REMOVED_KEY = 'removed'
DIFF_CHANGED_KEY = 'changed' DIFF_CHANGED_KEY = 'changed'
@@ -119,7 +130,7 @@ def setDiff(a : set, b : set) -> set:
def permutateList(inputList: list, permutation: list): def permutateList(inputList: list, permutation: list):
# 0,1,2: ABC # 0,1,2: ABC
# 0,2,1: ACB # 0,2,1: ACBffmpeg:
# 1,2,0: BCA # 1,2,0: BCA
pass pass

View File

@@ -8,6 +8,7 @@ from textual.widgets import Button, Footer, Header, Input, Static
from textual.widgets._data_table import CellDoesNotExist from textual.widgets._data_table import CellDoesNotExist
from ffx.file_properties import FileProperties from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.show_descriptor import ShowDescriptor from ffx.show_descriptor import ShowDescriptor
from ffx.track_descriptor import TrackDescriptor from ffx.track_descriptor import TrackDescriptor
@@ -18,6 +19,7 @@ from .pattern_details_screen import PatternDetailsScreen
from .screen_support import ( from .screen_support import (
add_auto_table_column, add_auto_table_column,
build_screen_controllers, build_screen_controllers,
build_screen_log_pane,
go_back_or_exit, go_back_or_exit,
localized_column_width, localized_column_width,
update_table_column_label, update_table_column_label,
@@ -190,6 +192,7 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
yield self.tracksTable yield self.tracksTable
yield Static(" ") yield Static(" ")
yield build_screen_log_pane()
yield Footer() yield Footer()
def _update_grid_layout(self) -> None: def _update_grid_layout(self) -> None:
@@ -205,6 +208,30 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
def action_back(self): def action_back(self):
go_back_or_exit(self) go_back_or_exit(self)
def getDisplayedMediaDescriptor(self):
if self._currentPattern is not None and self._targetMediaDescriptor is not None:
return self._targetMediaDescriptor
return self._sourceMediaDescriptor
def getTrackEditSourceDescriptor(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor()
if (
selectedTrackDescriptor is None
or self._currentPattern is None
or self._targetMediaDescriptor is None
):
return selectedTrackDescriptor
for sourceTrackDescriptor in self._sourceMediaDescriptor.getTrackDescriptors():
if (
sourceTrackDescriptor.getSourceIndex()
== selectedTrackDescriptor.getSourceIndex()
and sourceTrackDescriptor.getType() == selectedTrackDescriptor.getType()
):
return sourceTrackDescriptor
return None
def _build_shows_table(self): def _build_shows_table(self):
from textual.widgets import DataTable from textual.widgets import DataTable
@@ -476,8 +503,6 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
self.updateDifferences() self.updateDifferences()
return updated return updated
self.reloadProperties(reset_draft=True)
tagDifferences = self._mediaChangeSetObj.get(MediaDescriptorChangeSet.TAGS_KEY, {}) tagDifferences = self._mediaChangeSetObj.get(MediaDescriptorChangeSet.TAGS_KEY, {})
for addedTagKey in tagDifferences.get(DIFF_ADDED_KEY, {}).keys(): for addedTagKey in tagDifferences.get(DIFF_ADDED_KEY, {}).keys():
self._tac.deleteMediaTagByKey(self._currentPattern.getId(), addedTagKey) self._tac.deleteMediaTagByKey(self._currentPattern.getId(), addedTagKey)
@@ -564,9 +589,6 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
) )
def handle_edit_pattern(self, screenResult): def handle_edit_pattern(self, screenResult):
if not screenResult:
return
self.reloadProperties(reset_draft=True) self.reloadProperties(reset_draft=True)
if self._currentPattern is not None: if self._currentPattern is not None:
self.query_one("#pattern_input", Input).value = self._currentPattern.getPattern() self.query_one("#pattern_input", Input).value = self._currentPattern.getPattern()

View File

@@ -5,6 +5,7 @@ import os
FFX_LOGGER_NAME = "FFX" FFX_LOGGER_NAME = "FFX"
CONSOLE_HANDLER_NAME = "ffx-console" CONSOLE_HANDLER_NAME = "ffx-console"
FILE_HANDLER_NAME = "ffx-file" FILE_HANDLER_NAME = "ffx-file"
MUTED_CONSOLE_LEVEL = logging.CRITICAL + 1
def get_ffx_logger(name: str = FFX_LOGGER_NAME) -> logging.Logger: def get_ffx_logger(name: str = FFX_LOGGER_NAME) -> logging.Logger:
@@ -66,3 +67,31 @@ def configure_ffx_logger(
) )
return logger return logger
def set_ffx_console_logging_enabled(
logger: logging.Logger | None,
*,
enabled: bool,
):
if logger is None:
return None
console_handler = next(
(handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME),
None,
)
if console_handler is None:
return None
if enabled:
saved_level = getattr(console_handler, "_ffx_saved_level", None)
if saved_level is not None:
console_handler.setLevel(saved_level)
delattr(console_handler, "_ffx_saved_level")
return console_handler
if not hasattr(console_handler, "_ffx_saved_level"):
console_handler._ffx_saved_level = console_handler.level
console_handler.setLevel(MUTED_CONSOLE_LEVEL)
return console_handler

View File

@@ -203,7 +203,7 @@ class MediaDescriptorChangeSet():
if ( if (
self.__applyMetadataNormalization self.__applyMetadataNormalization
and trackDescriptor is not None and trackDescriptor is not None
and trackDescriptor.getType() == TrackType.SUBTITLE and trackDescriptor.getType() in (TrackType.VIDEO, TrackType.AUDIO, TrackType.SUBTITLE)
): ):
trackTitle = str(normalizedTrackTags.get("title", "")).strip() trackTitle = str(normalizedTrackTags.get("title", "")).strip()
fallbackTitle = str((fallbackTrackTags or {}).get("title", "")).strip() fallbackTitle = str((fallbackTrackTags or {}).get("title", "")).strip()
@@ -260,6 +260,8 @@ class MediaDescriptorChangeSet():
# else: # else:
# dispositionTokens += [f"-disposition:{streamIndicator}:{subIndex}", '0'] # dispositionTokens += [f"-disposition:{streamIndicator}:{subIndex}", '0']
for ttd in self.__targetTrackDescriptors: for ttd in self.__targetTrackDescriptors:
if ttd.getType() == TrackType.ATTACHMENT:
continue
targetDispositions = ttd.getDispositionSet() targetDispositions = ttd.getDispositionSet()
streamIndicator = ttd.getType().indicator() streamIndicator = ttd.getType().indicator()
@@ -344,7 +346,7 @@ class MediaDescriptorChangeSet():
for tagKey, tagValue in self.normalizeTrackTags( for tagKey, tagValue in self.normalizeTrackTags(
outputTrackTags, outputTrackTags,
trackDescriptor=trackDescriptor, trackDescriptor=trackDescriptor,
fallbackTrackTags=unchangedTrackTags | removedTrackTags, fallbackTrackTags=trackDescriptor.getTags(),
).items(): ).items():
metadataTokens += [f"-metadata:s:{trackDescriptor.getType().indicator()}" metadataTokens += [f"-metadata:s:{trackDescriptor.getType().indicator()}"
+ f":{trackDescriptor.getSubIndex()}", + f":{trackDescriptor.getSubIndex()}",
@@ -366,6 +368,7 @@ class MediaDescriptorChangeSet():
for tagKey, tagValue in self.normalizeTrackTags( for tagKey, tagValue in self.normalizeTrackTags(
preservedTrackTags, preservedTrackTags,
trackDescriptor=trackDescriptor, trackDescriptor=trackDescriptor,
fallbackTrackTags=trackDescriptor.getTags(),
).items(): ).items():
metadataTokens += [f"-metadata:s:{trackDescriptor.getType().indicator()}" metadataTokens += [f"-metadata:s:{trackDescriptor.getType().indicator()}"
+ f":{trackDescriptor.getSubIndex()}", + f":{trackDescriptor.getSubIndex()}",

View File

@@ -1,6 +1,9 @@
import os import os
from time import monotonic
from textual import events, work
from textual.containers import Grid from textual.containers import Grid
from textual.worker import Worker, WorkerState
from textual.widgets import Button, Footer, Header, Static from textual.widgets import Button, Footer, Header, Static
from ffx.metadata_editor import apply_metadata_edits from ffx.metadata_editor import apply_metadata_edits
@@ -9,11 +12,13 @@ from ffx.track_descriptor import TrackDescriptor
from .i18n import t from .i18n import t
from .confirm_screen import ConfirmScreen from .confirm_screen import ConfirmScreen
from .media_workflow_screen_base import MediaWorkflowScreenBase from .media_workflow_screen_base import MediaWorkflowScreenBase
from .screen_support import localized_column_width from .screen_support import build_screen_log_pane, localized_column_width
from .tag_delete_screen import TagDeleteScreen from .tag_delete_screen import TagDeleteScreen
from .tag_details_screen import TagDetailsScreen from .tag_details_screen import TagDetailsScreen
from .track_details_screen import TrackDetailsScreen from .track_details_screen import TrackDetailsScreen
from .helper import LogLevel
class MediaEditScreen(MediaWorkflowScreenBase): class MediaEditScreen(MediaWorkflowScreenBase):
@@ -169,6 +174,7 @@ class MediaEditScreen(MediaWorkflowScreenBase):
yield Button(t("Quit"), id="quit_button") yield Button(t("Quit"), id="quit_button")
yield Static(" ") yield Static(" ")
yield build_screen_log_pane()
yield Footer() yield Footer()
def on_mount(self): def on_mount(self):
@@ -177,6 +183,14 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self.updateTracks() self.updateTracks()
self.updateDifferences() self.updateDifferences()
self.updateToggleButtons() self.updateToggleButtons()
self._applyChangesWorker = None
def on_screen_resume(self, _event: events.ScreenResume) -> None:
if not hasattr(self, "tracksTable"):
return
self.refreshAfterDraftChange()
self.updateToggleButtons()
def _update_grid_layout(self) -> None: def _update_grid_layout(self) -> None:
leftColumnWidth = max( leftColumnWidth = max(
@@ -195,6 +209,41 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if self._messageText: if self._messageText:
self.notify(self._messageText) self.notify(self._messageText)
def workerLoggingHandler(self,
message: str,
level: LogLevel = LogLevel.INFO) -> None:
if level == LogLevel.DEBUG:
self.context["logger"].debug(str(message))
elif level == LogLevel.INFO:
self.context["logger"].info(str(message))
elif level == LogLevel.WARNING:
self.context["logger"].warning(str(message))
elif level == LogLevel.ERROR:
self.context["logger"].error(str(message))
elif level == LogLevel.CRITICAL:
self.context["logger"].critical(str(message))
else:
raise Exception(f"Undefined Logging Level (msg={message})")
def _report_apply_timings(self, applyResult: dict, reloadSeconds: float = 0.0) -> None:
timings = dict(applyResult.get("timings", {}))
ffmpegSeconds = float(timings.get("ffmpeg_seconds", 0.0))
replaceSeconds = float(timings.get("replace_seconds", 0.0))
writeSeconds = float(timings.get("write_seconds", ffmpegSeconds + replaceSeconds))
reloadSeconds = float(reloadSeconds)
totalSeconds = writeSeconds + reloadSeconds
timingSummary = (
f"ffx edit timings: ffmpeg={ffmpegSeconds:.2f}s "
+ f"replace={replaceSeconds:.2f}s "
+ f"reload={reloadSeconds:.2f}s "
+ f"total={totalSeconds:.2f}s"
)
self.context["logger"].info(timingSummary)
def updateToggleButtons(self): def updateToggleButtons(self):
self._set_toggle_button_state( self._set_toggle_button_state(
"#cleanup_toggle_button", "#cleanup_toggle_button",
@@ -296,6 +345,7 @@ class MediaEditScreen(MediaWorkflowScreenBase):
def action_toggle_normalization(self): def action_toggle_normalization(self):
self.setApplyNormalization(not self._applyNormalization) self.setApplyNormalization(not self._applyNormalization)
self.updateToggleButtons() self.updateToggleButtons()
self.updateTracks()
self.updateDifferences() self.updateDifferences()
self.setMessage( self.setMessage(
t("Normalization enabled.") t("Normalization enabled.")
@@ -323,30 +373,35 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if trackDescriptor is None: if trackDescriptor is None:
return return
updatedTracks = [] nextSourceMediaDescriptor = self._sourceMediaDescriptor.clone(context=self.context)
updatedTracks = nextSourceMediaDescriptor.getTrackDescriptors()
replacementTrack = trackDescriptor.clone(context=self.context)
replaced = False replaced = False
for currentTrack in self._sourceMediaDescriptor.getTrackDescriptors():
if ( for trackIndex, currentTrack in enumerate(updatedTracks):
currentTrack.getIndex() == trackDescriptor.getIndex() sameSourceTrack = (
and currentTrack.getSubIndex() == trackDescriptor.getSubIndex() currentTrack.getSourceIndex() == replacementTrack.getSourceIndex()
): and currentTrack.getType() == replacementTrack.getType()
updatedTracks.append(trackDescriptor) )
sameVisibleTrack = (
currentTrack.getIndex() == replacementTrack.getIndex()
and currentTrack.getSubIndex() == replacementTrack.getSubIndex()
)
if sameSourceTrack or sameVisibleTrack:
updatedTracks[trackIndex] = replacementTrack
replaced = True replaced = True
else: break
updatedTracks.append(currentTrack)
if not replaced: if not replaced:
self.setMessage(t("Unable to update selected stream.")) self.setMessage(t("Unable to update selected stream."))
return return
self._sourceMediaDescriptor = self._sourceMediaDescriptor.clone(context=self.context) self._sourceMediaDescriptor = nextSourceMediaDescriptor
self._sourceMediaDescriptor.getTrackDescriptors().clear()
self._sourceMediaDescriptor.getTrackDescriptors().extend(updatedTracks)
self.setMessage( self.setMessage(
t( t(
"Updated stream #{index} ({track_type}).", "Updated stream #{index} ({track_type}).",
index=trackDescriptor.getIndex(), index=replacementTrack.getIndex(),
track_type=t(trackDescriptor.getType().label()), track_type=t(replacementTrack.getType().label()),
) )
) )
self.refreshAfterDraftChange() self.refreshAfterDraftChange()
@@ -356,33 +411,77 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self.setMessage(t("No changes to apply.")) self.setMessage(t("No changes to apply."))
return return
try: if self._applyChangesWorker is not None and self._applyChangesWorker.is_running:
applyResult = apply_metadata_edits( self.setMessage(t("Apply already running."))
return
self.context["logger"].info(
t("Starting metadata apply for {filename}.", filename=self._mediaFilename)
)
self._applyChangesWorker = self.run_apply_changes_worker()
@work(
thread=True,
exclusive=True,
group="media-edit-apply",
exit_on_error=False,
)
def run_apply_changes_worker(self):
return apply_metadata_edits(
self.context, self.context,
self._mediaFilename, self._mediaFilename,
self._baselineMediaDescriptor, self._baselineMediaDescriptor,
self._sourceMediaDescriptor, self._sourceMediaDescriptor,
loggingHandler = self.workerLoggingHandler,
) )
except Exception as ex:
self.context["logger"].exception( def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
"Failed to apply metadata edits for %s", if event.worker is not self._applyChangesWorker:
self._mediaFilename,
)
self.setMessage(t("Apply failed: {error}", error=ex))
return return
if event.state == WorkerState.ERROR:
error = event.worker.error
if error is not None:
self.context["logger"].error(
"Failed to apply metadata edits for %s",
self._mediaFilename,
exc_info=(type(error), error, error.__traceback__),
)
self.setMessage(t("Apply failed: {error}", error=error))
self._applyChangesWorker = None
return
if event.state != WorkerState.SUCCESS:
return
applyResult = event.worker.result or {}
if applyResult.get("dry_run", False): if applyResult.get("dry_run", False):
self._report_apply_timings(applyResult, reloadSeconds=0.0)
self.context["logger"].info(
t(
"Dry-run prepared temporary output {target_path}.",
target_path=applyResult["target_path"],
),
)
self.setMessage( self.setMessage(
t( t(
"Dry-run: would rewrite via temporary file {target_path}", "Dry-run: would rewrite via temporary file {target_path}",
target_path=applyResult["target_path"], target_path=applyResult["target_path"],
) )
) )
self._applyChangesWorker = None
return return
reloadStart = monotonic()
self.context["logger"].info(t("Reloading file after metadata write."))
self.reloadProperties(reset_draft=True) self.reloadProperties(reset_draft=True)
self.refreshAfterDraftChange() self.refreshAfterDraftChange()
reloadSeconds = monotonic() - reloadStart
self._report_apply_timings(applyResult, reloadSeconds=reloadSeconds)
self.context["logger"].info(t("Changes applied and file reloaded."))
self.setMessage(t("Changes applied and file reloaded.")) self.setMessage(t("Changes applied and file reloaded."))
self._applyChangesWorker = None
def action_revert_changes(self): def action_revert_changes(self):
if not self.hasPendingChanges(): if not self.hasPendingChanges():

View File

@@ -9,6 +9,8 @@ from textual.widgets._data_table import CellDoesNotExist
from ffx.audio_layout import AudioLayout from ffx.audio_layout import AudioLayout
from ffx.file_properties import FileProperties from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
from ffx.iso_language import IsoLanguage
from ffx.media_descriptor import MediaDescriptor
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.track_descriptor import TrackDescriptor from ffx.track_descriptor import TrackDescriptor
from ffx.track_disposition import TrackDisposition from ffx.track_disposition import TrackDisposition
@@ -170,10 +172,17 @@ class MediaWorkflowScreenBase(Screen):
def hasPendingChanges(self) -> bool: def hasPendingChanges(self) -> bool:
return bool(self._mediaChangeSetObj) return bool(self._mediaChangeSetObj)
def getDisplayedMediaDescriptor(self) -> MediaDescriptor | None:
return self._sourceMediaDescriptor
def getTrackEditSourceDescriptor(self) -> TrackDescriptor | None:
return self.getSelectedTrackDescriptor()
def updateMediaTags(self): def updateMediaTags(self):
displayedMediaDescriptor = self.getDisplayedMediaDescriptor()
self._sourceMediaTagRowData = populate_tag_table( self._sourceMediaTagRowData = populate_tag_table(
self.mediaTagsTable, self.mediaTagsTable,
self._sourceMediaDescriptor.getTags(), displayedMediaDescriptor.getTags() if displayedMediaDescriptor is not None else {},
ignore_keys=self._ignoreGlobalKeys, ignore_keys=self._ignoreGlobalKeys,
remove_keys=self._removeGlobalKeys, remove_keys=self._removeGlobalKeys,
) )
@@ -183,8 +192,14 @@ class MediaWorkflowScreenBase(Screen):
self._configure_tracks_table_columns() self._configure_tracks_table_columns()
self._trackRowData = {} self._trackRowData = {}
trackDescriptorList = self._sourceMediaDescriptor.getTrackDescriptors() displayedMediaDescriptor = self.getDisplayedMediaDescriptor()
trackDescriptorList = (
displayedMediaDescriptor.getTrackDescriptors()
if displayedMediaDescriptor is not None
else []
)
typeCounter = {} typeCounter = {}
applyNormalization = bool(getattr(self, "_applyNormalization", False))
for trackDescriptor in trackDescriptorList: for trackDescriptor in trackDescriptorList:
trackType = trackDescriptor.getType() trackType = trackDescriptor.getType()
@@ -193,6 +208,15 @@ class MediaWorkflowScreenBase(Screen):
dispositionSet = trackDescriptor.getDispositionSet() dispositionSet = trackDescriptor.getDispositionSet()
audioLayout = trackDescriptor.getAudioLayout() audioLayout = trackDescriptor.getAudioLayout()
trackTitle = trackDescriptor.getTitle()
if (
applyNormalization
and not str(trackTitle).strip()
and trackType in (TrackType.VIDEO, TrackType.AUDIO, TrackType.SUBTITLE)
):
trackLanguage = trackDescriptor.getLanguage()
if trackLanguage != IsoLanguage.UNDEFINED:
trackTitle = trackLanguage.label()
row = ( row = (
trackDescriptor.getIndex(), trackDescriptor.getIndex(),
t(trackType.label()), t(trackType.label()),
@@ -203,7 +227,7 @@ class MediaWorkflowScreenBase(Screen):
and audioLayout != AudioLayout.LAYOUT_UNDEFINED and audioLayout != AudioLayout.LAYOUT_UNDEFINED
else " ", else " ",
trackDescriptor.getLanguage().label(), trackDescriptor.getLanguage().label(),
trackDescriptor.getTitle(), trackTitle,
t("Yes") if TrackDisposition.DEFAULT in dispositionSet else t("No"), t("Yes") if TrackDisposition.DEFAULT in dispositionSet else t("No"),
t("Yes") if TrackDisposition.FORCED in dispositionSet else t("No"), t("Yes") if TrackDisposition.FORCED in dispositionSet else t("No"),
) )
@@ -355,7 +379,7 @@ class MediaWorkflowScreenBase(Screen):
return None return None
def setSelectedTrackDefault(self): def setSelectedTrackDefault(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor() selectedTrackDescriptor = self.getTrackEditSourceDescriptor()
if selectedTrackDescriptor is None: if selectedTrackDescriptor is None:
return False return False
@@ -366,7 +390,7 @@ class MediaWorkflowScreenBase(Screen):
return True return True
def setSelectedTrackForced(self): def setSelectedTrackForced(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor() selectedTrackDescriptor = self.getTrackEditSourceDescriptor()
if selectedTrackDescriptor is None: if selectedTrackDescriptor is None:
return False return False

View File

@@ -1,17 +1,23 @@
from __future__ import annotations from __future__ import annotations
import click
import os import os
import tempfile import tempfile
from time import monotonic
from .constants import ( from .constants import (
DEFAULT_AC3_BANDWIDTH, DEFAULT_AC3_BANDWIDTH,
DEFAULT_DTS_BANDWIDTH, DEFAULT_DTS_BANDWIDTH,
DEFAULT_STEREO_BANDWIDTH, DEFAULT_STEREO_BANDWIDTH,
FFMPEG_COMMAND_TOKENS,
) )
from .ffx_controller import FfxController
from .media_descriptor import MediaDescriptor from .media_descriptor import MediaDescriptor
from .media_descriptor_change_set import MediaDescriptorChangeSet
from .process import executeProcess, formatCommandSequence
from .video_encoder import VideoEncoder from .video_encoder import VideoEncoder
from .helper import LogLevel
def create_temporary_output_path(source_path: str) -> str: def create_temporary_output_path(source_path: str) -> str:
sourceDirectory = os.path.dirname(os.path.abspath(source_path)) or "." sourceDirectory = os.path.dirname(os.path.abspath(source_path)) or "."
@@ -49,41 +55,123 @@ def build_metadata_edit_context(context: dict) -> dict:
return editContext return editContext
def build_metadata_edit_command(
context: dict,
source_path: str,
target_path: str,
baseline_descriptor: MediaDescriptor,
draft_descriptor: MediaDescriptor,
) -> list[str]:
changeSet = MediaDescriptorChangeSet(context, draft_descriptor, baseline_descriptor)
return (
list(FFMPEG_COMMAND_TOKENS)
+ ["-i", source_path, "-map", "0", "-c", "copy"]
+ changeSet.generateMetadataTokens()
+ changeSet.generateDispositionTokens()
+ [target_path]
)
def notify_ffmpeg_invocation(
context: dict,
command_sequence: list[str],
*,
loggingHandler = None,
dry_run: bool = False,
) -> None:
loggingCallback = loggingHandler or context.get("logging_handler")
if not callable(loggingCallback):
return
verbosity = int(context.get("verbosity", 0) or 0)
if verbosity > 0:
if dry_run:
loggingCallback(f"ffmpeg dry-run: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
else:
loggingCallback(f"ffmpeg: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
return
loggingCallback("ffmpeg dry-run prepared.") if dry_run else loggingCallback(
"ffmpeg metadata write started."
)
def apply_metadata_edits( def apply_metadata_edits(
context: dict, context: dict,
source_path: str, source_path: str,
baseline_descriptor: MediaDescriptor, baseline_descriptor: MediaDescriptor,
draft_descriptor: MediaDescriptor, draft_descriptor: MediaDescriptor,
*,
loggingHandler = None,
) -> dict[str, object]: ) -> dict[str, object]:
temporaryOutputPath = create_temporary_output_path(source_path)
editContext = build_metadata_edit_context(context)
controller = FfxController(editContext, draft_descriptor, baseline_descriptor)
try: temporaryOutputPath = create_temporary_output_path(source_path)
controller.runJob(
editContext = build_metadata_edit_context(context)
commandSequence = build_metadata_edit_command(
editContext,
source_path, source_path,
temporaryOutputPath, temporaryOutputPath,
targetFormat="", baseline_descriptor,
chainIteration=[], draft_descriptor,
) )
ffmpegSeconds = 0.0
replaceSeconds = 0.0
try:
if editContext.get("dry_run", False): if editContext.get("dry_run", False):
notify_ffmpeg_invocation(
editContext,
commandSequence,
loggingHandler = loggingHandler,
dry_run=True,
)
return { return {
"applied": False, "applied": False,
"dry_run": True, "dry_run": True,
"target_path": temporaryOutputPath, "target_path": temporaryOutputPath,
"command_sequence": commandSequence,
"timings": {
"ffmpeg_seconds": ffmpegSeconds,
"replace_seconds": replaceSeconds,
"write_seconds": ffmpegSeconds + replaceSeconds,
},
} }
notify_ffmpeg_invocation(editContext,
commandSequence,
loggingHandler = loggingHandler)
ffmpegStart = monotonic()
_out, err, rc = executeProcess(commandSequence, context=editContext)
ffmpegSeconds = monotonic() - ffmpegStart
if rc:
raise click.ClickException(f"ffmpeg edit failed: rc={rc} error={err}")
replaceStart = monotonic()
os.replace(temporaryOutputPath, source_path) os.replace(temporaryOutputPath, source_path)
replaceSeconds = monotonic() - replaceStart
return { return {
"applied": True, "applied": True,
"dry_run": False, "dry_run": False,
"target_path": source_path, "target_path": source_path,
"command_sequence": commandSequence,
"timings": {
"ffmpeg_seconds": ffmpegSeconds,
"replace_seconds": replaceSeconds,
"write_seconds": ffmpegSeconds + replaceSeconds,
},
} }
except Exception: except Exception:
if os.path.exists(temporaryOutputPath): if os.path.exists(temporaryOutputPath):
os.remove(temporaryOutputPath) os.remove(temporaryOutputPath)
raise raise
finally:
if editContext.get("dry_run", False) and os.path.exists(temporaryOutputPath):
os.remove(temporaryOutputPath)

View File

@@ -7,7 +7,7 @@ from textual.containers import Grid
from .i18n import t from .i18n import t
from .show_controller import ShowController from .show_controller import ShowController
from .pattern_controller import PatternController from .pattern_controller import PatternController
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
from ffx.model.pattern import Pattern from ffx.model.pattern import Pattern
@@ -103,6 +103,7 @@ class PatternDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button") yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button") yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -1,6 +1,7 @@
import click, re import click, re
from typing import List from typing import List
from textual import events
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, DataTable, TextArea from textual.widgets import Header, Footer, Static, Button, Input, DataTable, TextArea
from textual.containers import Grid from textual.containers import Grid
@@ -18,6 +19,7 @@ from .screen_support import (
add_auto_table_column, add_auto_table_column,
build_screen_bootstrap, build_screen_bootstrap,
build_screen_controllers, build_screen_controllers,
build_screen_log_pane,
go_back_or_exit, go_back_or_exit,
populate_tag_table, populate_tag_table,
) )
@@ -341,6 +343,16 @@ class PatternDetailsScreen(Screen):
self.updateTracks() self.updateTracks()
self.updateShiftedSeasons() self.updateShiftedSeasons()
def on_screen_resume(self, _event: events.ScreenResume) -> None:
if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"):
return
self.updateTags()
self.updateTracks()
if self.__pattern is not None and hasattr(self, "shiftedSeasonsTable"):
self.updateShiftedSeasons()
def compose(self): def compose(self):
@@ -482,6 +494,7 @@ class PatternDetailsScreen(Screen):
# Row 20 # Row 20
yield Static(" ", classes="seven") yield Static(" ", classes="seven")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -1,13 +1,18 @@
from __future__ import annotations from __future__ import annotations
import logging
import weakref
from collections.abc import Mapping from collections.abc import Mapping
from dataclasses import dataclass from dataclasses import dataclass
from rich.cells import cell_len from rich.cells import cell_len
from rich.measure import measure_renderables from rich.measure import measure_renderables
from rich.text import Text from rich.text import Text
from textual import events
from textual.widgets import Collapsible, RichLog, Static
from .helper import formatRichColor from .helper import formatRichColor
from .i18n import t
from .pattern_controller import PatternController from .pattern_controller import PatternController
from .show_controller import ShowController from .show_controller import ShowController
from .shifted_season_controller import ShiftedSeasonController from .shifted_season_controller import ShiftedSeasonController
@@ -16,6 +21,156 @@ from .tmdb_controller import TmdbController
from .track_controller import TrackController from .track_controller import TrackController
SCREEN_LOG_PANE_ID = "screen_log_pane"
SCREEN_LOG_VIEW_ID = "screen_log_view"
SCREEN_LOG_RESIZE_HANDLE_ID = "screen_log_resize_handle"
SCREEN_LOG_HANDLER_NAME = "ffx-screen-log"
SCREEN_LOG_DEFAULT_HEIGHT = 8
SCREEN_LOG_MIN_HEIGHT = 4
SCREEN_LOG_COMPONENT_WIDTH = 16
SCREEN_LOG_LEVEL_WIDTH = 8
_SCREEN_LOG_PANE_ENABLED = False
class ScreenLogHandler(logging.Handler):
"""Mirror logger output into the active screen log pane when available."""
def __init__(self, app) -> None:
super().__init__(level=logging.DEBUG)
self.set_name(SCREEN_LOG_HANDLER_NAME)
self.set_app(app)
def set_app(self, app) -> None:
self._app_ref = weakref.ref(app) if app is not None else lambda: None
def emit(self, record: logging.LogRecord) -> None:
app = self._app_ref()
if app is None:
return
try:
message = str(self.format(record)).strip()
except Exception:
self.handleError(record)
return
if not message:
return
try:
app.call_from_thread(write_screen_log, app.screen, message)
except RuntimeError:
write_screen_log(app.screen, message)
except Exception:
self.handleError(record)
class ScreenLogResizeHandle(Static):
DEFAULT_CSS = """
ScreenLogResizeHandle {
width: 100%;
height: 1;
content-align: center middle;
color: $text-muted;
background: $panel-lighten-1;
}
ScreenLogResizeHandle:hover {
color: $text;
background: $panel-lighten-2;
}
"""
def __init__(self) -> None:
super().__init__(" drag to resize ", id=SCREEN_LOG_RESIZE_HANDLE_ID)
self._drag_active = False
self._drag_origin_screen_y = 0
self._drag_origin_height = SCREEN_LOG_DEFAULT_HEIGHT
def _get_log_pane(self):
return self.parent.parent if self.parent is not None else None
def on_mouse_down(self, event: events.MouseDown) -> None:
if event.button != 1:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
self._drag_active = True
self._drag_origin_screen_y = event.screen_y
self._drag_origin_height = log_pane.get_log_height()
self.capture_mouse()
event.stop()
def on_mouse_move(self, event: events.MouseMove) -> None:
if not self._drag_active:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
next_height = self._drag_origin_height + (
self._drag_origin_screen_y - event.screen_y
)
log_pane.set_log_height(next_height)
event.stop()
def on_mouse_up(self, event: events.MouseUp) -> None:
if not self._drag_active:
return
self._drag_active = False
self.release_mouse()
event.stop()
class ResizableScreenLogPane(Collapsible):
def __init__(self) -> None:
self._log_view = RichLog(
id=SCREEN_LOG_VIEW_ID,
wrap=True,
markup=False,
highlight=False,
auto_scroll=True,
)
self._log_height = SCREEN_LOG_DEFAULT_HEIGHT
self._apply_log_height()
super().__init__(
ScreenLogResizeHandle(),
self._log_view,
title=t("Log"),
collapsed=True,
id=SCREEN_LOG_PANE_ID,
)
self.styles.width = "100%"
def _apply_log_height(self) -> None:
self._log_view.styles.height = self._log_height
self._log_view.styles.width = "100%"
def get_log_height(self) -> int:
return int(self._log_height)
def set_log_height(self, height: int) -> None:
next_height = max(SCREEN_LOG_MIN_HEIGHT, int(height))
try:
available_height = int(self.app.size.height) - 8
except Exception:
available_height = next_height
if available_height > 0:
next_height = min(next_height, available_height)
self._log_height = next_height
self._apply_log_height()
@dataclass(frozen=True) @dataclass(frozen=True)
class ScreenBootstrap: class ScreenBootstrap:
context: dict context: dict
@@ -46,6 +201,48 @@ def build_screen_bootstrap(context: dict) -> ScreenBootstrap:
) )
def set_screen_log_pane_enabled(enabled: bool) -> None:
global _SCREEN_LOG_PANE_ENABLED
_SCREEN_LOG_PANE_ENABLED = bool(enabled)
def is_screen_log_pane_enabled() -> bool:
return bool(_SCREEN_LOG_PANE_ENABLED)
def configure_screen_log_handler(logger, app, *, enabled: bool):
if logger is None:
return None
screen_log_handler = next(
(handler for handler in logger.handlers if handler.get_name() == SCREEN_LOG_HANDLER_NAME),
None,
)
if not enabled:
if screen_log_handler is not None:
logger.removeHandler(screen_log_handler)
screen_log_handler.close()
return None
if screen_log_handler is None:
screen_log_handler = ScreenLogHandler(app)
logger.addHandler(screen_log_handler)
elif isinstance(screen_log_handler, ScreenLogHandler):
screen_log_handler.set_app(app)
screen_log_handler.setLevel(logging.DEBUG)
screen_log_handler.setFormatter(
logging.Formatter(
f"%(name)-{SCREEN_LOG_COMPONENT_WIDTH}s "
+ f"%(levelname)-{SCREEN_LOG_LEVEL_WIDTH}s "
+ "%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
return screen_log_handler
def build_screen_controllers( def build_screen_controllers(
context: dict, context: dict,
*, *,
@@ -143,6 +340,48 @@ def update_table_column_label(table, column_key, label) -> None:
table.refresh() table.refresh()
def build_screen_log_pane() -> ResizableScreenLogPane | Static:
"""Create a shared collapsible log pane for screen-local diagnostics."""
if not is_screen_log_pane_enabled():
hidden = Static("", id=f"{SCREEN_LOG_PANE_ID}_disabled")
hidden.display = False
return hidden
return ResizableScreenLogPane()
def toggle_screen_log_pane(screen) -> bool:
"""Toggle the current screen log pane when present."""
try:
logPane = screen.query_one(f"#{SCREEN_LOG_PANE_ID}", Collapsible)
except Exception:
return False
logPane.collapsed = not bool(logPane.collapsed)
return True
def write_screen_log(screen, message: str) -> bool:
"""Append a line to the current screen log pane when present."""
if message is None:
return False
text = str(message).strip()
if not text:
return False
try:
logView = screen.query_one(f"#{SCREEN_LOG_VIEW_ID}", RichLog)
except Exception:
return False
logView.write(text)
return True
def go_back_or_exit(screen) -> None: def go_back_or_exit(screen) -> None:
"""Pop the current screen when possible, otherwise exit the app.""" """Pop the current screen when possible, otherwise exit the app."""

View File

@@ -3,7 +3,7 @@ from textual.screen import Screen
from textual.widgets import Footer, Placeholder from textual.widgets import Footer, Placeholder
from .i18n import t from .i18n import t
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
class SettingsScreen(Screen): class SettingsScreen(Screen):
@@ -17,6 +17,7 @@ class SettingsScreen(Screen):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
# Row 1 # Row 1
yield Placeholder(t("Settings Screen")) yield Placeholder(t("Settings Screen"))
yield build_screen_log_pane()
yield Footer() yield Footer()
def action_back(self): def action_back(self):

View File

@@ -6,7 +6,7 @@ from textual.containers import Grid
from .i18n import t from .i18n import t
from .shifted_season_controller import ShiftedSeasonController from .shifted_season_controller import ShiftedSeasonController
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
from ffx.model.shifted_season import ShiftedSeason from ffx.model.shifted_season import ShiftedSeason
@@ -127,6 +127,7 @@ class ShiftedSeasonDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button") yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button") yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -6,7 +6,7 @@ from textual.containers import Grid
from .i18n import t from .i18n import t
from .shifted_season_controller import ShiftedSeasonController from .shifted_season_controller import ShiftedSeasonController
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
from ffx.model.shifted_season import ShiftedSeason from ffx.model.shifted_season import ShiftedSeason
@@ -175,6 +175,7 @@ class ShiftedSeasonDetailsScreen(Screen):
# Row 10 # Row 10
yield Static(" ", classes="three") yield Static(" ", classes="three")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -4,7 +4,7 @@ from textual.containers import Grid
from .i18n import t from .i18n import t
from .show_controller import ShowController from .show_controller import ShowController
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]] # Screen[dict[int, str, int]]
class ShowDeleteScreen(Screen): class ShowDeleteScreen(Screen):
@@ -89,6 +89,7 @@ class ShowDeleteScreen(Screen):
yield Button(t("Cancel"), id="cancel_button") yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -21,6 +21,7 @@ from .screen_support import (
add_auto_table_column, add_auto_table_column,
build_screen_bootstrap, build_screen_bootstrap,
build_screen_controllers, build_screen_controllers,
build_screen_log_pane,
go_back_or_exit, go_back_or_exit,
) )
@@ -433,6 +434,7 @@ class ShowDetailsScreen(Screen):
yield Button(t("Cancel"), id="cancel_button") yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -5,7 +5,12 @@ from rich.text import Text
from .i18n import t from .i18n import t
from .show_controller import ShowController from .show_controller import ShowController
from .screen_support import add_auto_table_column, go_back_or_exit, update_table_column_label from .screen_support import (
add_auto_table_column,
build_screen_log_pane,
go_back_or_exit,
update_table_column_label,
)
from .show_details_screen import ShowDetailsScreen from .show_details_screen import ShowDetailsScreen
from .show_delete_screen import ShowDeleteScreen from .show_delete_screen import ShowDeleteScreen
@@ -278,4 +283,5 @@ class ShowsScreen(Screen):
f = Footer() f = Footer()
f.description = "yolo" f.description = "yolo"
yield build_screen_log_pane()
yield f yield f

View File

@@ -3,7 +3,7 @@ from textual.widgets import Header, Footer, Static, Button
from textual.containers import Grid from textual.containers import Grid
from .i18n import t from .i18n import t
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]] # Screen[dict[int, str, int]]
@@ -92,6 +92,7 @@ class TagDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button") yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button") yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -3,7 +3,7 @@ from textual.widgets import Header, Footer, Static, Button, Input
from textual.containers import Grid from textual.containers import Grid
from .i18n import t from .i18n import t
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]] # Screen[dict[int, str, int]]
@@ -121,6 +121,7 @@ class TagDetailsScreen(Screen):
# Row 6 # Row 6
yield Static(" ", classes="five", id="messagestatic") yield Static(" ", classes="five", id="messagestatic")
yield build_screen_log_pane()
yield Footer(id="footer") yield Footer(id="footer")

View File

@@ -3,17 +3,20 @@ from enum import Enum
class TrackCodec(Enum): class TrackCodec(Enum):
VP9 = {'identifier': 'vp9', 'format': 'ivf', 'extension': 'ivf' , 'label': 'VP9'}
H265 = {'identifier': 'hevc', 'format': 'h265', 'extension': 'h265' ,'label': 'H.265'} H265 = {'identifier': 'hevc', 'format': 'h265', 'extension': 'h265' ,'label': 'H.265'}
H264 = {'identifier': 'h264', 'format': 'h264', 'extension': 'h264' ,'label': 'H.264'} H264 = {'identifier': 'h264', 'format': 'h264', 'extension': 'h264' ,'label': 'H.264'}
MPEG4 = {'identifier': 'mpeg4', 'format': 'm4v', 'extension': 'm4v' ,'label': 'MPEG-4'} MPEG4 = {'identifier': 'mpeg4', 'format': 'm4v', 'extension': 'm4v' ,'label': 'MPEG-4'}
MPEG2 = {'identifier': 'mpeg2video', 'format': 'mpeg2video', 'extension': 'mpg' ,'label': 'MPEG-2'} MPEG2 = {'identifier': 'mpeg2video', 'format': 'mpeg2video', 'extension': 'mpg' ,'label': 'MPEG-2'}
OPUS = {'identifier': 'opus', 'format': 'opus', 'extension': 'opus' , 'label': 'Opus'}
AAC = {'identifier': 'aac', 'format': None, 'extension': 'aac' , 'label': 'AAC'} AAC = {'identifier': 'aac', 'format': None, 'extension': 'aac' , 'label': 'AAC'}
AC3 = {'identifier': 'ac3', 'format': 'ac3', 'extension': 'ac3' , 'label': 'AC3'} AC3 = {'identifier': 'ac3', 'format': 'ac3', 'extension': 'ac3' , 'label': 'AC3'}
EAC3 = {'identifier': 'eac3', 'format': 'eac3', 'extension': 'eac3' , 'label': 'EAC3'} EAC3 = {'identifier': 'eac3', 'format': 'eac3', 'extension': 'eac3' , 'label': 'EAC3'}
DTS = {'identifier': 'dts', 'format': 'dts', 'extension': 'dts' , 'label': 'DTS'} DTS = {'identifier': 'dts', 'format': 'dts', 'extension': 'dts' , 'label': 'DTS'}
MP3 = {'identifier': 'mp3', 'format': 'mp3', 'extension': 'mp3' , 'label': 'MP3'} MP3 = {'identifier': 'mp3', 'format': 'mp3', 'extension': 'mp3' , 'label': 'MP3'}
WEBVTT = {'identifier': 'webvtt', 'format': 'webvtt', 'extension': 'vtt' , 'label': 'WebVTT'}
SRT = {'identifier': 'subrip', 'format': 'srt', 'extension': 'srt' , 'label': 'SRT'} SRT = {'identifier': 'subrip', 'format': 'srt', 'extension': 'srt' , 'label': 'SRT'}
ASS = {'identifier': 'ass', 'format': 'ass', 'extension': 'ass' , 'label': 'ASS'} ASS = {'identifier': 'ass', 'format': 'ass', 'extension': 'ass' , 'label': 'ASS'}
TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf' , 'label': 'TTF'} TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf' , 'label': 'TTF'}

View File

@@ -6,7 +6,7 @@ from textual.containers import Grid
from ffx.track_descriptor import TrackDescriptor from ffx.track_descriptor import TrackDescriptor
from .i18n import t from .i18n import t
from .screen_support import go_back_or_exit from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]] # Screen[dict[int, str, int]]
@@ -118,6 +118,7 @@ class TrackDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button") yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button") yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer() yield Footer()

View File

@@ -14,7 +14,13 @@ from .track_descriptor import TrackDescriptor
from .track_disposition import TrackDisposition from .track_disposition import TrackDisposition
from .track_type import TrackType from .track_type import TrackType
from .i18n import t from .i18n import t
from .screen_support import add_auto_table_column, build_screen_bootstrap, go_back_or_exit, populate_tag_table from .screen_support import (
add_auto_table_column,
build_screen_bootstrap,
build_screen_log_pane,
go_back_or_exit,
populate_tag_table,
)
class TrackDetailsScreen(Screen): class TrackDetailsScreen(Screen):
@@ -128,6 +134,9 @@ class TrackDetailsScreen(Screen):
self.__patternLabel = str(patternLabel) self.__patternLabel = str(patternLabel)
self.__siblingTrackDescriptors = list(siblingTrackDescriptors or []) self.__siblingTrackDescriptors = list(siblingTrackDescriptors or [])
self.__metadataOnly = bool(metadata_only) self.__metadataOnly = bool(metadata_only)
self.__applyNormalization = bool(
self.context.get("apply_metadata_normalization", True)
)
if self.__isNew: if self.__isNew:
self.__trackType = trackType self.__trackType = trackType
@@ -152,8 +161,13 @@ class TrackDetailsScreen(Screen):
initial_language = trackDescriptor.getLanguage() initial_language = trackDescriptor.getLanguage()
initial_title = trackDescriptor.getTitle() initial_title = trackDescriptor.getTitle()
self.__titleAutoManaged = ( initialTitleEmpty = not str(initial_title).strip()
initial_language == IsoLanguage.UNDEFINED and not str(initial_title).strip() self.__titleAutoManaged = bool(
initialTitleEmpty
and (
initial_language == IsoLanguage.UNDEFINED
or (self.__metadataOnly and self.__applyNormalization)
)
) )
self.__suppressTitleChanged = False self.__suppressTitleChanged = False
self.__lastAutoTitle = "" self.__lastAutoTitle = ""
@@ -256,6 +270,8 @@ class TrackDetailsScreen(Screen):
self.__trackDescriptor.getLanguage() self.__trackDescriptor.getLanguage()
) )
self.query_one("#title_input", Input).value = self.__trackDescriptor.getTitle() self.query_one("#title_input", Input).value = self.__trackDescriptor.getTitle()
if self.__titleAutoManaged and not self.__trackDescriptor.getTitle().strip():
self._apply_auto_title_for_language(self.__trackDescriptor.getLanguage())
self.updateTags() self.updateTags()
if self.__metadataOnly: if self.__metadataOnly:
@@ -387,6 +403,7 @@ class TrackDetailsScreen(Screen):
# Row 24 # Row 24
yield Static(" ", classes="five", id="messagestatic") yield Static(" ", classes="five", id="messagestatic")
yield build_screen_log_pane()
yield Footer(id="footer") yield Footer(id="footer")
def getTrackDescriptorFromInput(self): def getTrackDescriptorFromInput(self):

View File

@@ -168,6 +168,40 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"], result["modules"],
) )
def test_root_debug_flag_parses_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.ffx.make_context(
"ffx",
["--debug", "help"],
resilient_parsing=True,
)
print(json.dumps({{
"debug": context.params["debug"],
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["debug"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_convert_cut_option_supports_flag_duration_and_start_duration_forms(self): def test_convert_cut_option_supports_flag_duration_and_start_duration_forms(self):
result = self.run_python( result = self.run_python(
textwrap.dedent( textwrap.dedent(

View File

@@ -0,0 +1,91 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class UnmuxSequenceTests(unittest.TestCase):
def test_h265_video_unmux_uses_annex_b_bitstream_filter(self):
track_descriptor = TrackDescriptor(
index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H265,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mp4",
"episode_0_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mp4",
"-map",
"0:v:0",
"-c:v",
"copy",
"-bsf:v",
"hevc_mp4toannexb",
"-f",
"h265",
"episode_0_eng.h265",
],
sequence,
)
def test_non_h265_unmux_keeps_generic_copy_behavior(self):
track_descriptor = TrackDescriptor(
index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.SRT,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mkv",
"episode_1_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mkv",
"-map",
"0:s:0",
"-c",
"copy",
"-f",
"srt",
"episode_1_eng.srt",
],
sequence,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.file_properties import FileProperties # noqa: E402
from ffx.i18n import set_current_language # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class StaticConfig:
def __init__(self, data: dict):
self._data = data
def getData(self):
return self._data
class FilePropertiesAssetProbeTests(unittest.TestCase):
def tearDown(self):
set_current_language("de")
def test_boruto_webm_probe_recognizes_webm_stream_codecs(self):
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
"language": "de",
"use_pattern": False,
}
set_current_language("de")
media_path = (
Path(__file__).resolve().parents[1]
/ "assets"
/ "Boruto; Naruto Next Generations (2017) - 0069 Super-Chochos Liebestaumel - S01E0069.webm"
)
file_properties = FileProperties(context, str(media_path))
tracks = file_properties.getMediaDescriptor().getTrackDescriptors()
subtitle_codecs = [
track.getCodec()
for track in tracks
if track.getType() == TrackType.SUBTITLE
]
self.assertIn(TrackCodec.VP9, [track.getCodec() for track in tracks])
self.assertIn(TrackCodec.OPUS, [track.getCodec() for track in tracks])
self.assertTrue(subtitle_codecs)
self.assertTrue(all(codec == TrackCodec.WEBVTT for codec in subtitle_codecs))
if __name__ == "__main__":
unittest.main()

View File

@@ -16,8 +16,10 @@ if str(SRC_ROOT) not in sys.path:
from ffx.logging_utils import ( # noqa: E402 from ffx.logging_utils import ( # noqa: E402
CONSOLE_HANDLER_NAME, CONSOLE_HANDLER_NAME,
FILE_HANDLER_NAME, FILE_HANDLER_NAME,
MUTED_CONSOLE_LEVEL,
configure_ffx_logger, configure_ffx_logger,
get_ffx_logger, get_ffx_logger,
set_ffx_console_logging_enabled,
) )
@@ -81,6 +83,33 @@ class LoggingUtilsTests(unittest.TestCase):
self.cleanup_logger(logger_name) self.cleanup_logger(logger_name)
def test_set_ffx_console_logging_enabled_mutes_and_restores_console_handler(self):
logger_name = "ffx-test-console-mute"
self.cleanup_logger(logger_name)
with tempfile.TemporaryDirectory() as tempdir:
log_path = Path(tempdir) / "ffx.log"
logger = configure_ffx_logger(
str(log_path),
logging.DEBUG,
logging.INFO,
name=logger_name,
)
console_handler = next(
handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME
)
self.assertEqual(logging.INFO, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=False)
self.assertEqual(MUTED_CONSOLE_LEVEL, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=True)
self.assertEqual(logging.INFO, console_handler.level)
self.cleanup_logger(logger_name)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -247,7 +247,8 @@ class MediaDescriptorChangeSetTests(unittest.TestCase):
self.assertIn("title=German", metadata_tokens) self.assertIn("title=German", metadata_tokens)
self.assertNotIn("title=Deutsch", metadata_tokens) self.assertNotIn("title=Deutsch", metadata_tokens)
def test_non_subtitle_track_without_title_does_not_get_language_name(self): def test_audio_track_without_title_gets_language_name_when_normalization_enabled(self):
set_current_language("de")
context = { context = {
"logger": get_ffx_logger(), "logger": get_ffx_logger(),
"config": StaticConfig({}), "config": StaticConfig({}),
@@ -278,6 +279,73 @@ class MediaDescriptorChangeSetTests(unittest.TestCase):
self.assertIn("-metadata:s:a:0", metadata_tokens) self.assertIn("-metadata:s:a:0", metadata_tokens)
self.assertIn("language=deu", metadata_tokens) self.assertIn("language=deu", metadata_tokens)
self.assertIn("title=Deutsch", metadata_tokens)
def test_video_track_without_title_gets_language_name_when_normalization_enabled(self):
set_current_language("de")
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
}
source_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
tags={"language": "ger"},
)
target_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
tags={"language": "ger"},
)
change_set = MediaDescriptorChangeSet(
context,
MediaDescriptor(track_descriptors=[target_track]),
MediaDescriptor(track_descriptors=[source_track]),
)
metadata_tokens = change_set.generateMetadataTokens()
self.assertIn("language=deu", metadata_tokens)
self.assertIn("title=Deutsch", metadata_tokens)
def test_changed_track_language_does_not_autofill_title_when_title_already_exists(self):
set_current_language("de")
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
}
source_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.SUBTITLE,
tags={"language": "ger", "title": "Deutsch [FN]"},
)
target_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.SUBTITLE,
tags={"language": "jpn", "title": "Deutsch [FN]"},
)
change_set = MediaDescriptorChangeSet(
context,
MediaDescriptor(track_descriptors=[target_track]),
MediaDescriptor(track_descriptors=[source_track]),
)
metadata_tokens = change_set.generateMetadataTokens()
self.assertIn("language=jpn", metadata_tokens)
self.assertNotIn("title=Japanisch", metadata_tokens)
self.assertNotIn("title=Deutsch", metadata_tokens) self.assertNotIn("title=Deutsch", metadata_tokens)
def test_target_only_tracks_still_emit_remove_tokens_for_configured_stream_keys(self): def test_target_only_tracks_still_emit_remove_tokens_for_configured_stream_keys(self):

View File

@@ -18,6 +18,7 @@ from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402 from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.metadata_editor import ( # noqa: E402 from ffx.metadata_editor import ( # noqa: E402
apply_metadata_edits, apply_metadata_edits,
build_metadata_edit_command,
build_metadata_edit_context, build_metadata_edit_context,
create_temporary_output_path, create_temporary_output_path,
) )
@@ -77,15 +78,45 @@ class MetadataEditorTests(unittest.TestCase):
self.assertEqual(".mkv", Path(temporary_path).suffix) self.assertEqual(".mkv", Path(temporary_path).suffix)
self.assertEqual(Path(source_path).parent, Path(temporary_path).parent) self.assertEqual(Path(source_path).parent, Path(temporary_path).parent)
def test_build_metadata_edit_command_maps_all_streams_and_uses_single_copy_codec(self):
context = build_metadata_edit_context(make_context())
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
command = build_metadata_edit_command(
context,
"/tmp/example.mkv",
"/tmp/.edit.mkv",
baseline_descriptor,
draft_descriptor,
)
self.assertEqual(1, command.count("-map"))
self.assertEqual(1, command.count("-c"))
self.assertNotIn("-c:v:0", command)
self.assertNotIn("-c:a:0", command)
self.assertNotIn("-c:s:0", command)
self.assertEqual(
["-map", "0", "-c", "copy"],
command[command.index("-map"):command.index("-c") + 2],
)
def test_apply_metadata_edits_rewrites_via_temporary_file_then_replaces_source(self): def test_apply_metadata_edits_rewrites_via_temporary_file_then_replaces_source(self):
context = make_context() context = make_context()
baseline_descriptor = make_descriptor() baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context) draft_descriptor = baseline_descriptor.clone(context=context)
source_path = "/tmp/example.mkv" source_path = "/tmp/example.mkv"
expected_command = build_metadata_edit_command(
build_metadata_edit_context(context),
source_path,
"/tmp/.edit.mkv",
baseline_descriptor,
draft_descriptor,
)
with ( with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"), patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
patch("ffx.metadata_editor.FfxController.runJob") as mocked_run_job, patch("ffx.metadata_editor.executeProcess", return_value=("", "", 0)) as mocked_execute,
patch("ffx.metadata_editor.os.replace") as mocked_replace, patch("ffx.metadata_editor.os.replace") as mocked_replace,
): ):
result = apply_metadata_edits( result = apply_metadata_edits(
@@ -95,32 +126,43 @@ class MetadataEditorTests(unittest.TestCase):
draft_descriptor, draft_descriptor,
) )
mocked_run_job.assert_called_once_with( mocked_execute.assert_called_once_with(expected_command, context=build_metadata_edit_context(context))
source_path,
"/tmp/.edit.mkv",
targetFormat="",
chainIteration=[],
)
mocked_replace.assert_called_once_with("/tmp/.edit.mkv", source_path) mocked_replace.assert_called_once_with("/tmp/.edit.mkv", source_path)
self.assertEqual( self.assertEqual(
{ {
"applied": True, "applied": True,
"dry_run": False, "dry_run": False,
"target_path": source_path, "target_path": source_path,
"command_sequence": expected_command,
},
{
"applied": result["applied"],
"dry_run": result["dry_run"],
"target_path": result["target_path"],
"command_sequence": result["command_sequence"],
}, },
result,
) )
self.assertIn("timings", result)
self.assertIn("ffmpeg_seconds", result["timings"])
self.assertIn("replace_seconds", result["timings"])
self.assertIn("write_seconds", result["timings"])
def test_apply_metadata_edits_dry_run_skips_replace_and_cleans_temp_path(self): def test_apply_metadata_edits_dry_run_skips_replace_and_cleans_temp_path(self):
context = make_context(dry_run=True) context = make_context(dry_run=True)
baseline_descriptor = make_descriptor() baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context) draft_descriptor = baseline_descriptor.clone(context=context)
notifications = []
expected_command = build_metadata_edit_command(
build_metadata_edit_context(context),
"/tmp/example.mkv",
"/tmp/.edit.mkv",
baseline_descriptor,
draft_descriptor,
)
with ( with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"), patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
patch("ffx.metadata_editor.FfxController.runJob") as mocked_run_job, patch("ffx.metadata_editor.executeProcess") as mocked_execute,
patch("ffx.metadata_editor.os.path.exists", return_value=True),
patch("ffx.metadata_editor.os.remove") as mocked_remove,
patch("ffx.metadata_editor.os.replace") as mocked_replace, patch("ffx.metadata_editor.os.replace") as mocked_replace,
): ):
result = apply_metadata_edits( result = apply_metadata_edits(
@@ -128,19 +170,57 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv", "/tmp/example.mkv",
baseline_descriptor, baseline_descriptor,
draft_descriptor, draft_descriptor,
loggingHandler = notifications.append,
) )
mocked_run_job.assert_called_once() mocked_execute.assert_not_called()
mocked_replace.assert_not_called() mocked_replace.assert_not_called()
mocked_remove.assert_called_once_with("/tmp/.edit.mkv") self.assertEqual(["ffmpeg dry-run prepared."], notifications)
self.assertEqual( self.assertEqual(
{ {
"applied": False, "applied": False,
"dry_run": True, "dry_run": True,
"target_path": "/tmp/.edit.mkv", "target_path": "/tmp/.edit.mkv",
"command_sequence": expected_command,
},
{
"applied": result["applied"],
"dry_run": result["dry_run"],
"target_path": result["target_path"],
"command_sequence": result["command_sequence"],
}, },
result,
) )
self.assertEqual(
{
"ffmpeg_seconds": 0.0,
"replace_seconds": 0.0,
"write_seconds": 0.0,
},
result["timings"],
)
def test_apply_metadata_edits_notifies_with_command_when_verbose(self):
context = make_context()
context["verbosity"] = 1
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
notifications = []
with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
patch("ffx.metadata_editor.executeProcess", return_value=("", "", 0)),
patch("ffx.metadata_editor.os.replace"),
):
apply_metadata_edits(
context,
"/tmp/example.mkv",
baseline_descriptor,
draft_descriptor,
loggingHandler = notifications.append,
)
self.assertEqual(1, len(notifications))
self.assertTrue(notifications[0].startswith("ffmpeg: ffmpeg "))
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
import logging
import sys import sys
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
@@ -57,9 +58,38 @@ class FakeScreen:
self.app = FakeApp(screen_stack) self.app = FakeApp(screen_stack)
class FakeRichLog:
def __init__(self):
self.messages = []
def write(self, message):
self.messages.append(message)
class FakeScreenWithLog:
def __init__(self):
self.log_view = FakeRichLog()
def query_one(self, selector, _widget_type=None):
if selector == f"#{screen_support.SCREEN_LOG_VIEW_ID}":
return self.log_view
raise LookupError(selector)
class FakeThreadedApp:
def __init__(self, screen):
self.screen = screen
self.calls = []
def call_from_thread(self, func, *args):
self.calls.append((func, args))
return func(*args)
class ScreenSupportTests(unittest.TestCase): class ScreenSupportTests(unittest.TestCase):
def tearDown(self): def tearDown(self):
set_current_language("de") set_current_language("de")
screen_support.set_screen_log_pane_enabled(False)
def make_context(self): def make_context(self):
return { return {
@@ -168,6 +198,63 @@ class ScreenSupportTests(unittest.TestCase):
self.assertGreater(len(translated), 8) self.assertGreater(len(translated), 8)
self.assertEqual(len(translated) + 2, screen_support.localized_column_width(translated, 8)) self.assertEqual(len(translated) + 2, screen_support.localized_column_width(translated, 8))
def test_build_screen_log_pane_is_hidden_when_debug_mode_is_disabled(self):
screen_support.set_screen_log_pane_enabled(False)
log_pane = screen_support.build_screen_log_pane()
self.assertFalse(log_pane.display)
def test_build_screen_log_pane_is_collapsed_when_debug_mode_is_enabled(self):
screen_support.set_screen_log_pane_enabled(True)
log_pane = screen_support.build_screen_log_pane()
self.assertIsInstance(log_pane, screen_support.ResizableScreenLogPane)
self.assertEqual(screen_support.SCREEN_LOG_PANE_ID, log_pane.id)
self.assertTrue(log_pane.collapsed)
def test_resizable_screen_log_pane_clamps_height_to_minimum(self):
log_pane = screen_support.ResizableScreenLogPane()
log_pane.set_log_height(1)
self.assertEqual(screen_support.SCREEN_LOG_MIN_HEIGHT, log_pane.get_log_height())
def test_configure_screen_log_handler_routes_logger_messages_to_active_screen(self):
logger_name = "ffx-test-screen-log-handler"
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
logger.propagate = False
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
screen = FakeScreenWithLog()
app = FakeThreadedApp(screen)
try:
handler = screen_support.configure_screen_log_handler(
logger,
app,
enabled=True,
)
self.assertIsNotNone(handler)
logger.info("hello pane")
self.assertEqual(1, len(screen.log_view.messages))
self.assertRegex(
screen.log_view.messages[0],
r"^ffx-test-screen-log-handler\s+INFO\s+\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \| hello pane$",
)
finally:
screen_support.configure_screen_log_handler(logger, app, enabled=False)
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -14,10 +14,12 @@ if str(SRC_ROOT) not in sys.path:
from ffx.audio_layout import AudioLayout # noqa: E402 from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.helper import DIFF_ADDED_KEY # noqa: E402
from ffx.iso_language import IsoLanguage # noqa: E402 from ffx.iso_language import IsoLanguage # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402 from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.inspect_details_screen import InspectDetailsScreen # noqa: E402 from ffx.inspect_details_screen import InspectDetailsScreen # noqa: E402
from ffx.i18n import set_current_language # noqa: E402 from ffx.i18n import set_current_language # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.media_edit_screen import MediaEditScreen # noqa: E402 from ffx.media_edit_screen import MediaEditScreen # noqa: E402
from ffx.pattern_details_screen import PatternDetailsScreen # noqa: E402 from ffx.pattern_details_screen import PatternDetailsScreen # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402 from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -89,16 +91,21 @@ class FakeTagTable:
class FakeMediaDescriptor: class FakeMediaDescriptor:
def __init__(self, track_descriptors): def __init__(self, track_descriptors, tags=None):
self._track_descriptors = list(track_descriptors) self._track_descriptors = list(track_descriptors)
self._tags = dict(tags or {})
def getTrackDescriptors(self): def getTrackDescriptors(self):
return list(self._track_descriptors) return list(self._track_descriptors)
def getTags(self):
return dict(self._tags)
class FakeValueWidget: class FakeValueWidget:
def __init__(self, value): def __init__(self, value):
self.value = value self.value = value
self.disabled = False
class FakeInputWidget: class FakeInputWidget:
@@ -106,10 +113,21 @@ class FakeInputWidget:
self.value = value self.value = value
class FakeStaticWidget:
def __init__(self, value=""):
self.value = value
def update(self, value):
self.value = value
class FakeSelectionListWidget: class FakeSelectionListWidget:
def __init__(self, selected): def __init__(self, selected):
self.selected = selected self.selected = selected
def add_option(self, _option):
return None
def make_track_descriptor(index, sub_index, track_type): def make_track_descriptor(index, sub_index, track_type):
return TrackDescriptor( return TrackDescriptor(
@@ -244,6 +262,49 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertEqual("Preset", widgets["#title_input"].value) self.assertEqual("Preset", widgets["#title_input"].value)
def test_track_details_screen_metadata_only_mount_shows_normalized_title_preview(self):
set_current_language("de")
screen = object.__new__(TrackDetailsScreen)
screen._TrackDetailsScreen__index = 2
screen._TrackDetailsScreen__subIndex = 0
screen._TrackDetailsScreen__patternLabel = "demo"
screen._TrackDetailsScreen__trackType = TrackType.AUDIO
screen._TrackDetailsScreen__audioLayout = AudioLayout.LAYOUT_STEREO
screen._TrackDetailsScreen__trackDescriptor = TrackDescriptor(
index=2,
source_index=2,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.DTS,
audio_layout=AudioLayout.LAYOUT_STEREO,
tags={"language": "ger"},
)
screen._TrackDetailsScreen__metadataOnly = True
screen._TrackDetailsScreen__titleAutoManaged = True
screen._TrackDetailsScreen__suppressTitleChanged = False
screen._TrackDetailsScreen__lastAutoTitle = ""
screen._TrackDetailsScreen__removeTrackKeys = []
screen._TrackDetailsScreen__ignoreTrackKeys = []
screen._TrackDetailsScreen__draftTrackTags = {}
screen._TrackDetailsScreen__tagRowData = {}
screen.updateTags = lambda: None
widgets = {
"#index_label": FakeStaticWidget(),
"#subindex_label": FakeStaticWidget(),
"#pattern_label": FakeStaticWidget(),
"#type_select": FakeValueWidget(None),
"#audio_layout_select": FakeValueWidget(None),
"#dispositions_selection_list": FakeSelectionListWidget(set()),
"#language_select": FakeValueWidget(None),
"#title_input": FakeInputWidget(""),
}
screen.query_one = lambda selector, _widget_type=None: widgets[selector]
screen.on_mount()
self.assertEqual("Deutsch", widgets["#title_input"].value)
def test_track_details_screen_language_options_are_sorted_by_localized_label(self): def test_track_details_screen_language_options_are_sorted_by_localized_label(self):
set_current_language("de") set_current_language("de")
@@ -326,12 +387,177 @@ class TagTableScreenStateTests(unittest.TestCase):
screen.tracksTable = FakeTagTable() screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([first_track]) screen._sourceMediaDescriptor = FakeMediaDescriptor([first_track])
screen._trackRowData = {} screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks() screen.updateTracks()
self.assertEqual(9, len(screen.tracksTable.columns)) self.assertEqual(9, len(screen.tracksTable.columns))
self.assertIn("A much longer updated title", screen.tracksTable.rows["row-0"]) self.assertIn("A much longer updated title", screen.tracksTable.rows["row-0"])
def test_media_edit_screen_shows_normalized_audio_title_preview(self):
set_current_language("de")
audio_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.DTS,
audio_layout=AudioLayout.LAYOUT_STEREO,
tags={"language": "ger"},
)
screen = object.__new__(MediaEditScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([audio_track])
screen._trackRowData = {}
screen._applyNormalization = True
screen.updateTracks()
self.assertIn("Deutsch", screen.tracksTable.rows["row-0"])
def test_media_edit_screen_shows_normalized_video_title_preview(self):
set_current_language("de")
video_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
tags={"language": "ger"},
)
screen = object.__new__(MediaEditScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([video_track])
screen._trackRowData = {}
screen._applyNormalization = True
screen.updateTracks()
self.assertIn("Deutsch", screen.tracksTable.rows["row-0"])
def test_media_edit_screen_toggle_normalization_refreshes_tracks(self):
screen = object.__new__(MediaEditScreen)
screen._applyNormalization = False
calls = []
screen.setApplyNormalization = lambda enabled: (
setattr(screen, "_applyNormalization", bool(enabled)),
calls.append("setApplyNormalization"),
)
screen.updateToggleButtons = lambda: calls.append("updateToggleButtons")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.setMessage = lambda _message: calls.append("setMessage")
screen.action_toggle_normalization()
self.assertEqual(
[
"setApplyNormalization",
"updateToggleButtons",
"updateTracks",
"updateDifferences",
"setMessage",
],
calls,
)
def test_media_edit_screen_handle_edit_track_updates_draft_descriptor(self):
original_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger"},
)
context = {"logger": get_ffx_logger()}
updated_track = original_track.clone(context=context)
updated_track.getTags()["language"] = "eng"
screen = object.__new__(MediaEditScreen)
screen.context = context
screen._sourceMediaDescriptor = MediaDescriptor(
context=context,
track_descriptors=[original_track],
)
calls = []
screen.setMessage = lambda _message: calls.append("setMessage")
screen.refreshAfterDraftChange = lambda: calls.append("refreshAfterDraftChange")
screen.handle_edit_track(updated_track)
self.assertEqual(
"eng",
screen._sourceMediaDescriptor.getTrackDescriptors()[0].getTags()["language"],
)
self.assertEqual(
["setMessage", "refreshAfterDraftChange"],
calls,
)
def test_media_edit_screen_screen_resume_refreshes_draft_tables(self):
screen = object.__new__(MediaEditScreen)
screen.tracksTable = FakeTagTable()
calls = []
screen.refreshAfterDraftChange = lambda: calls.append("refreshAfterDraftChange")
screen.updateToggleButtons = lambda: calls.append("updateToggleButtons")
screen.on_screen_resume(None)
self.assertEqual(
["refreshAfterDraftChange", "updateToggleButtons"],
calls,
)
def test_pattern_details_screen_screen_resume_refreshes_tables(self):
screen = object.__new__(PatternDetailsScreen)
screen.tracksTable = FakeTagTable()
screen.tagsTable = FakeTagTable()
screen.shiftedSeasonsTable = FakeTagTable()
screen._PatternDetailsScreen__pattern = object()
calls = []
screen.updateTags = lambda: calls.append("updateTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateShiftedSeasons = lambda: calls.append("updateShiftedSeasons")
screen.on_screen_resume(None)
self.assertEqual(
["updateTags", "updateTracks", "updateShiftedSeasons"],
calls,
)
def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self):
screen = object.__new__(InspectDetailsScreen)
calls = []
screen.reloadProperties = lambda reset_draft=True: calls.append(
("reloadProperties", reset_draft)
)
screen._currentPattern = None
screen.updateMediaTags = lambda: calls.append("updateMediaTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.handle_edit_pattern(None)
self.assertEqual(
[
("reloadProperties", True),
"updateMediaTags",
"updateTracks",
"updateDifferences",
],
calls,
)
def test_pattern_details_screen_reads_selected_shifted_season_from_row_mapping(self): def test_pattern_details_screen_reads_selected_shifted_season_from_row_mapping(self):
screen = object.__new__(PatternDetailsScreen) screen = object.__new__(PatternDetailsScreen)
screen.shiftedSeasonsTable = FakeTagTable() screen.shiftedSeasonsTable = FakeTagTable()
@@ -438,6 +664,127 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertNotIn(placeholder_key, screen._showRowData) self.assertNotIn(placeholder_key, screen._showRowData)
self.assertEqual(0, screen.getRowIndexFromShowId(8)) self.assertEqual(0, screen.getRowIndexFromShowId(8))
def test_inspect_details_screen_update_tracks_shows_target_pattern_tracks(self):
source_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger", "title": "German Full"},
)
target_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "eng", "title": "English Full"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([source_track])
screen._targetMediaDescriptor = FakeMediaDescriptor([target_track])
screen._currentPattern = object()
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor(
index=3,
source_index=7,
sub_index=1,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger"},
)
target_track = TrackDescriptor(
index=1,
source_index=7,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "eng"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([source_track])
screen._targetMediaDescriptor = FakeMediaDescriptor([target_track])
screen._currentPattern = object()
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
self.assertIs(source_track, screen.getTrackEditSourceDescriptor())
def test_inspect_details_screen_action_update_pattern_uses_existing_change_set_before_reload(self):
class _FakePattern:
def getPattern(self):
return r"demo_(s[0-9]+e[0-9]+)\.mkv"
def getId(self):
return 9
class _FakeTagController:
def __init__(self, calls):
self._calls = calls
def deleteMediaTagByKey(self, pattern_id, key):
self._calls.append(("deleteMediaTagByKey", pattern_id, key))
calls = []
screen = object.__new__(InspectDetailsScreen)
screen._currentPattern = _FakePattern()
screen._mediaChangeSetObj = {
"tags": {
DIFF_ADDED_KEY: {"TITLE": "Demo"},
}
}
screen._tac = _FakeTagController(calls)
screen._tc = type(
"_FakeTrackController",
(),
{
"addTrack": staticmethod(lambda *_args, **_kwargs: None),
"deleteTrack": staticmethod(lambda *_args, **_kwargs: None),
"setDispositionState": staticmethod(lambda *_args, **_kwargs: None),
},
)()
screen._sourceMediaDescriptor = FakeMediaDescriptor([], tags={})
screen._targetMediaDescriptor = FakeMediaDescriptor([])
screen.getPatternObjFromInput = lambda: {
"show_id": 1,
"pattern": r"demo_(s[0-9]+e[0-9]+)\.mkv",
}
screen.reloadProperties = lambda reset_draft=True: calls.append(
("reloadProperties", reset_draft)
)
screen.updateMediaTags = lambda: calls.append("updateMediaTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.action_update_pattern()
self.assertEqual(
[
("deleteMediaTagByKey", 9, "TITLE"),
("reloadProperties", True),
"updateMediaTags",
"updateTracks",
"updateDifferences",
],
calls,
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.track_codec import TrackCodec # noqa: E402
class TrackCodecIdentificationTests(unittest.TestCase):
def test_identify_modern_webm_codecs(self):
self.assertEqual(TrackCodec.VP9, TrackCodec.identify("vp9"))
self.assertEqual(TrackCodec.OPUS, TrackCodec.identify("opus"))
self.assertEqual(TrackCodec.WEBVTT, TrackCodec.identify("webvtt"))
if __name__ == "__main__":
unittest.main()