19 Commits

Author SHA1 Message Date
Javanaut
b80c055826 fix table 2026-04-17 13:17:15 +02:00
Javanaut
c5fc6ac13d fix styled ASS and refactor att format 2026-04-17 11:41:13 +02:00
Javanaut
1bead05d19 ff 2026-04-16 19:36:40 +02:00
Javanaut
9fe2a842e9 ff 2026-04-16 19:32:41 +02:00
Javanaut
849d03d054 v0.3.1 2026-04-16 19:26:17 +02:00
Javanaut
3a87bbbba6 Anpassung --cut flag 2026-04-16 19:02:57 +02:00
Javanaut
ab5e8e53e1 Fix debug title 2026-04-16 18:32:07 +02:00
Javanaut
0ab2408444 Fix h265 format 2026-04-16 18:20:17 +02:00
Javanaut
bc1e0889e7 Fix inspect details screen 2026-04-16 18:10:39 +02:00
Javanaut
6dfbe1022a Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev 2026-04-15 15:50:27 +02:00
Javanaut
d3d2de8a0d adds scratchpad points 2026-04-15 15:50:24 +02:00
Javanaut
0728ece4b8 Fix h265 subtrack unmux 2026-04-15 00:03:17 +02:00
Javanaut
02e375fbf2 nnn 2026-04-14 19:08:29 +02:00
Javanaut
14e6ce8458 Fix logging 2026-04-14 10:04:39 +02:00
Javanaut
d921629947 v0.3.0 2026-04-14 00:55:42 +02:00
Javanaut
65490e2a7f ff 2026-04-14 00:44:43 +02:00
Javanaut
6c5b518e4d ffn 2026-04-14 00:26:16 +02:00
Javanaut
e3c18f22d4 Adds UI tweaks nightly 2026-04-13 23:11:14 +02:00
Javanaut
57185c7f10 Adds missing codecs 2026-04-13 20:15:10 +02:00
54 changed files with 2531 additions and 182 deletions

3
.gitignore vendored
View File

@@ -10,6 +10,8 @@ tools/ansible/inventory/group_vars/all.yml
ffx_test_report.log
bin/conversiontest.py
tests/assets/
build/
dist/
*.egg-info/
@@ -20,5 +22,6 @@ venv/
*.mkv
*.webm
*.mp4
ffmpeg2pass-0.log
*.sup

View File

@@ -99,6 +99,20 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History
### 0.3.1
- debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows
- `--cut` again works as a combined flag/option: omitted disables cutting, bare `--cut` applies the default `60,180`, and explicit duration or `START,DURATION` values stay supported
- H.265 unmux commands no longer force an invalid `-f h265` output format, keeping ffmpeg copy extraction aligned with the required Annex B bitstream filter
- H.264 encoding now falls back from `libx264` to `libopenh264` with a warning when needed, and the test fixtures use the same encoder fallback so the suite remains portable across ffmpeg builds
### 0.3.0
- inspect and edit screens now refresh nested track and pattern changes more reliably, with inspect-mode tables aligned to the target pattern view shown in the differences pane
- metadata editing got a follow-up polish pass with clearer ffmpeg notifications, a shared in-screen log pane, safer apply/reload handling, and expanded cleanup and normalization coverage
- track and asset probing recognize additional codecs, and the modern test suite now covers more metadata-editor, change-set, screen-state, and asset-probe behavior
- Textual now requires version `8.0` or newer to match the UI APIs used by the current screens
### 0.2.6
- DB-free `ffx edit` workflow for in-place metadata editing via temporary-file rewrite

View File

@@ -69,3 +69,15 @@
## Delete When
- Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance.
## Missing Timestamps
Detect ffmpeg warning "Timestamps are unset in a packet for stream 0. This is deprecated and will stop working in the future. Fix your code to set the timestamps properly" and try autofix by -fflags +genpts -> Warning if fails -> Error. Check if flags collide with anything.
<!--
## Source Formats
-->

170
docs/file_formats.md Normal file
View File

@@ -0,0 +1,170 @@
# File Formats
This document captures source-file-format notes that complement the normative
requirements in `requirements/source_file_formats.md`.
The first documented format is a Matroska source that carries styled ASS/SSA
subtitle streams together with embedded font attachments.
## Styled ASS In Matroska With Embedded Fonts
These files are typically `.mkv` releases where subtitle rendering quality
depends on keeping both parts of the subtitle package together:
- one or more subtitle streams with codec `ass`
- one or more attachment streams that embed font files used by those subtitles
This matters because ASS subtitles are not plain text subtitles in the narrow
WebVTT sense. They can carry layout, styling, positioning, karaoke, signs, and
other typesetting effects. If the matching embedded fonts are lost, consumers
can still see subtitle text but the intended styling and sometimes glyph
coverage can be degraded.
For FFX this format is special because the ASS subtitle streams should remain
normally editable and mappable, while the related font attachments should be
transported unchanged.
## Observed Sample
Assessment date: `2026-04-17`
Observed sample file:
- `tests/assets/boruto_s01e283_ssa.mkv`
Commands used for assessment:
```bash
ffprobe tests/assets/boruto_s01e283_ssa.mkv
ffprobe -hide_banner -show_format -show_streams -of json tests/assets/boruto_s01e283_ssa.mkv
```
Observed stream layout:
| Stream index | Kind | Key details |
| --- | --- | --- |
| `0` | video | `codec_name=h264` |
| `1` | audio | `codec_name=aac`, `language=jpn` |
| `2` | subtitle | `codec_name=ass`, `language=ger`, default |
| `3` | subtitle | `codec_name=ass`, `language=eng` |
| `4`-`13` | attachment | `tags.mimetype=font/ttf`, `.ttf` filenames |
Observed attachment filenames:
- `AmazonEmberTanuki-Italic.ttf`
- `AmazonEmberTanuki-Regular.ttf`
- `Arial.ttf`
- `Arial Bold.ttf`
- `Georgia.ttf`
- `Times New Roman.ttf`
- `Times New Roman Bold.ttf`
- `Trebuchet MS.ttf`
- `Verdana.ttf`
- `Verdana Bold.ttf`
Important probe behavior from the real sample:
- Plain `ffprobe` lists the font streams as `Attachment: none`.
- Plain `ffprobe` also prints warnings such as `Could not find codec
parameters for stream 4 (Attachment: none): unknown codec` and later
`Unsupported codec with id 0 for input stream ...`.
- The JSON produced by `FileProperties.FFPROBE_COMMAND_TOKENS`
(`ffprobe -hide_banner -show_format -show_streams -of json`) still exposes
the attachment streams clearly through `codec_type="attachment"` and the
attachment tags.
- In that JSON, the attachment streams do not expose `codec_name`.
This last point is important for FFX: robust detection must not depend on
attachment `codec_name` being present.
## Detection Guidance
Current known indicators for this format are:
- one or more subtitle streams with `codec_type="subtitle"` and
`codec_name="ass"`
- one or more attachment streams with `codec_type="attachment"`
- attachment tags that identify embedded fonts, especially
`tags.mimetype="font/ttf"`
- attachment filenames that end in `.ttf`
The pattern can vary. FFX should therefore treat the above as a cluster of
signals rather than an exact signature tied to one file.
Inference from the observed sample plus FFmpeg documentation:
- MIME matching should not be limited to `font/ttf` alone.
- The Boruto sample uses `font/ttf`.
- FFmpeg's Matroska attachment example uses
`mimetype=application/x-truetype-font` for a `.ttf` attachment.
- Detection should therefore normalize multiple TTF-like MIME values rather
than depend on a single exact string.
## Processing Expectations In FFX
The format-specific requirements live in
`requirements/source_file_formats.md`. In practical terms, FFX should:
- recognize the ASS-plus-font-attachment pattern even when attachment probe
data is incomplete
- tell the operator that the pattern was detected and that special handling is
being used
- reject sidecar subtitle import for such sources, because converting or
replacing these subtitle tracks with ordinary external text subtitles would
break the intended subtitle package
- continue to allow normal manipulation of the ASS subtitle tracks themselves
- preserve the font attachment streams unchanged
## FFmpeg Notes
Relevant FFmpeg documentation confirms several behaviors that line up with
FFX's needs:
- FFmpeg documents `-attach` as adding an attachment stream to the output, and
explicitly names Matroska fonts used in subtitle rendering as an example.
- FFmpeg documents attachment streams as regular streams that are created after
the mapped media streams.
- FFmpeg documents `-dump_attachment` for extracting attachment streams, which
is useful for debugging or validating a source file's embedded fonts.
- FFmpeg's Matroska example requires a `mimetype` metadata tag for attached
fonts, which is consistent with using attachment tags as detection signals.
- FFmpeg also notes that attachments are implemented as codec extradata. That
helps explain why probe output for attachment streams can look different from
ordinary audio, video, and subtitle streams.
Implication for FFX:
- Attachment preservation is not an optional cosmetic feature for this format.
It is part of preserving the subtitle package correctly.
## Jellyfin Notes
Jellyfin's documentation also supports keeping this format intact:
- Jellyfin's subtitle compatibility table lists `ASS/SSA` as supported in
`MKV` and not supported in `MP4`.
- Jellyfin notes that when subtitles must be transcoded, they are either
converted to a supported format or burned into the video, and burning them in
is the most CPU-intensive path.
- Jellyfin's subtitle-extraction example for `SSA/ASS` first dumps attachment
streams and then extracts the ASS subtitle stream, which reflects the real
relationship between ASS subtitles and embedded fonts in MKV releases.
- Jellyfin's font documentation says text-based subtitles require fonts to
render properly.
- Jellyfin's configuration documentation says the web client uses configured
fallback fonts for ASS subtitles when other fonts such as MKV attachments or
client-side fonts are not available.
Inference from the Jellyfin compatibility tables:
- Keeping this subtitle format in Matroska is the safest interoperability
choice for Jellyfin consumers.
- Converting the subtitle payload to WebVTT would lose styled ASS behavior.
- Dropping the attachment streams would force client or fallback font
substitution and can change appearance or glyph coverage.
## References
- FFmpeg documentation: https://ffmpeg.org/ffmpeg.html
- Jellyfin codec support: https://jellyfin.org/docs/general/clients/codec-support/
- Jellyfin configuration and fonts: https://jellyfin.org/docs/general/administration/configuration/

View File

@@ -1,13 +1,13 @@
[project]
name = "ffx"
description = "FFX recoding and metadata managing tool"
version = "0.2.6"
version = "0.3.1"
license = {file = "LICENSE.md"}
dependencies = [
"requests",
"jinja2",
"click",
"textual",
"textual>=8.0",
"sqlalchemy",
]
readme = {file = "README.md", content-type = "text/markdown"}

View File

@@ -104,6 +104,11 @@ The main missing pieces are:
- `METADATA_EDITOR-0013`: The command shall write changes through an ffmpeg
stream-copy remux workflow only. No transcoding shall be performed as part of
`ffx edit`.
- `METADATA_EDITOR-0013A`: The ffmpeg invocation used by `ffx edit` shall map
all source streams with `-map 0` and shall copy all mapped streams with a
single `-c copy`. It shall not emit conversion-style per-stream `-map` or
`-c:*` options that could drop, reorder, or transcode streams during a
metadata-only edit.
- `METADATA_EDITOR-0014`: Because ffmpeg cannot rewrite the source file in
place, `ffx edit` shall write to a temporary output file on the same
filesystem as the source file and shall replace the original path only after
@@ -130,13 +135,19 @@ The main missing pieces are:
configuration shall be surfaced clearly in the UI and in the planned-changes
view. If those rules are applied during save, the operator shall be able to
tell that the file will be cleaned in addition to any manual edits.
- `METADATA_EDITOR-0022`: The command shall provide an invocation-level way to
disable config-driven cleanup when the operator wants a pure manual metadata
edit without automatic tag removal.
- `METADATA_EDITOR-0022`: Edit mode shall provide an in-screen operator toggle
for config-driven cleanup so a user can switch between pure manual metadata
edits and metadata edits plus configured tag cleanup without leaving the
editor.
- `METADATA_EDITOR-0023`: The existing global `--dry-run` behavior shall apply
to `ffx edit`. In dry-run mode the command shall not replace the original
file and shall expose the planned write operation clearly enough for the user
to understand what would happen.
- `METADATA_EDITOR-0024`: Every ffmpeg invocation performed by `ffx edit`
shall be surfaced to the operator as a notification in the edit UI.
- `METADATA_EDITOR-0025`: When application verbosity is greater than zero, the
notification for an `ffx edit` ffmpeg invocation shall include the concrete
ffmpeg command line.
## Acceptance
@@ -154,6 +165,8 @@ The main missing pieces are:
- The planned-changes view reflects manual edits relative to the original file
and, when enabled, any configured cleanup removals.
- No rendered Rich or Textual color markup appears in the saved file metadata.
- Saving metadata with files that contain PGS subtitle tracks or other
non-text subtitle codecs preserves those streams instead of dropping them.
- If ffmpeg fails while saving, the original file remains present and readable
at the original path.
- In dry-run mode, the original file remains untouched.

View File

@@ -98,7 +98,7 @@
- Intended for local execution, not server deployment.
- Stores default state in `~/.local/etc/ffx.json`, `~/.local/var/ffx/ffx.db`, and `~/.local/var/log/ffx.log`.
- Timeline constraints:
- The current implemented scope reflects a compact alpha release stream up to version `0.2.6`.
- The current implemented scope reflects a compact alpha release stream up to version `0.3.1`.
- Team capacity assumptions:
- Maintained as a small codebase where simple patterns and direct controller logic are preferred over framework-heavy abstractions.
- Third-party dependencies:

View File

@@ -0,0 +1,90 @@
# Source File Formats
This file defines source-file-format-specific processing requirements for FFX.
It is intended to grow as additional relevant source file types are identified.
The first covered format is Matroska media that contains styled ASS/SSA
subtitle streams together with embedded font attachments.
## Scope
- Detecting source files that use ASS subtitle streams together with embedded
font attachments needed for correct rendering.
- Defining the required `ffx convert` behavior when this format is present.
- Preserving the required attachment streams during conversion.
- Keeping normal subtitle-track manipulation behavior for the ASS subtitle
tracks themselves.
## Out Of Scope
- General subtitle behavior for sources that do not carry this pattern.
- A complete catalog of all source file formats FFX may support later.
## Terms
- `styled ASS source`: a source media file that contains one or more subtitle
streams with `codec_type="subtitle"` and `codec_name="ass"` together with
one or more font-bearing attachment streams.
- `font attachment`: an attachment stream whose metadata identifies a font
payload, commonly through `tags.mimetype` and attachment filename metadata.
- `external subtitle feed`: subtitle tracks supplied from separate subtitle
files through the existing subtitle-import path.
- `special attachment subtracks`: the embedded font attachment streams that
belong to the styled ASS source pattern.
## Rules
- `SOURCE_FILE_FORMATS-0001`: The system shall recognize the styled ASS source
pattern.
- `SOURCE_FILE_FORMATS-0002`: Recognition shall not depend on fixed stream
counts, fixed stream indices, or one exact attachment count.
- `SOURCE_FILE_FORMATS-0003`: Recognition shall use the best available ffprobe
signals. For known subtitle streams this includes
`codec_type="subtitle"` together with `codec_name="ass"`.
- `SOURCE_FILE_FORMATS-0004`: Recognition of the special attachment subtracks
shall use attachment-oriented signals such as `codec_type="attachment"` and
font-identifying metadata such as `tags.mimetype="font/ttf"` when present.
- `SOURCE_FILE_FORMATS-0005`: Recognition shall tolerate known ffprobe
variation in attachment reporting, including files where attachment streams
do not expose a `codec_name` but do expose `codec_type="attachment"` and
font-identifying tags.
- `SOURCE_FILE_FORMATS-0006`: When attachment metadata varies across files,
detection shall not depend on one exact MIME string alone. Detection shall
be written so the known pattern can vary while still recognizing font
attachments.
- `SOURCE_FILE_FORMATS-0007`: When the styled ASS source pattern is detected,
`ffx convert` shall emit an operator-facing message that reports the
detection and hints that special subtitle preservation handling is being
applied.
- `SOURCE_FILE_FORMATS-0008`: When the styled ASS source pattern is present on
the source file, `ffx convert` shall not process an external subtitle feed.
The command shall stop before conversion and report an error that explains
that separate subtitle-file import is incompatible with this source format.
- `SOURCE_FILE_FORMATS-0009`: Normal manipulation of the ASS subtitle streams
themselves shall continue to work through the usual selection, ordering,
metadata, language, title, and disposition handling paths.
- `SOURCE_FILE_FORMATS-0010`: The special attachment subtracks shall be
preserved in the target media file as-is rather than transcoded,
regenerated, or replaced from external sources.
- `SOURCE_FILE_FORMATS-0011`: Preserving the special attachment subtracks
as-is includes retaining the attachment payload and the attachment metadata
required by consumers, especially attachment filename and mimetype
information.
- `SOURCE_FILE_FORMATS-0012`: This file shall remain the extension point for
additional source-file-format contracts as FFX adds support for more special
source formats.
## Acceptance
- A source file matching the observed pattern of embedded ASS subtitles plus
font attachments is recognized even when the attachment streams do not carry
a `codec_name`.
- `ffx convert` output contains a clear detection message before the actual
conversion work proceeds.
- If external subtitle import is requested for such a source file, the command
fails fast with an explicit error instead of mixing sidecar subtitles into
the job.
- Existing manipulation of the ASS subtitle tracks still works for metadata,
titles, languages, ordering, and dispositions.
- The output media preserves the required font attachment streams and their
identifying metadata needed by downstream media players.

View File

@@ -0,0 +1,67 @@
from enum import Enum
import os
class AttachmentFormat(Enum):
TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf', 'label': 'TTF'}
PNG = {'identifier': 'png', 'format': None, 'extension': 'png', 'label': 'PNG'}
UNKNOWN = {'identifier': 'unknown', 'format': None, 'extension': None, 'label': 'UNKNOWN'}
def identifier(self):
return str(self.value['identifier'])
def label(self):
return str(self.value['label'])
def format(self):
return self.value['format']
def extension(self):
return str(self.value['extension'])
@staticmethod
def identify(identifier: str):
formats = [f for f in AttachmentFormat if f.value['identifier'] == str(identifier)]
if formats:
return formats[0]
return AttachmentFormat.UNKNOWN
@staticmethod
def identifyFfprobeStream(streamObj: dict):
identifier = streamObj.get("codec_name")
identifiedFormat = AttachmentFormat.identify(identifier)
if identifiedFormat != AttachmentFormat.UNKNOWN:
return identifiedFormat
if str(streamObj.get("codec_type", "")).strip() != "attachment":
return AttachmentFormat.UNKNOWN
tags = streamObj.get("tags", {}) or {}
mimetype = str(tags.get("mimetype", "")).strip().lower()
filename = str(tags.get("filename", "")).strip().lower()
filenameExtension = os.path.splitext(filename)[1]
if (
mimetype in {
"font/ttf",
"application/x-truetype-font",
"application/x-font-ttf",
}
or "truetype" in mimetype
or filenameExtension == ".ttf"
):
return AttachmentFormat.TTF
if mimetype in {"image/png", "image/x-png"} or filenameExtension == ".png":
return AttachmentFormat.PNG
return AttachmentFormat.UNKNOWN
@staticmethod
def fromTrackCodec(trackCodec):
identifier = getattr(trackCodec, "identifier", None)
if callable(identifier):
return AttachmentFormat.identify(trackCodec.identifier())
return AttachmentFormat.UNKNOWN

View File

@@ -252,9 +252,15 @@ def buildRenameTargetFilename(
@click.pass_context
@click.option('--language', 'app_language', type=str, default='', help='Set application language')
@click.option('--database-file', type=str, default='', help='Path to database file')
@click.option(
'--debug',
is_flag=True,
default=False,
help='Enable debug-only TUI diagnostics such as the log pane',
)
@click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output')
@click.option("--dry-run", is_flag=True, default=False)
def ffx(ctx, app_language, database_file, verbose, dry_run):
def ffx(ctx, app_language, database_file, debug, verbose, dry_run):
"""FFX"""
ctx.obj = {}
@@ -274,6 +280,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
)
set_current_language(resolvedLanguage)
ctx.obj['language'] = resolvedLanguage
ctx.obj['debug'] = bool(debug)
if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS:
ctx.obj['dry_run'] = dry_run
@@ -287,6 +294,7 @@ def ffx(ctx, app_language, database_file, verbose, dry_run):
ctx.obj['dry_run'] = dry_run
ctx.obj['verbosity'] = verbose
ctx.obj['debug'] = bool(debug)
ctx.obj['language'] = resolve_application_language(
cli_language=app_language,
config_language=ctx.obj['config'].getLanguage(),
@@ -391,6 +399,20 @@ def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
ctx.exit(completed.returncode)
def runTuiApp(ctx) -> None:
from ffx.ffx_app import FfxApp
from ffx.logging_utils import set_ffx_console_logging_enabled
logger = ctx.obj.get('logger')
set_ffx_console_logging_enabled(logger, enabled=False)
try:
app = FfxApp(ctx.obj)
app.run()
finally:
set_ffx_console_logging_enabled(logger, enabled=True)
@ffx.command(name='setup')
@click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness')
@@ -527,14 +549,11 @@ def inspect(ctx, shift, filenames):
if len(filenames) != 1:
raise click.ClickException("Inspect without --shift requires exactly one filename.")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'inspect'
ctx.obj['arguments'] = {}
ctx.obj['arguments']['filename'] = filenames[0]
app = FfxApp(ctx.obj)
app.run()
runTuiApp(ctx)
@ffx.command()
@@ -544,8 +563,6 @@ def edit(ctx, filename):
if not os.path.isfile(filename):
raise click.ClickException(f"File not found: {filename}")
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'edit'
ctx.obj['arguments'] = {'filename': filename}
ctx.obj['use_pattern'] = False
@@ -554,8 +571,7 @@ def edit(ctx, filename):
ctx.obj['apply_metadata_normalization'] = True
ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {})
app = FfxApp(ctx.obj)
app.run()
runTuiApp(ctx)
@ffx.command()
@@ -615,29 +631,33 @@ def rename(ctx, paths, prefix, season, suffix, dry_run):
def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''):
from ffx.track_codec import TrackCodec
from ffx.track_type import TrackType
# executable and input file
commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath]
trackType = trackDescriptor.getType()
trackCodec = trackDescriptor.getCodec()
trackFormat = trackDescriptor.getFormatDescriptor()
targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix
# mapping
commandTokens += ['-map',
f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}",
'-c',
'copy']
commandTokens += ['-map', f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}"]
trackCodec = trackDescriptor.getCodec()
if trackType == TrackType.VIDEO and trackCodec == TrackCodec.H265:
commandTokens += ['-c:v', 'copy', '-bsf:v', 'hevc_mp4toannexb']
else:
commandTokens += ['-c', 'copy']
# output format
codecFormat = trackCodec.format()
codecFormat = trackFormat.format()
if codecFormat is not None:
commandTokens += ['-f', codecFormat]
# output filename
commandTokens += [f"{targetPathBase}.{trackCodec.extension()}"]
commandTokens += [f"{targetPathBase}.{trackFormat.extension()}"]
return commandTokens
@@ -752,7 +772,7 @@ def unmux(ctx,
if not ctx.obj['dry_run']:
#TODO #425: Codec Enum
ctx.obj['logger'].info(f"Unmuxing stream {trackDescriptor.getIndex()} into file {targetPrefix}.{trackDescriptor.getCodec().extension()}")
ctx.obj['logger'].info(f"Unmuxing stream {trackDescriptor.getIndex()} into file {targetPrefix}.{trackDescriptor.getFormatDescriptor().extension()}")
ctx.obj['logger'].debug(f"Executing unmuxing sequence")
@@ -837,12 +857,8 @@ def cropdetect(ctx,
@click.pass_context
def shows(ctx):
from ffx.ffx_app import FfxApp
ctx.obj['command'] = 'shows'
app = FfxApp(ctx.obj)
app.run()
runTuiApp(ctx)
def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
@@ -943,7 +959,6 @@ def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
metavar="DURATION|START,DURATION",
is_flag=False,
flag_value=DEFAULT_CUT_OPTION_VALUE,
default=None,
callback=normalizeCutOption,
help=CUT_OPTION_HELP,
)
@@ -1299,10 +1314,12 @@ def convert(ctx,
sourceMediaDescriptor = mediaFileProperties.getMediaDescriptor()
from ffx.attachment_format import AttachmentFormat
if ([smd for smd in sourceMediaDescriptor.getSubtitleTracks()
if smd.getCodec() == TrackCodec.ASS]
and [amd for amd in sourceMediaDescriptor.getAttachmentTracks()
if amd.getCodec() == TrackCodec.TTF]):
if amd.getAttachmentFormat() == AttachmentFormat.TTF]):
targetFormat = ''
targetExtension = 'mkv'

View File

@@ -3,6 +3,7 @@ from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Static
from .i18n import t
from .screen_support import build_screen_log_pane
class ConfirmScreen(Screen):
@@ -58,8 +59,16 @@ class ConfirmScreen(Screen):
yield Button(self.__confirmLabel, id="confirm_button")
yield Button(self.__cancelLabel, id="cancel_button")
yield build_screen_log_pane()
yield Footer()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "confirm_button":
self.dismiss(True)

View File

@@ -1,4 +1,4 @@
VERSION='0.2.6'
VERSION='0.3.1'
DATABASE_VERSION = 3
DEFAULT_QUALITY = 32

View File

@@ -4,6 +4,7 @@ from .i18n import set_current_language, t
from .shows_screen import ShowsScreen
from .inspect_details_screen import InspectDetailsScreen
from .media_edit_screen import MediaEditScreen
from .screen_support import configure_screen_log_handler, set_screen_log_pane_enabled
class FfxApp(App):
@@ -22,6 +23,13 @@ class FfxApp(App):
# Data 'input' variable
self.context = context
set_current_language(self.context.get("language"))
debug_mode = bool(self.context.get("debug", False))
set_screen_log_pane_enabled(debug_mode)
configure_screen_log_handler(
self.context.get("logger"),
self,
enabled=debug_mode,
)
def on_mount(self) -> None:

View File

@@ -1,4 +1,5 @@
import os, click
import os, click, subprocess
from functools import lru_cache
from logging import Logger
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
@@ -61,6 +62,41 @@ class FfxController():
sourceMediaDescriptor)
self.__logger: Logger = context['logger']
self.__warnedH264Fallback = False
@staticmethod
@lru_cache(maxsize=None)
def isFfmpegEncoderAvailable(encoderName: str) -> bool:
completed = subprocess.run(
["ffmpeg", "-encoders"],
capture_output=True,
text=True,
check=False,
)
if completed.returncode != 0:
return False
resolvedEncoderName = str(encoderName).strip()
for line in completed.stdout.splitlines():
if not line.startswith(" "):
continue
tokens = line.split(maxsplit=2)
if len(tokens) >= 2 and tokens[1] == resolvedEncoderName:
return True
return False
@classmethod
def getSupportedSoftwareH264Encoder(cls) -> str | None:
if cls.isFfmpegEncoderAvailable("libx264"):
return "libx264"
if cls.isFfmpegEncoderAvailable("libopenh264"):
return "libopenh264"
return None
def executeCommandSequence(self, commandSequence):
@@ -79,11 +115,28 @@ class FfxController():
# -c:v libx264 -preset slow -crf 17
def generateH264Tokens(self, quality, subIndex : int = 0):
h264Encoder = self.getSupportedSoftwareH264Encoder()
if h264Encoder == "libx264":
return [f"-c:v:{int(subIndex)}", 'libx264',
"-preset", "slow",
'-crf', str(quality)]
if h264Encoder == "libopenh264":
if not self.__warnedH264Fallback:
self.__logger.warning(
"libx264 encoder unavailable; falling back to libopenh264 for H.264 encoding."
)
self.__warnedH264Fallback = True
return [f"-c:v:{int(subIndex)}", 'libopenh264',
'-pix_fmt', 'yuv420p']
raise click.ClickException(
"H.264 encoding requested but no supported software H.264 encoder is available. "
+ "Tried libx264 and libopenh264."
)
# -c:v:0 libvpx-vp9 -row-mt 1 -crf 32 -pass 1 -speed 4 -frame-parallel 0 -g 9999 -aq-mode 0
def generateVP9Pass1Tokens(self, quality, subIndex : int = 0):

View File

@@ -3,7 +3,7 @@ from textual.screen import Screen
from textual.widgets import Footer, Placeholder
from .i18n import t
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
class HelpScreen(Screen):
BINDINGS = [
@@ -17,7 +17,15 @@ class HelpScreen(Screen):
def compose(self) -> ComposeResult:
# Row 1
yield Placeholder(t("Help Screen"))
yield build_screen_log_pane()
yield Footer()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def action_back(self):
go_back_or_exit(self)

View File

@@ -6,12 +6,23 @@ from .configuration_controller import ConfigurationController
from .logging_utils import get_ffx_logger
from .show_descriptor import ShowDescriptor
from enum import Enum
class EmptyStringUndefined(Undefined):
def __str__(self):
return ''
class LogLevel(Enum):
DEBUG = 'debug'
INFO = 'info'
WARNING = 'warning'
ERROR = 'error'
CRITICAL = 'critical'
DIFF_ADDED_KEY = 'added'
DIFF_REMOVED_KEY = 'removed'
DIFF_CHANGED_KEY = 'changed'
@@ -119,7 +130,7 @@ def setDiff(a : set, b : set) -> set:
def permutateList(inputList: list, permutation: list):
# 0,1,2: ABC
# 0,2,1: ACB
# 0,2,1: ACBffmpeg:
# 1,2,0: BCA
pass

View File

@@ -8,6 +8,7 @@ from textual.widgets import Button, Footer, Header, Input, Static
from textual.widgets._data_table import CellDoesNotExist
from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.show_descriptor import ShowDescriptor
from ffx.track_descriptor import TrackDescriptor
@@ -18,6 +19,7 @@ from .pattern_details_screen import PatternDetailsScreen
from .screen_support import (
add_auto_table_column,
build_screen_controllers,
build_screen_log_pane,
go_back_or_exit,
localized_column_width,
update_table_column_label,
@@ -37,8 +39,8 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
CSS = f"""
Grid {{
grid-size: 6 11;
grid-rows: 9 2 2 2 2 8 2 2 2 8 8;
grid-size: 6 8;
grid-rows: 9 2 2 2 2 10 2 10;
grid-columns: {GRID_COLUMN_LABEL_MIN} {GRID_COLUMN_2} {GRID_COLUMN_3} {GRID_COLUMN_4} {GRID_COLUMN_5} {GRID_COLUMN_6};
height: 100%;
width: 100%;
@@ -86,6 +88,10 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
#differences-table {{
row-span: 10;
}}
.yellow {{
tint: yellow 40%;
}}
"""
@classmethod
@@ -155,6 +161,7 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
yield Static(" ")
yield self.differencesTable
# Row 2
yield Static(" ", classes="five")
@@ -163,33 +170,31 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
yield Button(t("Substitute"), id="pattern_button")
yield Static(" ", classes="three")
# Row 4
yield Static(t("Pattern"))
yield Input(type="text", id="pattern_input", classes="three")
yield Static(" ")
# Row 5
yield Static(" ", classes="five")
# Row 6
yield Static(t("Media Tags"))
yield self.mediaTagsTable
yield Static(" ", classes="two")
yield Static(" ")
# Row 7
yield Static(" ", classes="five")
# Row 8
yield Static(" ")
yield Button(t("Set Default"), id="select_default_button")
yield Button(t("Set Forced"), id="select_forced_button")
yield Static(" ", classes="two")
# Row 9
yield Static(t("Streams"))
yield self.tracksTable
yield Static(" ")
yield build_screen_log_pane()
yield Footer()
def _update_grid_layout(self) -> None:
@@ -205,6 +210,30 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
def action_back(self):
go_back_or_exit(self)
def getDisplayedMediaDescriptor(self):
if self._currentPattern is not None and self._targetMediaDescriptor is not None:
return self._targetMediaDescriptor
return self._sourceMediaDescriptor
def getTrackEditSourceDescriptor(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor()
if (
selectedTrackDescriptor is None
or self._currentPattern is None
or self._targetMediaDescriptor is None
):
return selectedTrackDescriptor
for sourceTrackDescriptor in self._sourceMediaDescriptor.getTrackDescriptors():
if (
sourceTrackDescriptor.getSourceIndex()
== selectedTrackDescriptor.getSourceIndex()
and sourceTrackDescriptor.getType() == selectedTrackDescriptor.getType()
):
return sourceTrackDescriptor
return None
def _build_shows_table(self):
from textual.widgets import DataTable
@@ -287,6 +316,10 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
self._update_show_header_labels()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self._update_grid_layout()
if self._currentPattern is None:
@@ -476,8 +509,6 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
self.updateDifferences()
return updated
self.reloadProperties(reset_draft=True)
tagDifferences = self._mediaChangeSetObj.get(MediaDescriptorChangeSet.TAGS_KEY, {})
for addedTagKey in tagDifferences.get(DIFF_ADDED_KEY, {}).keys():
self._tac.deleteMediaTagByKey(self._currentPattern.getId(), addedTagKey)
@@ -564,9 +595,6 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
)
def handle_edit_pattern(self, screenResult):
if not screenResult:
return
self.reloadProperties(reset_draft=True)
if self._currentPattern is not None:
self.query_one("#pattern_input", Input).value = self._currentPattern.getPattern()

View File

@@ -5,6 +5,7 @@ import os
FFX_LOGGER_NAME = "FFX"
CONSOLE_HANDLER_NAME = "ffx-console"
FILE_HANDLER_NAME = "ffx-file"
MUTED_CONSOLE_LEVEL = logging.CRITICAL + 1
def get_ffx_logger(name: str = FFX_LOGGER_NAME) -> logging.Logger:
@@ -66,3 +67,31 @@ def configure_ffx_logger(
)
return logger
def set_ffx_console_logging_enabled(
logger: logging.Logger | None,
*,
enabled: bool,
):
if logger is None:
return None
console_handler = next(
(handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME),
None,
)
if console_handler is None:
return None
if enabled:
saved_level = getattr(console_handler, "_ffx_saved_level", None)
if saved_level is not None:
console_handler.setLevel(saved_level)
delattr(console_handler, "_ffx_saved_level")
return console_handler
if not hasattr(console_handler, "_ffx_saved_level"):
console_handler._ffx_saved_level = console_handler.level
console_handler.setLevel(MUTED_CONSOLE_LEVEL)
return console_handler

View File

@@ -2,6 +2,7 @@ import os, re, click
from typing import List, Self
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
@@ -421,11 +422,11 @@ class MediaDescriptor:
if sourceMediaDescriptor:
fontDescriptors = [ftd for ftd in sourceMediaDescriptor.getAttachmentTracks()
if ftd.getCodec() == TrackCodec.TTF]
if ftd.getAttachmentFormat() == AttachmentFormat.TTF]
else:
fontDescriptors = [ftd for ftd in self.__trackDescriptors
if ftd.getType() == TrackType.ATTACHMENT
and ftd.getCodec() == TrackCodec.TTF]
and ftd.getAttachmentFormat() == AttachmentFormat.TTF]
for ad in sorted(fontDescriptors, key=lambda d: d.getIndex()):
inputMappingTokens += ["-map", f"0:{ad.getIndex()}"]

View File

@@ -203,7 +203,7 @@ class MediaDescriptorChangeSet():
if (
self.__applyMetadataNormalization
and trackDescriptor is not None
and trackDescriptor.getType() == TrackType.SUBTITLE
and trackDescriptor.getType() in (TrackType.VIDEO, TrackType.AUDIO, TrackType.SUBTITLE)
):
trackTitle = str(normalizedTrackTags.get("title", "")).strip()
fallbackTitle = str((fallbackTrackTags or {}).get("title", "")).strip()
@@ -260,6 +260,8 @@ class MediaDescriptorChangeSet():
# else:
# dispositionTokens += [f"-disposition:{streamIndicator}:{subIndex}", '0']
for ttd in self.__targetTrackDescriptors:
if ttd.getType() == TrackType.ATTACHMENT:
continue
targetDispositions = ttd.getDispositionSet()
streamIndicator = ttd.getType().indicator()
@@ -344,7 +346,7 @@ class MediaDescriptorChangeSet():
for tagKey, tagValue in self.normalizeTrackTags(
outputTrackTags,
trackDescriptor=trackDescriptor,
fallbackTrackTags=unchangedTrackTags | removedTrackTags,
fallbackTrackTags=trackDescriptor.getTags(),
).items():
metadataTokens += [f"-metadata:s:{trackDescriptor.getType().indicator()}"
+ f":{trackDescriptor.getSubIndex()}",
@@ -366,6 +368,7 @@ class MediaDescriptorChangeSet():
for tagKey, tagValue in self.normalizeTrackTags(
preservedTrackTags,
trackDescriptor=trackDescriptor,
fallbackTrackTags=trackDescriptor.getTags(),
).items():
metadataTokens += [f"-metadata:s:{trackDescriptor.getType().indicator()}"
+ f":{trackDescriptor.getSubIndex()}",

View File

@@ -1,6 +1,9 @@
import os
from time import monotonic
from textual import events, work
from textual.containers import Grid
from textual.worker import Worker, WorkerState
from textual.widgets import Button, Footer, Header, Static
from ffx.metadata_editor import apply_metadata_edits
@@ -9,11 +12,13 @@ from ffx.track_descriptor import TrackDescriptor
from .i18n import t
from .confirm_screen import ConfirmScreen
from .media_workflow_screen_base import MediaWorkflowScreenBase
from .screen_support import localized_column_width
from .screen_support import build_screen_log_pane, localized_column_width
from .tag_delete_screen import TagDeleteScreen
from .tag_details_screen import TagDetailsScreen
from .track_details_screen import TrackDetailsScreen
from .helper import LogLevel
class MediaEditScreen(MediaWorkflowScreenBase):
@@ -169,14 +174,27 @@ class MediaEditScreen(MediaWorkflowScreenBase):
yield Button(t("Quit"), id="quit_button")
yield Static(" ")
yield build_screen_log_pane()
yield Footer()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self._update_grid_layout()
self.updateMediaTags()
self.updateTracks()
self.updateDifferences()
self.updateToggleButtons()
self._applyChangesWorker = None
def on_screen_resume(self, _event: events.ScreenResume) -> None:
if not hasattr(self, "tracksTable"):
return
self.refreshAfterDraftChange()
self.updateToggleButtons()
def _update_grid_layout(self) -> None:
leftColumnWidth = max(
@@ -195,6 +213,41 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if self._messageText:
self.notify(self._messageText)
def workerLoggingHandler(self,
message: str,
level: LogLevel = LogLevel.INFO) -> None:
if level == LogLevel.DEBUG:
self.context["logger"].debug(str(message))
elif level == LogLevel.INFO:
self.context["logger"].info(str(message))
elif level == LogLevel.WARNING:
self.context["logger"].warning(str(message))
elif level == LogLevel.ERROR:
self.context["logger"].error(str(message))
elif level == LogLevel.CRITICAL:
self.context["logger"].critical(str(message))
else:
raise Exception(f"Undefined Logging Level (msg={message})")
def _report_apply_timings(self, applyResult: dict, reloadSeconds: float = 0.0) -> None:
timings = dict(applyResult.get("timings", {}))
ffmpegSeconds = float(timings.get("ffmpeg_seconds", 0.0))
replaceSeconds = float(timings.get("replace_seconds", 0.0))
writeSeconds = float(timings.get("write_seconds", ffmpegSeconds + replaceSeconds))
reloadSeconds = float(reloadSeconds)
totalSeconds = writeSeconds + reloadSeconds
timingSummary = (
f"ffx edit timings: ffmpeg={ffmpegSeconds:.2f}s "
+ f"replace={replaceSeconds:.2f}s "
+ f"reload={reloadSeconds:.2f}s "
+ f"total={totalSeconds:.2f}s"
)
self.context["logger"].info(timingSummary)
def updateToggleButtons(self):
self._set_toggle_button_state(
"#cleanup_toggle_button",
@@ -296,6 +349,7 @@ class MediaEditScreen(MediaWorkflowScreenBase):
def action_toggle_normalization(self):
self.setApplyNormalization(not self._applyNormalization)
self.updateToggleButtons()
self.updateTracks()
self.updateDifferences()
self.setMessage(
t("Normalization enabled.")
@@ -323,30 +377,35 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if trackDescriptor is None:
return
updatedTracks = []
nextSourceMediaDescriptor = self._sourceMediaDescriptor.clone(context=self.context)
updatedTracks = nextSourceMediaDescriptor.getTrackDescriptors()
replacementTrack = trackDescriptor.clone(context=self.context)
replaced = False
for currentTrack in self._sourceMediaDescriptor.getTrackDescriptors():
if (
currentTrack.getIndex() == trackDescriptor.getIndex()
and currentTrack.getSubIndex() == trackDescriptor.getSubIndex()
):
updatedTracks.append(trackDescriptor)
for trackIndex, currentTrack in enumerate(updatedTracks):
sameSourceTrack = (
currentTrack.getSourceIndex() == replacementTrack.getSourceIndex()
and currentTrack.getType() == replacementTrack.getType()
)
sameVisibleTrack = (
currentTrack.getIndex() == replacementTrack.getIndex()
and currentTrack.getSubIndex() == replacementTrack.getSubIndex()
)
if sameSourceTrack or sameVisibleTrack:
updatedTracks[trackIndex] = replacementTrack
replaced = True
else:
updatedTracks.append(currentTrack)
break
if not replaced:
self.setMessage(t("Unable to update selected stream."))
return
self._sourceMediaDescriptor = self._sourceMediaDescriptor.clone(context=self.context)
self._sourceMediaDescriptor.getTrackDescriptors().clear()
self._sourceMediaDescriptor.getTrackDescriptors().extend(updatedTracks)
self._sourceMediaDescriptor = nextSourceMediaDescriptor
self.setMessage(
t(
"Updated stream #{index} ({track_type}).",
index=trackDescriptor.getIndex(),
track_type=t(trackDescriptor.getType().label()),
index=replacementTrack.getIndex(),
track_type=t(replacementTrack.getType().label()),
)
)
self.refreshAfterDraftChange()
@@ -356,33 +415,77 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self.setMessage(t("No changes to apply."))
return
try:
applyResult = apply_metadata_edits(
if self._applyChangesWorker is not None and self._applyChangesWorker.is_running:
self.setMessage(t("Apply already running."))
return
self.context["logger"].info(
t("Starting metadata apply for {filename}.", filename=self._mediaFilename)
)
self._applyChangesWorker = self.run_apply_changes_worker()
@work(
thread=True,
exclusive=True,
group="media-edit-apply",
exit_on_error=False,
)
def run_apply_changes_worker(self):
return apply_metadata_edits(
self.context,
self._mediaFilename,
self._baselineMediaDescriptor,
self._sourceMediaDescriptor,
loggingHandler = self.workerLoggingHandler,
)
except Exception as ex:
self.context["logger"].exception(
"Failed to apply metadata edits for %s",
self._mediaFilename,
)
self.setMessage(t("Apply failed: {error}", error=ex))
def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
if event.worker is not self._applyChangesWorker:
return
if event.state == WorkerState.ERROR:
error = event.worker.error
if error is not None:
self.context["logger"].error(
"Failed to apply metadata edits for %s",
self._mediaFilename,
exc_info=(type(error), error, error.__traceback__),
)
self.setMessage(t("Apply failed: {error}", error=error))
self._applyChangesWorker = None
return
if event.state != WorkerState.SUCCESS:
return
applyResult = event.worker.result or {}
if applyResult.get("dry_run", False):
self._report_apply_timings(applyResult, reloadSeconds=0.0)
self.context["logger"].info(
t(
"Dry-run prepared temporary output {target_path}.",
target_path=applyResult["target_path"],
),
)
self.setMessage(
t(
"Dry-run: would rewrite via temporary file {target_path}",
target_path=applyResult["target_path"],
)
)
self._applyChangesWorker = None
return
reloadStart = monotonic()
self.context["logger"].info(t("Reloading file after metadata write."))
self.reloadProperties(reset_draft=True)
self.refreshAfterDraftChange()
reloadSeconds = monotonic() - reloadStart
self._report_apply_timings(applyResult, reloadSeconds=reloadSeconds)
self.context["logger"].info(t("Changes applied and file reloaded."))
self.setMessage(t("Changes applied and file reloaded."))
self._applyChangesWorker = None
def action_revert_changes(self):
if not self.hasPendingChanges():

View File

@@ -9,6 +9,8 @@ from textual.widgets._data_table import CellDoesNotExist
from ffx.audio_layout import AudioLayout
from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
from ffx.iso_language import IsoLanguage
from ffx.media_descriptor import MediaDescriptor
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.track_descriptor import TrackDescriptor
from ffx.track_disposition import TrackDisposition
@@ -123,6 +125,24 @@ class MediaWorkflowScreenBase(Screen):
add_auto_table_column(self.differencesTable, t(self.DIFFERENCES_COLUMN_LABEL))
self.differencesTable.cursor_type = "row"
def _track_codec_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return trackDescriptor.getFormatDescriptor().label()
def _track_disposition_cell_value(
self,
trackDescriptor: TrackDescriptor,
disposition: TrackDisposition,
) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return (
t("Yes")
if disposition in trackDescriptor.getDispositionSet()
else t("No")
)
def reloadProperties(self, reset_draft: bool = True):
self._mediaFileProperties = FileProperties(self.context, self._mediaFilename)
probedMediaDescriptor = self._mediaFileProperties.getMediaDescriptor()
@@ -170,10 +190,17 @@ class MediaWorkflowScreenBase(Screen):
def hasPendingChanges(self) -> bool:
return bool(self._mediaChangeSetObj)
def getDisplayedMediaDescriptor(self) -> MediaDescriptor | None:
return self._sourceMediaDescriptor
def getTrackEditSourceDescriptor(self) -> TrackDescriptor | None:
return self.getSelectedTrackDescriptor()
def updateMediaTags(self):
displayedMediaDescriptor = self.getDisplayedMediaDescriptor()
self._sourceMediaTagRowData = populate_tag_table(
self.mediaTagsTable,
self._sourceMediaDescriptor.getTags(),
displayedMediaDescriptor.getTags() if displayedMediaDescriptor is not None else {},
ignore_keys=self._ignoreGlobalKeys,
remove_keys=self._removeGlobalKeys,
)
@@ -183,8 +210,14 @@ class MediaWorkflowScreenBase(Screen):
self._configure_tracks_table_columns()
self._trackRowData = {}
trackDescriptorList = self._sourceMediaDescriptor.getTrackDescriptors()
displayedMediaDescriptor = self.getDisplayedMediaDescriptor()
trackDescriptorList = (
displayedMediaDescriptor.getTrackDescriptors()
if displayedMediaDescriptor is not None
else []
)
typeCounter = {}
applyNormalization = bool(getattr(self, "_applyNormalization", False))
for trackDescriptor in trackDescriptorList:
trackType = trackDescriptor.getType()
@@ -193,19 +226,34 @@ class MediaWorkflowScreenBase(Screen):
dispositionSet = trackDescriptor.getDispositionSet()
audioLayout = trackDescriptor.getAudioLayout()
trackTitle = trackDescriptor.getTitle()
if (
applyNormalization
and not str(trackTitle).strip()
and trackType in (TrackType.VIDEO, TrackType.AUDIO, TrackType.SUBTITLE)
):
trackLanguage = trackDescriptor.getLanguage()
if trackLanguage != IsoLanguage.UNDEFINED:
trackTitle = trackLanguage.label()
row = (
trackDescriptor.getIndex(),
t(trackType.label()),
typeCounter[trackType],
trackDescriptor.getCodec().label(),
self._track_codec_cell_value(trackDescriptor),
t(audioLayout.label())
if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED
else " ",
trackDescriptor.getLanguage().label(),
trackDescriptor.getTitle(),
t("Yes") if TrackDisposition.DEFAULT in dispositionSet else t("No"),
t("Yes") if TrackDisposition.FORCED in dispositionSet else t("No"),
trackTitle,
self._track_disposition_cell_value(
trackDescriptor,
TrackDisposition.DEFAULT,
),
self._track_disposition_cell_value(
trackDescriptor,
TrackDisposition.FORCED,
),
)
row_key = self.tracksTable.add_row(*map(str, row))
@@ -355,7 +403,7 @@ class MediaWorkflowScreenBase(Screen):
return None
def setSelectedTrackDefault(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor()
selectedTrackDescriptor = self.getTrackEditSourceDescriptor()
if selectedTrackDescriptor is None:
return False
@@ -366,7 +414,7 @@ class MediaWorkflowScreenBase(Screen):
return True
def setSelectedTrackForced(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor()
selectedTrackDescriptor = self.getTrackEditSourceDescriptor()
if selectedTrackDescriptor is None:
return False

View File

@@ -1,17 +1,23 @@
from __future__ import annotations
import click
import os
import tempfile
from time import monotonic
from .constants import (
DEFAULT_AC3_BANDWIDTH,
DEFAULT_DTS_BANDWIDTH,
DEFAULT_STEREO_BANDWIDTH,
FFMPEG_COMMAND_TOKENS,
)
from .ffx_controller import FfxController
from .media_descriptor import MediaDescriptor
from .media_descriptor_change_set import MediaDescriptorChangeSet
from .process import executeProcess, formatCommandSequence
from .video_encoder import VideoEncoder
from .helper import LogLevel
def create_temporary_output_path(source_path: str) -> str:
sourceDirectory = os.path.dirname(os.path.abspath(source_path)) or "."
@@ -49,41 +55,123 @@ def build_metadata_edit_context(context: dict) -> dict:
return editContext
def build_metadata_edit_command(
context: dict,
source_path: str,
target_path: str,
baseline_descriptor: MediaDescriptor,
draft_descriptor: MediaDescriptor,
) -> list[str]:
changeSet = MediaDescriptorChangeSet(context, draft_descriptor, baseline_descriptor)
return (
list(FFMPEG_COMMAND_TOKENS)
+ ["-i", source_path, "-map", "0", "-c", "copy"]
+ changeSet.generateMetadataTokens()
+ changeSet.generateDispositionTokens()
+ [target_path]
)
def notify_ffmpeg_invocation(
context: dict,
command_sequence: list[str],
*,
loggingHandler = None,
dry_run: bool = False,
) -> None:
loggingCallback = loggingHandler or context.get("logging_handler")
if not callable(loggingCallback):
return
verbosity = int(context.get("verbosity", 0) or 0)
if verbosity > 0:
if dry_run:
loggingCallback(f"ffmpeg dry-run: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
else:
loggingCallback(f"ffmpeg: {formatCommandSequence(command_sequence)}", level = LogLevel.DEBUG)
return
loggingCallback("ffmpeg dry-run prepared.") if dry_run else loggingCallback(
"ffmpeg metadata write started."
)
def apply_metadata_edits(
context: dict,
source_path: str,
baseline_descriptor: MediaDescriptor,
draft_descriptor: MediaDescriptor,
*,
loggingHandler = None,
) -> dict[str, object]:
temporaryOutputPath = create_temporary_output_path(source_path)
editContext = build_metadata_edit_context(context)
controller = FfxController(editContext, draft_descriptor, baseline_descriptor)
try:
controller.runJob(
temporaryOutputPath = create_temporary_output_path(source_path)
editContext = build_metadata_edit_context(context)
commandSequence = build_metadata_edit_command(
editContext,
source_path,
temporaryOutputPath,
targetFormat="",
chainIteration=[],
baseline_descriptor,
draft_descriptor,
)
ffmpegSeconds = 0.0
replaceSeconds = 0.0
try:
if editContext.get("dry_run", False):
notify_ffmpeg_invocation(
editContext,
commandSequence,
loggingHandler = loggingHandler,
dry_run=True,
)
return {
"applied": False,
"dry_run": True,
"target_path": temporaryOutputPath,
"command_sequence": commandSequence,
"timings": {
"ffmpeg_seconds": ffmpegSeconds,
"replace_seconds": replaceSeconds,
"write_seconds": ffmpegSeconds + replaceSeconds,
},
}
notify_ffmpeg_invocation(editContext,
commandSequence,
loggingHandler = loggingHandler)
ffmpegStart = monotonic()
_out, err, rc = executeProcess(commandSequence, context=editContext)
ffmpegSeconds = monotonic() - ffmpegStart
if rc:
raise click.ClickException(f"ffmpeg edit failed: rc={rc} error={err}")
replaceStart = monotonic()
os.replace(temporaryOutputPath, source_path)
replaceSeconds = monotonic() - replaceStart
return {
"applied": True,
"dry_run": False,
"target_path": source_path,
"command_sequence": commandSequence,
"timings": {
"ffmpeg_seconds": ffmpegSeconds,
"replace_seconds": replaceSeconds,
"write_seconds": ffmpegSeconds + replaceSeconds,
},
}
except Exception:
if os.path.exists(temporaryOutputPath):
os.remove(temporaryOutputPath)
raise
finally:
if editContext.get("dry_run", False) and os.path.exists(temporaryOutputPath):
os.remove(temporaryOutputPath)

View File

@@ -4,6 +4,7 @@ from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
@@ -132,9 +133,16 @@ class Track(Base):
if trackType in [t.label() for t in TrackType]:
if trackType == TrackType.ATTACHMENT.label():
storedFormatIdentifier = AttachmentFormat.identifyFfprobeStream(streamObj).identifier()
else:
storedFormatIdentifier = TrackCodec.identify(
streamObj.get(TrackDescriptor.FFPROBE_CODEC_KEY)
).identifier()
return cls(pattern_id = patternId,
track_type = trackType,
codec_name = streamObj[TrackDescriptor.FFPROBE_CODEC_NAME_KEY],
codec_name = storedFormatIdentifier,
disposition_flags = sum([2**t.index() for (k,v) in streamObj[TrackDescriptor.FFPROBE_DISPOSITION_KEY].items()
if v and (t := TrackDisposition.find(k)) is not None]),
audio_layout = AudioLayout.identify(streamObj))
@@ -153,8 +161,20 @@ class Track(Base):
return TrackType.fromIndex(self.track_type)
def getCodec(self) -> TrackCodec:
if self.getType() == TrackType.ATTACHMENT:
return TrackCodec.UNKNOWN
return TrackCodec.identify(self.codec_name)
def getAttachmentFormat(self) -> AttachmentFormat:
if self.getType() != TrackType.ATTACHMENT:
return AttachmentFormat.UNKNOWN
return AttachmentFormat.identify(self.codec_name)
def getFormatDescriptor(self):
if self.getType() == TrackType.ATTACHMENT:
return self.getAttachmentFormat()
return self.getCodec()
def getIndex(self):
return int(self.index) if self.index is not None else -1
@@ -206,6 +226,9 @@ class Track(Base):
kwargs[TrackDescriptor.SUB_INDEX_KEY] = subIndex
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = self.getType()
if self.getType() == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.getAttachmentFormat()
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.getCodec()
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = self.getDispositionSet()

View File

@@ -134,7 +134,7 @@ class PatternController:
def _build_track_row(self, trackDescriptor: TrackDescriptor) -> Track:
track = Track(
track_type=int(trackDescriptor.getType().index()),
codec_name=str(trackDescriptor.getCodec().identifier()),
codec_name=str(trackDescriptor.getFormatDescriptor().identifier()),
index=int(trackDescriptor.getIndex()),
source_index=int(trackDescriptor.getSourceIndex()),
disposition_flags=int(

View File

@@ -7,7 +7,7 @@ from textual.containers import Grid
from .i18n import t
from .show_controller import ShowController
from .pattern_controller import PatternController
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
from ffx.model.pattern import Pattern
@@ -68,6 +68,10 @@ class PatternDeleteScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__showDescriptor:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")
if not self.__pattern is None:
@@ -103,6 +107,7 @@ class PatternDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer()

View File

@@ -1,6 +1,7 @@
import click, re
from typing import List
from textual import events
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, DataTable, TextArea
from textual.containers import Grid
@@ -18,6 +19,7 @@ from .screen_support import (
add_auto_table_column,
build_screen_bootstrap,
build_screen_controllers,
build_screen_log_pane,
go_back_or_exit,
populate_tag_table,
)
@@ -173,7 +175,7 @@ class PatternDetailsScreen(Screen):
row = (td.getIndex(),
t(trackType.label()),
typeCounter[trackType],
td.getCodec().label(),
td.getFormatDescriptor().label(),
t(audioLayout.label()) if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED else ' ',
trackLanguage.label() if trackLanguage != IsoLanguage.UNDEFINED else ' ',
@@ -324,6 +326,9 @@ class PatternDetailsScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if not self.__showDescriptor is None:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")
@@ -341,6 +346,16 @@ class PatternDetailsScreen(Screen):
self.updateTracks()
self.updateShiftedSeasons()
def on_screen_resume(self, _event: events.ScreenResume) -> None:
if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"):
return
self.updateTags()
self.updateTracks()
if self.__pattern is not None and hasattr(self, "shiftedSeasonsTable"):
self.updateShiftedSeasons()
def compose(self):
@@ -482,6 +497,7 @@ class PatternDetailsScreen(Screen):
# Row 20
yield Static(" ", classes="seven")
yield build_screen_log_pane()
yield Footer()

View File

@@ -1,13 +1,18 @@
from __future__ import annotations
import logging
import weakref
from collections.abc import Mapping
from dataclasses import dataclass
from rich.cells import cell_len
from rich.measure import measure_renderables
from rich.text import Text
from textual import events
from textual.widgets import Collapsible, RichLog, Static
from .helper import formatRichColor
from .i18n import t
from .pattern_controller import PatternController
from .show_controller import ShowController
from .shifted_season_controller import ShiftedSeasonController
@@ -16,6 +21,156 @@ from .tmdb_controller import TmdbController
from .track_controller import TrackController
SCREEN_LOG_PANE_ID = "screen_log_pane"
SCREEN_LOG_VIEW_ID = "screen_log_view"
SCREEN_LOG_RESIZE_HANDLE_ID = "screen_log_resize_handle"
SCREEN_LOG_HANDLER_NAME = "ffx-screen-log"
SCREEN_LOG_DEFAULT_HEIGHT = 8
SCREEN_LOG_MIN_HEIGHT = 4
SCREEN_LOG_COMPONENT_WIDTH = 16
SCREEN_LOG_LEVEL_WIDTH = 8
_SCREEN_LOG_PANE_ENABLED = False
class ScreenLogHandler(logging.Handler):
"""Mirror logger output into the active screen log pane when available."""
def __init__(self, app) -> None:
super().__init__(level=logging.DEBUG)
self.set_name(SCREEN_LOG_HANDLER_NAME)
self.set_app(app)
def set_app(self, app) -> None:
self._app_ref = weakref.ref(app) if app is not None else lambda: None
def emit(self, record: logging.LogRecord) -> None:
app = self._app_ref()
if app is None:
return
try:
message = str(self.format(record)).strip()
except Exception:
self.handleError(record)
return
if not message:
return
try:
app.call_from_thread(write_screen_log, app.screen, message)
except RuntimeError:
write_screen_log(app.screen, message)
except Exception:
self.handleError(record)
class ScreenLogResizeHandle(Static):
DEFAULT_CSS = """
ScreenLogResizeHandle {
width: 100%;
height: 1;
content-align: center middle;
color: $text-muted;
background: $panel-lighten-1;
}
ScreenLogResizeHandle:hover {
color: $text;
background: $panel-lighten-2;
}
"""
def __init__(self) -> None:
super().__init__(" drag to resize ", id=SCREEN_LOG_RESIZE_HANDLE_ID)
self._drag_active = False
self._drag_origin_screen_y = 0
self._drag_origin_height = SCREEN_LOG_DEFAULT_HEIGHT
def _get_log_pane(self):
return self.parent.parent if self.parent is not None else None
def on_mouse_down(self, event: events.MouseDown) -> None:
if event.button != 1:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
self._drag_active = True
self._drag_origin_screen_y = event.screen_y
self._drag_origin_height = log_pane.get_log_height()
self.capture_mouse()
event.stop()
def on_mouse_move(self, event: events.MouseMove) -> None:
if not self._drag_active:
return
log_pane = self._get_log_pane()
if log_pane is None:
return
next_height = self._drag_origin_height + (
self._drag_origin_screen_y - event.screen_y
)
log_pane.set_log_height(next_height)
event.stop()
def on_mouse_up(self, event: events.MouseUp) -> None:
if not self._drag_active:
return
self._drag_active = False
self.release_mouse()
event.stop()
class ResizableScreenLogPane(Collapsible):
def __init__(self) -> None:
self._log_view = RichLog(
id=SCREEN_LOG_VIEW_ID,
wrap=True,
markup=False,
highlight=False,
auto_scroll=True,
)
self._log_height = SCREEN_LOG_DEFAULT_HEIGHT
self._apply_log_height()
super().__init__(
ScreenLogResizeHandle(),
self._log_view,
title=t("Log"),
collapsed=True,
id=SCREEN_LOG_PANE_ID,
)
self.styles.width = "100%"
def _apply_log_height(self) -> None:
self._log_view.styles.height = self._log_height
self._log_view.styles.width = "100%"
def get_log_height(self) -> int:
return int(self._log_height)
def set_log_height(self, height: int) -> None:
next_height = max(SCREEN_LOG_MIN_HEIGHT, int(height))
try:
available_height = int(self.app.size.height) - 8
except Exception:
available_height = next_height
if available_height > 0:
next_height = min(next_height, available_height)
self._log_height = next_height
self._apply_log_height()
@dataclass(frozen=True)
class ScreenBootstrap:
context: dict
@@ -46,6 +201,48 @@ def build_screen_bootstrap(context: dict) -> ScreenBootstrap:
)
def set_screen_log_pane_enabled(enabled: bool) -> None:
global _SCREEN_LOG_PANE_ENABLED
_SCREEN_LOG_PANE_ENABLED = bool(enabled)
def is_screen_log_pane_enabled() -> bool:
return bool(_SCREEN_LOG_PANE_ENABLED)
def configure_screen_log_handler(logger, app, *, enabled: bool):
if logger is None:
return None
screen_log_handler = next(
(handler for handler in logger.handlers if handler.get_name() == SCREEN_LOG_HANDLER_NAME),
None,
)
if not enabled:
if screen_log_handler is not None:
logger.removeHandler(screen_log_handler)
screen_log_handler.close()
return None
if screen_log_handler is None:
screen_log_handler = ScreenLogHandler(app)
logger.addHandler(screen_log_handler)
elif isinstance(screen_log_handler, ScreenLogHandler):
screen_log_handler.set_app(app)
screen_log_handler.setLevel(logging.DEBUG)
screen_log_handler.setFormatter(
logging.Formatter(
f"%(name)-{SCREEN_LOG_COMPONENT_WIDTH}s "
+ f"%(levelname)-{SCREEN_LOG_LEVEL_WIDTH}s "
+ "%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
return screen_log_handler
def build_screen_controllers(
context: dict,
*,
@@ -143,6 +340,48 @@ def update_table_column_label(table, column_key, label) -> None:
table.refresh()
def build_screen_log_pane() -> ResizableScreenLogPane | Static:
"""Create a shared collapsible log pane for screen-local diagnostics."""
if not is_screen_log_pane_enabled():
hidden = Static("", id=f"{SCREEN_LOG_PANE_ID}_disabled")
hidden.display = False
return hidden
return ResizableScreenLogPane()
def toggle_screen_log_pane(screen) -> bool:
"""Toggle the current screen log pane when present."""
try:
logPane = screen.query_one(f"#{SCREEN_LOG_PANE_ID}", Collapsible)
except Exception:
return False
logPane.collapsed = not bool(logPane.collapsed)
return True
def write_screen_log(screen, message: str) -> bool:
"""Append a line to the current screen log pane when present."""
if message is None:
return False
text = str(message).strip()
if not text:
return False
try:
logView = screen.query_one(f"#{SCREEN_LOG_VIEW_ID}", RichLog)
except Exception:
return False
logView.write(text)
return True
def go_back_or_exit(screen) -> None:
"""Pop the current screen when possible, otherwise exit the app."""

View File

@@ -3,7 +3,7 @@ from textual.screen import Screen
from textual.widgets import Footer, Placeholder
from .i18n import t
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
class SettingsScreen(Screen):
@@ -17,7 +17,15 @@ class SettingsScreen(Screen):
def compose(self) -> ComposeResult:
# Row 1
yield Placeholder(t("Settings Screen"))
yield build_screen_log_pane()
yield Footer()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def action_back(self):
go_back_or_exit(self)

View File

@@ -6,7 +6,7 @@ from textual.containers import Grid
from .i18n import t
from .shifted_season_controller import ShiftedSeasonController
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
from ffx.model.shifted_season import ShiftedSeason
@@ -67,6 +67,9 @@ class ShiftedSeasonDeleteScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
shiftedSeason: ShiftedSeason = self.__ssc.getShiftedSeason(self.__shiftedSeasonId)
ownerLabel = (
@@ -127,6 +130,7 @@ class ShiftedSeasonDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer()

View File

@@ -6,7 +6,7 @@ from textual.containers import Grid
from .i18n import t
from .shifted_season_controller import ShiftedSeasonController
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
from ffx.model.shifted_season import ShiftedSeason
@@ -109,6 +109,9 @@ class ShiftedSeasonDetailsScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__shiftedSeasonId is not None:
shiftedSeason: ShiftedSeason = self.__ssc.getShiftedSeason(self.__shiftedSeasonId)
@@ -175,6 +178,7 @@ class ShiftedSeasonDetailsScreen(Screen):
# Row 10
yield Static(" ", classes="three")
yield build_screen_log_pane()
yield Footer()

View File

@@ -4,7 +4,7 @@ from textual.containers import Grid
from .i18n import t
from .show_controller import ShowController
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]]
class ShowDeleteScreen(Screen):
@@ -89,6 +89,7 @@ class ShowDeleteScreen(Screen):
yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer()
@@ -108,5 +109,12 @@ class ShowDeleteScreen(Screen):
if event.button.id == "cancel_button":
self.app.pop_screen()
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
def action_back(self):
go_back_or_exit(self)

View File

@@ -21,6 +21,7 @@ from .screen_support import (
add_auto_table_column,
build_screen_bootstrap,
build_screen_controllers,
build_screen_log_pane,
go_back_or_exit,
)
@@ -174,6 +175,9 @@ class ShowDetailsScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__showDescriptor is not None:
showId = int(self.__showDescriptor.getId())
@@ -433,6 +437,7 @@ class ShowDetailsScreen(Screen):
yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer()

View File

@@ -5,7 +5,12 @@ from rich.text import Text
from .i18n import t
from .show_controller import ShowController
from .screen_support import add_auto_table_column, go_back_or_exit, update_table_column_label
from .screen_support import (
add_auto_table_column,
build_screen_log_pane,
go_back_or_exit,
update_table_column_label,
)
from .show_details_screen import ShowDetailsScreen
from .show_delete_screen import ShowDeleteScreen
@@ -239,6 +244,10 @@ class ShowsScreen(Screen):
def on_mount(self) -> None:
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
for show in self.__sc.getAllShows():
self._add_show_row(show.getDescriptor(self.context))
@@ -278,4 +287,5 @@ class ShowsScreen(Screen):
f = Footer()
f.description = "yolo"
yield build_screen_log_pane()
yield f

View File

@@ -3,7 +3,7 @@ from textual.widgets import Header, Footer, Static, Button
from textual.containers import Grid
from .i18n import t
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]]
@@ -64,6 +64,9 @@ class TagDeleteScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self.query_one("#keylabel", Static).update(str(self.__key))
self.query_one("#valuelabel", Static).update(str(self.__value))
@@ -92,6 +95,7 @@ class TagDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer()

View File

@@ -3,7 +3,7 @@ from textual.widgets import Header, Footer, Static, Button, Input
from textual.containers import Grid
from .i18n import t
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]]
@@ -87,6 +87,9 @@ class TagDetailsScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
if self.__key is not None:
self.query_one("#key_input", Input).value = str(self.__key)
@@ -121,6 +124,7 @@ class TagDetailsScreen(Screen):
# Row 6
yield Static(" ", classes="five", id="messagestatic")
yield build_screen_log_pane()
yield Footer(id="footer")

View File

@@ -3,20 +3,22 @@ from enum import Enum
class TrackCodec(Enum):
H265 = {'identifier': 'hevc', 'format': 'h265', 'extension': 'h265' ,'label': 'H.265'}
VP9 = {'identifier': 'vp9', 'format': 'ivf', 'extension': 'ivf' , 'label': 'VP9'}
H265 = {'identifier': 'hevc', 'format': None, 'extension': 'h265' ,'label': 'H.265'}
H264 = {'identifier': 'h264', 'format': 'h264', 'extension': 'h264' ,'label': 'H.264'}
MPEG4 = {'identifier': 'mpeg4', 'format': 'm4v', 'extension': 'm4v' ,'label': 'MPEG-4'}
MPEG2 = {'identifier': 'mpeg2video', 'format': 'mpeg2video', 'extension': 'mpg' ,'label': 'MPEG-2'}
OPUS = {'identifier': 'opus', 'format': 'opus', 'extension': 'opus' , 'label': 'Opus'}
AAC = {'identifier': 'aac', 'format': None, 'extension': 'aac' , 'label': 'AAC'}
AC3 = {'identifier': 'ac3', 'format': 'ac3', 'extension': 'ac3' , 'label': 'AC3'}
EAC3 = {'identifier': 'eac3', 'format': 'eac3', 'extension': 'eac3' , 'label': 'EAC3'}
DTS = {'identifier': 'dts', 'format': 'dts', 'extension': 'dts' , 'label': 'DTS'}
MP3 = {'identifier': 'mp3', 'format': 'mp3', 'extension': 'mp3' , 'label': 'MP3'}
WEBVTT = {'identifier': 'webvtt', 'format': 'webvtt', 'extension': 'vtt' , 'label': 'WebVTT'}
SRT = {'identifier': 'subrip', 'format': 'srt', 'extension': 'srt' , 'label': 'SRT'}
ASS = {'identifier': 'ass', 'format': 'ass', 'extension': 'ass' , 'label': 'ASS'}
TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf' , 'label': 'TTF'}
PGS = {'identifier': 'hdmv_pgs_subtitle', 'format': 'sup', 'extension': 'sup' , 'label': 'PGS'}
VOBSUB = {'identifier': 'dvd_subtitle', 'format': None, 'extension': 'mkv' , 'label': 'VobSub'}

View File

@@ -43,7 +43,7 @@ class TrackController():
s = self.Session()
track = Track(pattern_id = patId,
track_type = int(trackDescriptor.getType().index()),
codec_name = str(trackDescriptor.getCodec().identifier()),
codec_name = str(trackDescriptor.getFormatDescriptor().identifier()),
index = int(trackDescriptor.getIndex()),
source_index = int(trackDescriptor.getSourceIndex()),
disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet())),
@@ -82,7 +82,7 @@ class TrackController():
track.index = int(trackDescriptor.getIndex())
track.track_type = int(trackDescriptor.getType().index())
track.codec_name = str(trackDescriptor.getCodec().identifier())
track.codec_name = str(trackDescriptor.getFormatDescriptor().identifier())
track.audio_layout = int(trackDescriptor.getAudioLayout().index())
track.disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet()))

View File

@@ -6,7 +6,7 @@ from textual.containers import Grid
from ffx.track_descriptor import TrackDescriptor
from .i18n import t
from .screen_support import go_back_or_exit
from .screen_support import build_screen_log_pane, go_back_or_exit
# Screen[dict[int, str, int]]
@@ -67,6 +67,9 @@ class TrackDeleteScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self.query_one("#subindexlabel", Static).update(str(self.__trackDescriptor.getSubIndex()))
self.query_one("#patternlabel", Static).update(str(self.__trackDescriptor.getPatternId()))
self.query_one("#languagelabel", Static).update(str(self.__trackDescriptor.getLanguage().label()))
@@ -118,6 +121,7 @@ class TrackDeleteScreen(Screen):
yield Button(t("Delete"), id="delete_button")
yield Button(t("Cancel"), id="cancel_button")
yield build_screen_log_pane()
yield Footer()

View File

@@ -1,5 +1,6 @@
from typing import Self
from .attachment_format import AttachmentFormat
from .iso_language import IsoLanguage
from .track_type import TrackType
from .audio_layout import AudioLayout
@@ -26,6 +27,7 @@ class TrackDescriptor:
TRACK_TYPE_KEY = "track_type"
CODEC_KEY = "codec_name"
ATTACHMENT_FORMAT_KEY = "attachment_format"
AUDIO_LAYOUT_KEY = "audio_layout"
FFPROBE_INDEX_KEY = "index"
@@ -110,15 +112,6 @@ class TrackDescriptor:
else:
self.__trackType = TrackType.UNKNOWN
if TrackDescriptor.CODEC_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.CODEC_KEY]) is not TrackCodec:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec"
)
self.__trackCodec = kwargs[TrackDescriptor.CODEC_KEY]
else:
self.__trackCodec = TrackCodec.UNKNOWN
if TrackDescriptor.TAGS_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.TAGS_KEY]) is not dict:
raise TypeError(
@@ -151,6 +144,34 @@ class TrackDescriptor:
else:
self.__audioLayout = AudioLayout.LAYOUT_UNDEFINED
self.__trackCodec = TrackCodec.UNKNOWN
self.__attachmentFormat = AttachmentFormat.UNKNOWN
if self.__trackType == TrackType.ATTACHMENT:
if TrackDescriptor.ATTACHMENT_FORMAT_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY]) is not AttachmentFormat:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.ATTACHMENT_FORMAT_KEY} is required to be of type AttachmentFormat"
)
self.__attachmentFormat = kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY]
elif TrackDescriptor.CODEC_KEY in kwargs.keys():
legacyCodec = kwargs[TrackDescriptor.CODEC_KEY]
if type(legacyCodec) is AttachmentFormat:
self.__attachmentFormat = legacyCodec
elif type(legacyCodec) is TrackCodec:
self.__attachmentFormat = AttachmentFormat.fromTrackCodec(legacyCodec)
else:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec for legacy attachment compatibility"
)
else:
if TrackDescriptor.CODEC_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.CODEC_KEY]) is not TrackCodec:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec"
)
self.__trackCodec = kwargs[TrackDescriptor.CODEC_KEY]
@classmethod
def fromFfprobe(cls, streamObj, subIndex: int = -1):
"""Processes ffprobe stream data as array with elements according to the following example
@@ -215,7 +236,12 @@ class TrackDescriptor:
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = trackType
kwargs[TrackDescriptor.CODEC_KEY] = TrackCodec.identify(streamObj[TrackDescriptor.FFPROBE_CODEC_KEY])
if trackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = AttachmentFormat.identifyFfprobeStream(streamObj)
else:
kwargs[TrackDescriptor.CODEC_KEY] = TrackCodec.identify(
streamObj.get(TrackDescriptor.FFPROBE_CODEC_KEY)
)
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = (
{
@@ -277,6 +303,14 @@ class TrackDescriptor:
def getCodec(self) -> TrackCodec:
return self.__trackCodec
def getAttachmentFormat(self) -> AttachmentFormat:
return self.__attachmentFormat
def getFormatDescriptor(self):
if self.__trackType == TrackType.ATTACHMENT:
return self.__attachmentFormat
return self.__trackCodec
def getLanguage(self):
if "language" in self.__trackTags.keys():
return IsoLanguage.findThreeLetter(self.__trackTags["language"])
@@ -353,12 +387,16 @@ class TrackDescriptor:
TrackDescriptor.SOURCE_INDEX_KEY: int(self.__sourceIndex),
TrackDescriptor.SUB_INDEX_KEY: int(self.__subIndex),
TrackDescriptor.TRACK_TYPE_KEY: self.__trackType,
TrackDescriptor.CODEC_KEY: self.__trackCodec,
TrackDescriptor.TAGS_KEY: dict(self.__trackTags),
TrackDescriptor.DISPOSITION_SET_KEY: set(self.__dispositionSet),
TrackDescriptor.AUDIO_LAYOUT_KEY: self.__audioLayout,
}
if self.__trackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.__attachmentFormat
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.__trackCodec
if context is not None:
kwargs[TrackDescriptor.CONTEXT_KEY] = context
elif self.__context:

View File

@@ -5,6 +5,7 @@ from textual.widgets import Header, Footer, Static, Button, SelectionList, Selec
from textual.containers import Grid
from textual.widgets._data_table import CellDoesNotExist
from .attachment_format import AttachmentFormat
from .audio_layout import AudioLayout
from .iso_language import IsoLanguage
from .tag_delete_screen import TagDeleteScreen
@@ -14,7 +15,13 @@ from .track_descriptor import TrackDescriptor
from .track_disposition import TrackDisposition
from .track_type import TrackType
from .i18n import t
from .screen_support import add_auto_table_column, build_screen_bootstrap, go_back_or_exit, populate_tag_table
from .screen_support import (
add_auto_table_column,
build_screen_bootstrap,
build_screen_log_pane,
go_back_or_exit,
populate_tag_table,
)
class TrackDetailsScreen(Screen):
@@ -128,10 +135,14 @@ class TrackDetailsScreen(Screen):
self.__patternLabel = str(patternLabel)
self.__siblingTrackDescriptors = list(siblingTrackDescriptors or [])
self.__metadataOnly = bool(metadata_only)
self.__applyNormalization = bool(
self.context.get("apply_metadata_normalization", True)
)
if self.__isNew:
self.__trackType = trackType
self.__trackCodec = TrackCodec.UNKNOWN
self.__attachmentFormat = AttachmentFormat.UNKNOWN
self.__audioLayout = AudioLayout.LAYOUT_UNDEFINED
self.__index = index
self.__subIndex = subIndex
@@ -141,6 +152,7 @@ class TrackDetailsScreen(Screen):
else:
self.__trackType = trackDescriptor.getType()
self.__trackCodec = trackDescriptor.getCodec()
self.__attachmentFormat = trackDescriptor.getAttachmentFormat()
self.__audioLayout = trackDescriptor.getAudioLayout()
self.__index = trackDescriptor.getIndex()
self.__subIndex = trackDescriptor.getSubIndex()
@@ -152,8 +164,13 @@ class TrackDetailsScreen(Screen):
initial_language = trackDescriptor.getLanguage()
initial_title = trackDescriptor.getTitle()
self.__titleAutoManaged = (
initial_language == IsoLanguage.UNDEFINED and not str(initial_title).strip()
initialTitleEmpty = not str(initial_title).strip()
self.__titleAutoManaged = bool(
initialTitleEmpty
and (
initial_language == IsoLanguage.UNDEFINED
or (self.__metadataOnly and self.__applyNormalization)
)
)
self.__suppressTitleChanged = False
self.__lastAutoTitle = ""
@@ -222,6 +239,9 @@ class TrackDetailsScreen(Screen):
def on_mount(self):
if getattr(self, 'context', {}).get('debug', False):
self.title = f"{self.app.title} - {self.__class__.__name__}"
self.query_one("#index_label", Static).update(
str(self.__index) if self.__index is not None else "-"
)
@@ -256,6 +276,8 @@ class TrackDetailsScreen(Screen):
self.__trackDescriptor.getLanguage()
)
self.query_one("#title_input", Input).value = self.__trackDescriptor.getTitle()
if self.__titleAutoManaged and not self.__trackDescriptor.getTitle().strip():
self._apply_auto_title_for_language(self.__trackDescriptor.getLanguage())
self.updateTags()
if self.__metadataOnly:
@@ -387,6 +409,7 @@ class TrackDetailsScreen(Screen):
# Row 24
yield Static(" ", classes="five", id="messagestatic")
yield build_screen_log_pane()
yield Footer(id="footer")
def getTrackDescriptorFromInput(self):
@@ -413,6 +436,9 @@ class TrackDetailsScreen(Screen):
if not isinstance(selectedTrackType, TrackType):
selectedTrackType = TrackType.UNKNOWN
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = selectedTrackType
if selectedTrackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.__attachmentFormat
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.__trackCodec
if selectedTrackType == TrackType.AUDIO:

View File

@@ -7,6 +7,7 @@ import os
from pathlib import Path
import subprocess
import sys
from functools import lru_cache
from typing import Mapping
@@ -95,8 +96,69 @@ def write_vtt(path: Path, lines: tuple[str, ...]) -> Path:
return path
def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrackSpec], duration_seconds: int = 1) -> Path:
@lru_cache(maxsize=None)
def _ffmpeg_encoder_is_available(encoder_name: str) -> bool:
completed = subprocess.run(
["ffmpeg", "-encoders"],
capture_output=True,
text=True,
)
if completed.returncode != 0:
return False
encoder_label = str(encoder_name).strip()
for line in completed.stdout.splitlines():
if not line.startswith(" "):
continue
tokens = line.split(maxsplit=2)
if len(tokens) >= 2 and tokens[1] == encoder_label:
return True
return False
def _resolve_fixture_video_encoder(
video_encoder: str,
video_encoder_options: tuple[str, ...],
) -> tuple[str, tuple[str, ...]]:
if video_encoder != "libx264":
return video_encoder, video_encoder_options
if _ffmpeg_encoder_is_available("libx264"):
return video_encoder, video_encoder_options
if _ffmpeg_encoder_is_available("libopenh264"):
# Keep fixture generation software-based when libx264 is missing.
return "libopenh264", ("-pix_fmt", "yuv420p")
return video_encoder, video_encoder_options
def create_source_fixture(
workdir: Path,
filename: str,
tracks: list[SourceTrackSpec],
duration_seconds: int = 1,
*,
video_encoder: str = "libx264",
video_encoder_options: tuple[str, ...] = (
"-preset",
"ultrafast",
"-crf",
"35",
"-pix_fmt",
"yuv420p",
),
audio_encoder: str = "aac",
audio_encoder_options: tuple[str, ...] = ("-b:a", "48k"),
subtitle_encoder: str = "webvtt",
) -> Path:
output_path = workdir / filename
video_encoder, video_encoder_options = _resolve_fixture_video_encoder(
video_encoder,
video_encoder_options,
)
has_video = any(track.track_type == TrackType.VIDEO for track in tracks)
has_audio = any(track.track_type == TrackType.AUDIO for track in tracks)
@@ -189,21 +251,16 @@ def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrack
command += map_tokens
command += metadata_tokens
command += disposition_tokens
if has_video:
command += ["-c:v", video_encoder] + list(video_encoder_options)
if has_audio:
command += ["-c:a", audio_encoder] + list(audio_encoder_options)
if subtitle_input_indices:
command += ["-c:s", subtitle_encoder]
command += [
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-crf",
"35",
"-pix_fmt",
"yuv420p",
"-c:a",
"aac",
"-b:a",
"48k",
"-c:s",
"webvtt",
"-t",
str(duration_seconds),
"-shortest",

View File

@@ -168,6 +168,40 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"],
)
def test_root_debug_flag_parses_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.ffx.make_context(
"ffx",
["--debug", "help"],
resilient_parsing=True,
)
print(json.dumps({{
"debug": context.params["debug"],
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["debug"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_convert_cut_option_supports_flag_duration_and_start_duration_forms(self):
result = self.run_python(
textwrap.dedent(

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class UnmuxSequenceTests(unittest.TestCase):
def test_h265_video_unmux_uses_annex_b_bitstream_filter_without_forced_format(self):
track_descriptor = TrackDescriptor(
index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H265,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mp4",
"episode_0_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mp4",
"-map",
"0:v:0",
"-c:v",
"copy",
"-bsf:v",
"hevc_mp4toannexb",
"episode_0_eng.h265",
],
sequence,
)
def test_non_h265_unmux_keeps_generic_copy_behavior(self):
track_descriptor = TrackDescriptor(
index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.SRT,
tags={},
disposition_set=set(),
)
sequence = cli.getUnmuxSequence(
track_descriptor,
"input.mkv",
"episode_1_eng",
)
self.assertEqual(
[
"ffmpeg",
"-y",
"-i",
"input.mkv",
"-map",
"0:s:0",
"-c",
"copy",
"-f",
"srt",
"episode_1_eng.srt",
],
sequence,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import click
from pathlib import Path
import sys
import unittest
@@ -32,6 +33,9 @@ class StaticConfig:
class FfxControllerTests(unittest.TestCase):
def tearDown(self):
FfxController.isFfmpegEncoderAvailable.cache_clear()
def make_context(self, video_encoder: VideoEncoder) -> dict:
return {
"logger": get_ffx_logger(),
@@ -192,6 +196,62 @@ class FfxControllerTests(unittest.TestCase):
self.assertIn("ENCODING_QUALITY=19", commands[0])
mocked_info.assert_any_call("Setting quality 19 from pattern")
def test_generate_h264_tokens_prefers_libx264_when_available(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value="libx264",
):
tokens = controller.generateH264Tokens(23)
self.assertEqual(
["-c:v:0", "libx264", "-preset", "slow", "-crf", "23"],
tokens,
)
def test_generate_h264_tokens_falls_back_to_libopenh264_and_logs_warning(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with (
patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value="libopenh264",
),
patch.object(context["logger"], "warning") as mocked_warning,
):
tokens = controller.generateH264Tokens(23)
self.assertEqual(
["-c:v:0", "libopenh264", "-pix_fmt", "yuv420p"],
tokens,
)
mocked_warning.assert_called_once_with(
"libx264 encoder unavailable; falling back to libopenh264 for H.264 encoding."
)
def test_generate_h264_tokens_raises_when_no_supported_software_encoder_exists(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value=None,
):
with self.assertRaisesRegex(
click.ClickException,
"no supported software H.264 encoder is available",
):
controller.generateH264Tokens(23)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
from pathlib import Path
import sys
import tempfile
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.file_properties import FileProperties # noqa: E402
from ffx.i18n import set_current_language # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_type import TrackType # noqa: E402
from tests.support.ffx_bundle import SourceTrackSpec, create_source_fixture # noqa: E402
class StaticConfig:
def __init__(self, data: dict):
self._data = data
def getData(self):
return self._data
class FilePropertiesAssetProbeTests(unittest.TestCase):
def tearDown(self):
set_current_language("de")
def test_boruto_webm_probe_recognizes_webm_stream_codecs(self):
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
"language": "de",
"use_pattern": False,
}
set_current_language("de")
with tempfile.TemporaryDirectory() as tmpdir:
media_path = create_source_fixture(
Path(tmpdir),
"fixture.webm",
[
SourceTrackSpec(TrackType.VIDEO, identity="video-0"),
SourceTrackSpec(TrackType.AUDIO, identity="audio-1", language="eng"),
SourceTrackSpec(
TrackType.SUBTITLE,
identity="subtitle-2",
language="eng",
subtitle_lines=("Lorem ipsum dolor sit amet.",),
),
],
duration_seconds=3,
video_encoder="libvpx-vp9",
video_encoder_options=("-b:v", "0", "-crf", "45"),
audio_encoder="libopus",
audio_encoder_options=("-b:a", "48k"),
subtitle_encoder="webvtt",
)
file_properties = FileProperties(context, str(media_path))
tracks = file_properties.getMediaDescriptor().getTrackDescriptors()
subtitle_codecs = [
track.getCodec()
for track in tracks
if track.getType() == TrackType.SUBTITLE
]
self.assertIn(TrackCodec.VP9, [track.getCodec() for track in tracks])
self.assertIn(TrackCodec.OPUS, [track.getCodec() for track in tracks])
self.assertTrue(subtitle_codecs)
self.assertTrue(all(codec == TrackCodec.WEBVTT for codec in subtitle_codecs))
if __name__ == "__main__":
unittest.main()

View File

@@ -16,8 +16,10 @@ if str(SRC_ROOT) not in sys.path:
from ffx.logging_utils import ( # noqa: E402
CONSOLE_HANDLER_NAME,
FILE_HANDLER_NAME,
MUTED_CONSOLE_LEVEL,
configure_ffx_logger,
get_ffx_logger,
set_ffx_console_logging_enabled,
)
@@ -81,6 +83,33 @@ class LoggingUtilsTests(unittest.TestCase):
self.cleanup_logger(logger_name)
def test_set_ffx_console_logging_enabled_mutes_and_restores_console_handler(self):
logger_name = "ffx-test-console-mute"
self.cleanup_logger(logger_name)
with tempfile.TemporaryDirectory() as tempdir:
log_path = Path(tempdir) / "ffx.log"
logger = configure_ffx_logger(
str(log_path),
logging.DEBUG,
logging.INFO,
name=logger_name,
)
console_handler = next(
handler for handler in logger.handlers if handler.get_name() == CONSOLE_HANDLER_NAME
)
self.assertEqual(logging.INFO, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=False)
self.assertEqual(MUTED_CONSOLE_LEVEL, console_handler.level)
set_ffx_console_logging_enabled(logger, enabled=True)
self.assertEqual(logging.INFO, console_handler.level)
self.cleanup_logger(logger_name)
if __name__ == "__main__":
unittest.main()

View File

@@ -247,7 +247,8 @@ class MediaDescriptorChangeSetTests(unittest.TestCase):
self.assertIn("title=German", metadata_tokens)
self.assertNotIn("title=Deutsch", metadata_tokens)
def test_non_subtitle_track_without_title_does_not_get_language_name(self):
def test_audio_track_without_title_gets_language_name_when_normalization_enabled(self):
set_current_language("de")
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
@@ -278,6 +279,73 @@ class MediaDescriptorChangeSetTests(unittest.TestCase):
self.assertIn("-metadata:s:a:0", metadata_tokens)
self.assertIn("language=deu", metadata_tokens)
self.assertIn("title=Deutsch", metadata_tokens)
def test_video_track_without_title_gets_language_name_when_normalization_enabled(self):
set_current_language("de")
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
}
source_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
tags={"language": "ger"},
)
target_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
tags={"language": "ger"},
)
change_set = MediaDescriptorChangeSet(
context,
MediaDescriptor(track_descriptors=[target_track]),
MediaDescriptor(track_descriptors=[source_track]),
)
metadata_tokens = change_set.generateMetadataTokens()
self.assertIn("language=deu", metadata_tokens)
self.assertIn("title=Deutsch", metadata_tokens)
def test_changed_track_language_does_not_autofill_title_when_title_already_exists(self):
set_current_language("de")
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
}
source_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.SUBTITLE,
tags={"language": "ger", "title": "Deutsch [FN]"},
)
target_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.SUBTITLE,
tags={"language": "jpn", "title": "Deutsch [FN]"},
)
change_set = MediaDescriptorChangeSet(
context,
MediaDescriptor(track_descriptors=[target_track]),
MediaDescriptor(track_descriptors=[source_track]),
)
metadata_tokens = change_set.generateMetadataTokens()
self.assertIn("language=jpn", metadata_tokens)
self.assertNotIn("title=Japanisch", metadata_tokens)
self.assertNotIn("title=Deutsch", metadata_tokens)
def test_target_only_tracks_still_emit_remove_tokens_for_configured_stream_keys(self):

View File

@@ -15,9 +15,11 @@ if str(SRC_ROOT) not in sys.path:
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.helper import LogLevel # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.metadata_editor import ( # noqa: E402
apply_metadata_edits,
build_metadata_edit_command,
build_metadata_edit_context,
create_temporary_output_path,
)
@@ -32,6 +34,16 @@ class StaticConfig:
return {}
class NotificationCollector:
def __init__(self) -> None:
self.messages: list[str] = []
self.levels: list[LogLevel | None] = []
def __call__(self, message: str, level: LogLevel | None = None) -> None:
self.messages.append(message)
self.levels.append(level)
def make_context(*, dry_run: bool = False) -> dict:
return {
"logger": get_ffx_logger(),
@@ -77,15 +89,45 @@ class MetadataEditorTests(unittest.TestCase):
self.assertEqual(".mkv", Path(temporary_path).suffix)
self.assertEqual(Path(source_path).parent, Path(temporary_path).parent)
def test_build_metadata_edit_command_maps_all_streams_and_uses_single_copy_codec(self):
context = build_metadata_edit_context(make_context())
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
command = build_metadata_edit_command(
context,
"/tmp/example.mkv",
"/tmp/.edit.mkv",
baseline_descriptor,
draft_descriptor,
)
self.assertEqual(1, command.count("-map"))
self.assertEqual(1, command.count("-c"))
self.assertNotIn("-c:v:0", command)
self.assertNotIn("-c:a:0", command)
self.assertNotIn("-c:s:0", command)
self.assertEqual(
["-map", "0", "-c", "copy"],
command[command.index("-map"):command.index("-c") + 2],
)
def test_apply_metadata_edits_rewrites_via_temporary_file_then_replaces_source(self):
context = make_context()
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
source_path = "/tmp/example.mkv"
expected_command = build_metadata_edit_command(
build_metadata_edit_context(context),
source_path,
"/tmp/.edit.mkv",
baseline_descriptor,
draft_descriptor,
)
with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
patch("ffx.metadata_editor.FfxController.runJob") as mocked_run_job,
patch("ffx.metadata_editor.executeProcess", return_value=("", "", 0)) as mocked_execute,
patch("ffx.metadata_editor.os.replace") as mocked_replace,
):
result = apply_metadata_edits(
@@ -95,32 +137,43 @@ class MetadataEditorTests(unittest.TestCase):
draft_descriptor,
)
mocked_run_job.assert_called_once_with(
source_path,
"/tmp/.edit.mkv",
targetFormat="",
chainIteration=[],
)
mocked_execute.assert_called_once_with(expected_command, context=build_metadata_edit_context(context))
mocked_replace.assert_called_once_with("/tmp/.edit.mkv", source_path)
self.assertEqual(
{
"applied": True,
"dry_run": False,
"target_path": source_path,
"command_sequence": expected_command,
},
{
"applied": result["applied"],
"dry_run": result["dry_run"],
"target_path": result["target_path"],
"command_sequence": result["command_sequence"],
},
result,
)
self.assertIn("timings", result)
self.assertIn("ffmpeg_seconds", result["timings"])
self.assertIn("replace_seconds", result["timings"])
self.assertIn("write_seconds", result["timings"])
def test_apply_metadata_edits_dry_run_skips_replace_and_cleans_temp_path(self):
context = make_context(dry_run=True)
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
notifications = NotificationCollector()
expected_command = build_metadata_edit_command(
build_metadata_edit_context(context),
"/tmp/example.mkv",
"/tmp/.edit.mkv",
baseline_descriptor,
draft_descriptor,
)
with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
patch("ffx.metadata_editor.FfxController.runJob") as mocked_run_job,
patch("ffx.metadata_editor.os.path.exists", return_value=True),
patch("ffx.metadata_editor.os.remove") as mocked_remove,
patch("ffx.metadata_editor.executeProcess") as mocked_execute,
patch("ffx.metadata_editor.os.replace") as mocked_replace,
):
result = apply_metadata_edits(
@@ -128,19 +181,59 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv",
baseline_descriptor,
draft_descriptor,
loggingHandler = notifications,
)
mocked_run_job.assert_called_once()
mocked_execute.assert_not_called()
mocked_replace.assert_not_called()
mocked_remove.assert_called_once_with("/tmp/.edit.mkv")
self.assertEqual(["ffmpeg dry-run prepared."], notifications.messages)
self.assertEqual([None], notifications.levels)
self.assertEqual(
{
"applied": False,
"dry_run": True,
"target_path": "/tmp/.edit.mkv",
"command_sequence": expected_command,
},
{
"applied": result["applied"],
"dry_run": result["dry_run"],
"target_path": result["target_path"],
"command_sequence": result["command_sequence"],
},
result,
)
self.assertEqual(
{
"ffmpeg_seconds": 0.0,
"replace_seconds": 0.0,
"write_seconds": 0.0,
},
result["timings"],
)
def test_apply_metadata_edits_notifies_with_command_when_verbose(self):
context = make_context()
context["verbosity"] = 1
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
notifications = NotificationCollector()
with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
patch("ffx.metadata_editor.executeProcess", return_value=("", "", 0)),
patch("ffx.metadata_editor.os.replace"),
):
apply_metadata_edits(
context,
"/tmp/example.mkv",
baseline_descriptor,
draft_descriptor,
loggingHandler = notifications,
)
self.assertEqual(1, len(notifications.messages))
self.assertTrue(notifications.messages[0].startswith("ffmpeg: ffmpeg "))
self.assertEqual([LogLevel.DEBUG], notifications.levels)
if __name__ == "__main__":

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from pathlib import Path
import logging
import sys
import unittest
from unittest.mock import patch
@@ -57,9 +58,38 @@ class FakeScreen:
self.app = FakeApp(screen_stack)
class FakeRichLog:
def __init__(self):
self.messages = []
def write(self, message):
self.messages.append(message)
class FakeScreenWithLog:
def __init__(self):
self.log_view = FakeRichLog()
def query_one(self, selector, _widget_type=None):
if selector == f"#{screen_support.SCREEN_LOG_VIEW_ID}":
return self.log_view
raise LookupError(selector)
class FakeThreadedApp:
def __init__(self, screen):
self.screen = screen
self.calls = []
def call_from_thread(self, func, *args):
self.calls.append((func, args))
return func(*args)
class ScreenSupportTests(unittest.TestCase):
def tearDown(self):
set_current_language("de")
screen_support.set_screen_log_pane_enabled(False)
def make_context(self):
return {
@@ -168,6 +198,63 @@ class ScreenSupportTests(unittest.TestCase):
self.assertGreater(len(translated), 8)
self.assertEqual(len(translated) + 2, screen_support.localized_column_width(translated, 8))
def test_build_screen_log_pane_is_hidden_when_debug_mode_is_disabled(self):
screen_support.set_screen_log_pane_enabled(False)
log_pane = screen_support.build_screen_log_pane()
self.assertFalse(log_pane.display)
def test_build_screen_log_pane_is_collapsed_when_debug_mode_is_enabled(self):
screen_support.set_screen_log_pane_enabled(True)
log_pane = screen_support.build_screen_log_pane()
self.assertIsInstance(log_pane, screen_support.ResizableScreenLogPane)
self.assertEqual(screen_support.SCREEN_LOG_PANE_ID, log_pane.id)
self.assertTrue(log_pane.collapsed)
def test_resizable_screen_log_pane_clamps_height_to_minimum(self):
log_pane = screen_support.ResizableScreenLogPane()
log_pane.set_log_height(1)
self.assertEqual(screen_support.SCREEN_LOG_MIN_HEIGHT, log_pane.get_log_height())
def test_configure_screen_log_handler_routes_logger_messages_to_active_screen(self):
logger_name = "ffx-test-screen-log-handler"
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
logger.propagate = False
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
screen = FakeScreenWithLog()
app = FakeThreadedApp(screen)
try:
handler = screen_support.configure_screen_log_handler(
logger,
app,
enabled=True,
)
self.assertIsNotNone(handler)
logger.info("hello pane")
self.assertEqual(1, len(screen.log_view.messages))
self.assertRegex(
screen.log_view.messages[0],
r"^ffx-test-screen-log-handler\s+INFO\s+\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \| hello pane$",
)
finally:
screen_support.configure_screen_log_handler(logger, app, enabled=False)
for handler in list(logger.handlers):
logger.removeHandler(handler)
handler.close()
if __name__ == "__main__":
unittest.main()

View File

@@ -14,10 +14,13 @@ if str(SRC_ROOT) not in sys.path:
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.helper import DIFF_ADDED_KEY # noqa: E402
from ffx.iso_language import IsoLanguage # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.inspect_details_screen import InspectDetailsScreen # noqa: E402
from ffx.i18n import set_current_language # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.media_edit_screen import MediaEditScreen # noqa: E402
from ffx.pattern_details_screen import PatternDetailsScreen # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -89,16 +92,21 @@ class FakeTagTable:
class FakeMediaDescriptor:
def __init__(self, track_descriptors):
def __init__(self, track_descriptors, tags=None):
self._track_descriptors = list(track_descriptors)
self._tags = dict(tags or {})
def getTrackDescriptors(self):
return list(self._track_descriptors)
def getTags(self):
return dict(self._tags)
class FakeValueWidget:
def __init__(self, value):
self.value = value
self.disabled = False
class FakeInputWidget:
@@ -106,10 +114,21 @@ class FakeInputWidget:
self.value = value
class FakeStaticWidget:
def __init__(self, value=""):
self.value = value
def update(self, value):
self.value = value
class FakeSelectionListWidget:
def __init__(self, selected):
self.selected = selected
def add_option(self, _option):
return None
def make_track_descriptor(index, sub_index, track_type):
return TrackDescriptor(
@@ -182,6 +201,32 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertEqual("German Audio", descriptor.getTitle())
self.assertEqual("value", descriptor.getTags()["KEEP"])
def test_track_details_screen_preserves_attachment_format_for_attachment_tracks(self):
screen = object.__new__(TrackDetailsScreen)
screen.context = {"logger": get_ffx_logger()}
screen._TrackDetailsScreen__trackDescriptor = None
screen._TrackDetailsScreen__patternId = 5
screen._TrackDetailsScreen__index = 4
screen._TrackDetailsScreen__subIndex = 0
screen._TrackDetailsScreen__trackCodec = TrackCodec.UNKNOWN
screen._TrackDetailsScreen__attachmentFormat = AttachmentFormat.TTF
screen._TrackDetailsScreen__draftTrackTags = {"filename": "font.ttf", "mimetype": "font/ttf"}
widgets = {
"#type_select": FakeValueWidget(TrackType.ATTACHMENT),
"#audio_layout_select": FakeValueWidget(AudioLayout.LAYOUT_UNDEFINED),
"#language_select": FakeValueWidget(Select.NULL),
"#title_input": FakeInputWidget(""),
"#dispositions_selection_list": FakeSelectionListWidget(set()),
}
screen.query_one = lambda selector, _widget_type=None: widgets[selector]
descriptor = screen.getTrackDescriptorFromInput()
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_track_details_screen_auto_sets_localized_title_from_selected_language(self):
set_current_language("de")
screen = object.__new__(TrackDetailsScreen)
@@ -244,6 +289,49 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertEqual("Preset", widgets["#title_input"].value)
def test_track_details_screen_metadata_only_mount_shows_normalized_title_preview(self):
set_current_language("de")
screen = object.__new__(TrackDetailsScreen)
screen._TrackDetailsScreen__index = 2
screen._TrackDetailsScreen__subIndex = 0
screen._TrackDetailsScreen__patternLabel = "demo"
screen._TrackDetailsScreen__trackType = TrackType.AUDIO
screen._TrackDetailsScreen__audioLayout = AudioLayout.LAYOUT_STEREO
screen._TrackDetailsScreen__trackDescriptor = TrackDescriptor(
index=2,
source_index=2,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.DTS,
audio_layout=AudioLayout.LAYOUT_STEREO,
tags={"language": "ger"},
)
screen._TrackDetailsScreen__metadataOnly = True
screen._TrackDetailsScreen__titleAutoManaged = True
screen._TrackDetailsScreen__suppressTitleChanged = False
screen._TrackDetailsScreen__lastAutoTitle = ""
screen._TrackDetailsScreen__removeTrackKeys = []
screen._TrackDetailsScreen__ignoreTrackKeys = []
screen._TrackDetailsScreen__draftTrackTags = {}
screen._TrackDetailsScreen__tagRowData = {}
screen.updateTags = lambda: None
widgets = {
"#index_label": FakeStaticWidget(),
"#subindex_label": FakeStaticWidget(),
"#pattern_label": FakeStaticWidget(),
"#type_select": FakeValueWidget(None),
"#audio_layout_select": FakeValueWidget(None),
"#dispositions_selection_list": FakeSelectionListWidget(set()),
"#language_select": FakeValueWidget(None),
"#title_input": FakeInputWidget(""),
}
screen.query_one = lambda selector, _widget_type=None: widgets[selector]
screen.on_mount()
self.assertEqual("Deutsch", widgets["#title_input"].value)
def test_track_details_screen_language_options_are_sorted_by_localized_label(self):
set_current_language("de")
@@ -326,12 +414,177 @@ class TagTableScreenStateTests(unittest.TestCase):
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([first_track])
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
self.assertEqual(9, len(screen.tracksTable.columns))
self.assertIn("A much longer updated title", screen.tracksTable.rows["row-0"])
def test_media_edit_screen_shows_normalized_audio_title_preview(self):
set_current_language("de")
audio_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.DTS,
audio_layout=AudioLayout.LAYOUT_STEREO,
tags={"language": "ger"},
)
screen = object.__new__(MediaEditScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([audio_track])
screen._trackRowData = {}
screen._applyNormalization = True
screen.updateTracks()
self.assertIn("Deutsch", screen.tracksTable.rows["row-0"])
def test_media_edit_screen_shows_normalized_video_title_preview(self):
set_current_language("de")
video_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
tags={"language": "ger"},
)
screen = object.__new__(MediaEditScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([video_track])
screen._trackRowData = {}
screen._applyNormalization = True
screen.updateTracks()
self.assertIn("Deutsch", screen.tracksTable.rows["row-0"])
def test_media_edit_screen_toggle_normalization_refreshes_tracks(self):
screen = object.__new__(MediaEditScreen)
screen._applyNormalization = False
calls = []
screen.setApplyNormalization = lambda enabled: (
setattr(screen, "_applyNormalization", bool(enabled)),
calls.append("setApplyNormalization"),
)
screen.updateToggleButtons = lambda: calls.append("updateToggleButtons")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.setMessage = lambda _message: calls.append("setMessage")
screen.action_toggle_normalization()
self.assertEqual(
[
"setApplyNormalization",
"updateToggleButtons",
"updateTracks",
"updateDifferences",
"setMessage",
],
calls,
)
def test_media_edit_screen_handle_edit_track_updates_draft_descriptor(self):
original_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger"},
)
context = {"logger": get_ffx_logger()}
updated_track = original_track.clone(context=context)
updated_track.getTags()["language"] = "eng"
screen = object.__new__(MediaEditScreen)
screen.context = context
screen._sourceMediaDescriptor = MediaDescriptor(
context=context,
track_descriptors=[original_track],
)
calls = []
screen.setMessage = lambda _message: calls.append("setMessage")
screen.refreshAfterDraftChange = lambda: calls.append("refreshAfterDraftChange")
screen.handle_edit_track(updated_track)
self.assertEqual(
"eng",
screen._sourceMediaDescriptor.getTrackDescriptors()[0].getTags()["language"],
)
self.assertEqual(
["setMessage", "refreshAfterDraftChange"],
calls,
)
def test_media_edit_screen_screen_resume_refreshes_draft_tables(self):
screen = object.__new__(MediaEditScreen)
screen.tracksTable = FakeTagTable()
calls = []
screen.refreshAfterDraftChange = lambda: calls.append("refreshAfterDraftChange")
screen.updateToggleButtons = lambda: calls.append("updateToggleButtons")
screen.on_screen_resume(None)
self.assertEqual(
["refreshAfterDraftChange", "updateToggleButtons"],
calls,
)
def test_pattern_details_screen_screen_resume_refreshes_tables(self):
screen = object.__new__(PatternDetailsScreen)
screen.tracksTable = FakeTagTable()
screen.tagsTable = FakeTagTable()
screen.shiftedSeasonsTable = FakeTagTable()
screen._PatternDetailsScreen__pattern = object()
calls = []
screen.updateTags = lambda: calls.append("updateTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateShiftedSeasons = lambda: calls.append("updateShiftedSeasons")
screen.on_screen_resume(None)
self.assertEqual(
["updateTags", "updateTracks", "updateShiftedSeasons"],
calls,
)
def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self):
screen = object.__new__(InspectDetailsScreen)
calls = []
screen.reloadProperties = lambda reset_draft=True: calls.append(
("reloadProperties", reset_draft)
)
screen._currentPattern = None
screen.updateMediaTags = lambda: calls.append("updateMediaTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.handle_edit_pattern(None)
self.assertEqual(
[
("reloadProperties", True),
"updateMediaTags",
"updateTracks",
"updateDifferences",
],
calls,
)
def test_pattern_details_screen_reads_selected_shifted_season_from_row_mapping(self):
screen = object.__new__(PatternDetailsScreen)
screen.shiftedSeasonsTable = FakeTagTable()
@@ -438,6 +691,154 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertNotIn(placeholder_key, screen._showRowData)
self.assertEqual(0, screen.getRowIndexFromShowId(8))
def test_inspect_details_screen_update_tracks_shows_target_pattern_tracks(self):
source_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger", "title": "German Full"},
)
target_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "eng", "title": "English Full"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([source_track])
screen._targetMediaDescriptor = FakeMediaDescriptor([target_track])
screen._currentPattern = object()
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_update_tracks_blanks_irrelevant_attachment_fields(self):
attachment_track = TrackDescriptor(
index=4,
source_index=4,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "font.ttf", "mimetype": "font/ttf"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("4", row[0])
self.assertEqual(" ", row[3])
self.assertEqual(" ", row[7])
self.assertEqual(" ", row[8])
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor(
index=3,
source_index=7,
sub_index=1,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger"},
)
target_track = TrackDescriptor(
index=1,
source_index=7,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "eng"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([source_track])
screen._targetMediaDescriptor = FakeMediaDescriptor([target_track])
screen._currentPattern = object()
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
self.assertIs(source_track, screen.getTrackEditSourceDescriptor())
def test_inspect_details_screen_action_update_pattern_uses_existing_change_set_before_reload(self):
class _FakePattern:
def getPattern(self):
return r"demo_(s[0-9]+e[0-9]+)\.mkv"
def getId(self):
return 9
class _FakeTagController:
def __init__(self, calls):
self._calls = calls
def deleteMediaTagByKey(self, pattern_id, key):
self._calls.append(("deleteMediaTagByKey", pattern_id, key))
calls = []
screen = object.__new__(InspectDetailsScreen)
screen._currentPattern = _FakePattern()
screen._mediaChangeSetObj = {
"tags": {
DIFF_ADDED_KEY: {"TITLE": "Demo"},
}
}
screen._tac = _FakeTagController(calls)
screen._tc = type(
"_FakeTrackController",
(),
{
"addTrack": staticmethod(lambda *_args, **_kwargs: None),
"deleteTrack": staticmethod(lambda *_args, **_kwargs: None),
"setDispositionState": staticmethod(lambda *_args, **_kwargs: None),
},
)()
screen._sourceMediaDescriptor = FakeMediaDescriptor([], tags={})
screen._targetMediaDescriptor = FakeMediaDescriptor([])
screen.getPatternObjFromInput = lambda: {
"show_id": 1,
"pattern": r"demo_(s[0-9]+e[0-9]+)\.mkv",
}
screen.reloadProperties = lambda reset_draft=True: calls.append(
("reloadProperties", reset_draft)
)
screen.updateMediaTags = lambda: calls.append("updateMediaTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.action_update_pattern()
self.assertEqual(
[
("deleteMediaTagByKey", 9, "TITLE"),
("reloadProperties", True),
"updateMediaTags",
"updateTracks",
"updateDifferences",
],
calls,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.track_codec import TrackCodec # noqa: E402
class TrackCodecIdentificationTests(unittest.TestCase):
def test_identify_modern_webm_codecs(self):
self.assertEqual(TrackCodec.VP9, TrackCodec.identify("vp9"))
self.assertEqual(TrackCodec.OPUS, TrackCodec.identify("opus"))
self.assertEqual(TrackCodec.WEBVTT, TrackCodec.identify("webvtt"))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
import json
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
ASSETS_ROOT = Path(__file__).resolve().parents[1] / "assets"
class TrackDescriptorProbeTests(unittest.TestCase):
def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 4,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "AmazonEmberTanuki-Italic.ttf",
"mimetype": "font/ttf",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(AttachmentFormat.TTF, descriptor.getFormatDescriptor())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_attachment_without_codec_name_still_probes_as_unknown_when_not_font(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 9,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "cover.bin",
"mimetype": "application/octet-stream",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_media_descriptor_from_boruto_probe_json_handles_attachment_streams_without_codec_name(self):
probe_payload = json.loads(
(ASSETS_ROOT / "ffprobe.out.json").read_text(encoding="utf-8")
)
descriptor = MediaDescriptor.fromFfprobe(
{"logger": None},
probe_payload["format"],
probe_payload["streams"],
)
track_descriptors = descriptor.getTrackDescriptors()
attachment_tracks = descriptor.getAttachmentTracks()
self.assertEqual(14, len(track_descriptors))
self.assertEqual(10, len(attachment_tracks))
self.assertTrue(
all(track.getAttachmentFormat() == AttachmentFormat.TTF for track in attachment_tracks)
)
if __name__ == "__main__":
unittest.main()

View File

@@ -172,20 +172,84 @@ fetch_remote_state() {
git fetch "${ORIGIN_REMOTE}" "${DEV_BRANCH}" "${MAIN_BRANCH}" --tags >/dev/null
}
require_branch_matches_remote() {
branch_divergence_counts() {
local branch="$1"
local local_sha=""
local remote_sha=""
local remote_only=""
local local_only=""
if ! git show-ref --verify --quiet "refs/remotes/${ORIGIN_REMOTE}/${branch}"; then
fail "Remote branch '${ORIGIN_REMOTE}/${branch}' does not exist."
fi
local_sha="$(git rev-parse "refs/heads/${branch}")"
remote_sha="$(git rev-parse "refs/remotes/${ORIGIN_REMOTE}/${branch}")"
read -r remote_only local_only < <(
git rev-list --left-right --count \
"refs/remotes/${ORIGIN_REMOTE}/${branch}...refs/heads/${branch}"
)
if [ "${local_sha}" != "${remote_sha}" ]; then
fail "Local branch '${branch}' is not up to date with '${ORIGIN_REMOTE}/${branch}'. Pull, rebase, or push first."
printf '%s %s\n' "${remote_only}" "${local_only}"
}
fast_forward_branch_to_remote() {
local branch="$1"
local remote_ref="refs/remotes/${ORIGIN_REMOTE}/${branch}"
local current_head=""
current_head="$(git rev-parse --abbrev-ref HEAD)"
printf "Fast-forwarding local branch '%s' to '%s/%s'...\n" \
"${branch}" \
"${ORIGIN_REMOTE}" \
"${branch}"
if [ "${current_head}" = "${branch}" ]; then
git merge --ff-only "${remote_ref}" >/dev/null
return 0
fi
git branch -f "${branch}" "${remote_ref}" >/dev/null
}
sync_release_source_branch() {
local branch="$1"
local remote_only=""
local local_only=""
read -r remote_only local_only < <(branch_divergence_counts "${branch}")
if [ "${remote_only}" -ne 0 ] && [ "${local_only}" -ne 0 ]; then
fail "Local branch '${branch}' has diverged from '${ORIGIN_REMOTE}/${branch}' (${local_only} local-only commit(s), ${remote_only} remote-only commit(s)). Reconcile the branches first."
fi
if [ "${remote_only}" -ne 0 ]; then
fast_forward_branch_to_remote "${branch}"
fi
if [ "${local_only}" -ne 0 ]; then
printf "Notice: local branch '%s' is ahead of '%s/%s' by %s commit(s); release will use the local tip.\n" \
"${branch}" \
"${ORIGIN_REMOTE}" \
"${branch}" \
"${local_only}"
fi
}
sync_release_target_branch() {
local branch="$1"
local remote_only=""
local local_only=""
read -r remote_only local_only < <(branch_divergence_counts "${branch}")
if [ "${remote_only}" -ne 0 ] && [ "${local_only}" -ne 0 ]; then
fail "Local branch '${branch}' has diverged from '${ORIGIN_REMOTE}/${branch}' (${local_only} local-only commit(s), ${remote_only} remote-only commit(s)). Reconcile the branches first."
fi
if [ "${local_only}" -ne 0 ]; then
fail "Local branch '${branch}' is ahead of '${ORIGIN_REMOTE}/${branch}' by ${local_only} commit(s). Push or reconcile first so the release starts from the published ${branch} tip."
fi
if [ "${remote_only}" -ne 0 ]; then
fast_forward_branch_to_remote "${branch}"
fi
}
@@ -249,13 +313,11 @@ print_release_plan() {
printf 'Dry run only. Planned steps:\n'
printf '1. Ensure current branch is %s and the worktree is clean.\n' "${DEV_BRANCH}"
printf '2. Fetch %s and verify local %s and %s exactly match %s/%s and %s/%s.\n' \
printf '2. Fetch %s, fast-forward local %s and %s from %s when safe, and fail on divergence or unpublished local %s commits.\n' \
"${ORIGIN_REMOTE}" \
"${DEV_BRANCH}" \
"${MAIN_BRANCH}" \
"${ORIGIN_REMOTE}" \
"${DEV_BRANCH}" \
"${ORIGIN_REMOTE}" \
"${MAIN_BRANCH}"
if [ "${SKIP_TESTS}" -eq 1 ]; then
printf '3. Skip the pre-release test gate.\n'
@@ -304,8 +366,8 @@ require_repo_state
require_dev_checkout
require_clean_worktree
fetch_remote_state
require_branch_matches_remote "${DEV_BRANCH}"
require_branch_matches_remote "${MAIN_BRANCH}"
sync_release_source_branch "${DEV_BRANCH}"
sync_release_target_branch "${MAIN_BRANCH}"
RELEASE_VERSION="$(resolve_release_version)"
RELEASE_TAG="v${RELEASE_VERSION}"
@@ -341,8 +403,8 @@ fi
run_pre_release_tests
require_clean_worktree
fetch_remote_state
require_branch_matches_remote "${DEV_BRANCH}"
require_branch_matches_remote "${MAIN_BRANCH}"
sync_release_source_branch "${DEV_BRANCH}"
sync_release_target_branch "${MAIN_BRANCH}"
require_release_tag_available "${RELEASE_VERSION}"
git switch "${MAIN_BRANCH}" >/dev/null