Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev

This commit is contained in:
Javanaut
2026-04-23 16:31:19 +02:00
42 changed files with 1005 additions and 103 deletions

View File

@@ -7,6 +7,7 @@ import os
from pathlib import Path
import subprocess
import sys
from functools import lru_cache
from typing import Mapping
@@ -95,8 +96,69 @@ def write_vtt(path: Path, lines: tuple[str, ...]) -> Path:
return path
def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrackSpec], duration_seconds: int = 1) -> Path:
@lru_cache(maxsize=None)
def _ffmpeg_encoder_is_available(encoder_name: str) -> bool:
completed = subprocess.run(
["ffmpeg", "-encoders"],
capture_output=True,
text=True,
)
if completed.returncode != 0:
return False
encoder_label = str(encoder_name).strip()
for line in completed.stdout.splitlines():
if not line.startswith(" "):
continue
tokens = line.split(maxsplit=2)
if len(tokens) >= 2 and tokens[1] == encoder_label:
return True
return False
def _resolve_fixture_video_encoder(
video_encoder: str,
video_encoder_options: tuple[str, ...],
) -> tuple[str, tuple[str, ...]]:
if video_encoder != "libx264":
return video_encoder, video_encoder_options
if _ffmpeg_encoder_is_available("libx264"):
return video_encoder, video_encoder_options
if _ffmpeg_encoder_is_available("libopenh264"):
# Keep fixture generation software-based when libx264 is missing.
return "libopenh264", ("-pix_fmt", "yuv420p")
return video_encoder, video_encoder_options
def create_source_fixture(
workdir: Path,
filename: str,
tracks: list[SourceTrackSpec],
duration_seconds: int = 1,
*,
video_encoder: str = "libx264",
video_encoder_options: tuple[str, ...] = (
"-preset",
"ultrafast",
"-crf",
"35",
"-pix_fmt",
"yuv420p",
),
audio_encoder: str = "aac",
audio_encoder_options: tuple[str, ...] = ("-b:a", "48k"),
subtitle_encoder: str = "webvtt",
) -> Path:
output_path = workdir / filename
video_encoder, video_encoder_options = _resolve_fixture_video_encoder(
video_encoder,
video_encoder_options,
)
has_video = any(track.track_type == TrackType.VIDEO for track in tracks)
has_audio = any(track.track_type == TrackType.AUDIO for track in tracks)
@@ -189,21 +251,16 @@ def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrack
command += map_tokens
command += metadata_tokens
command += disposition_tokens
if has_video:
command += ["-c:v", video_encoder] + list(video_encoder_options)
if has_audio:
command += ["-c:a", audio_encoder] + list(audio_encoder_options)
if subtitle_input_indices:
command += ["-c:s", subtitle_encoder]
command += [
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-crf",
"35",
"-pix_fmt",
"yuv420p",
"-c:a",
"aac",
"-b:a",
"48k",
"-c:s",
"webvtt",
"-t",
str(duration_seconds),
"-shortest",

View File

@@ -18,7 +18,7 @@ from ffx.track_type import TrackType # noqa: E402
class UnmuxSequenceTests(unittest.TestCase):
def test_h265_video_unmux_uses_annex_b_bitstream_filter(self):
def test_h265_video_unmux_uses_annex_b_bitstream_filter_without_forced_format(self):
track_descriptor = TrackDescriptor(
index=0,
sub_index=0,
@@ -46,8 +46,6 @@ class UnmuxSequenceTests(unittest.TestCase):
"copy",
"-bsf:v",
"hevc_mp4toannexb",
"-f",
"h265",
"episode_0_eng.h265",
],
sequence,

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import click
from pathlib import Path
import sys
import unittest
@@ -33,6 +34,9 @@ class StaticConfig:
class FfxControllerTests(unittest.TestCase):
def tearDown(self):
FfxController.isFfmpegEncoderAvailable.cache_clear()
def make_context(self, video_encoder: VideoEncoder) -> dict:
return {
"logger": get_ffx_logger(),
@@ -318,6 +322,61 @@ class FfxControllerTests(unittest.TestCase):
self.assertNotIn("libopus", commands[0])
self.assertFalse(any(token.startswith("-b:a") for token in commands[0]))
self.assertFalse(any(token.startswith("-filter:a") for token in commands[0]))
def test_generate_h264_tokens_prefers_libx264_when_available(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value="libx264",
):
tokens = controller.generateH264Tokens(23)
self.assertEqual(
["-c:v:0", "libx264", "-preset", "slow", "-crf", "23"],
tokens,
)
def test_generate_h264_tokens_falls_back_to_libopenh264_and_logs_warning(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with (
patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value="libopenh264",
),
patch.object(context["logger"], "warning") as mocked_warning,
):
tokens = controller.generateH264Tokens(23)
self.assertEqual(
["-c:v:0", "libopenh264", "-pix_fmt", "yuv420p"],
tokens,
)
mocked_warning.assert_called_once_with(
"libx264 encoder unavailable; falling back to libopenh264 for H.264 encoding."
)
def test_generate_h264_tokens_raises_when_no_supported_software_encoder_exists(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
with patch.object(
FfxController,
"getSupportedSoftwareH264Encoder",
return_value=None,
):
with self.assertRaisesRegex(
click.ClickException,
"no supported software H.264 encoder is available",
):
controller.generateH264Tokens(23)
if __name__ == "__main__":

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from pathlib import Path
import sys
import tempfile
import unittest
@@ -16,6 +17,7 @@ from ffx.i18n import set_current_language # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_type import TrackType # noqa: E402
from tests.support.ffx_bundle import SourceTrackSpec, create_source_fixture # noqa: E402
class StaticConfig:
@@ -39,25 +41,41 @@ class FilePropertiesAssetProbeTests(unittest.TestCase):
}
set_current_language("de")
media_path = (
Path(__file__).resolve().parents[1]
/ "assets"
/ "Boruto; Naruto Next Generations (2017) - 0069 Super-Chochos Liebestaumel - S01E0069.webm"
)
with tempfile.TemporaryDirectory() as tmpdir:
media_path = create_source_fixture(
Path(tmpdir),
"fixture.webm",
[
SourceTrackSpec(TrackType.VIDEO, identity="video-0"),
SourceTrackSpec(TrackType.AUDIO, identity="audio-1", language="eng"),
SourceTrackSpec(
TrackType.SUBTITLE,
identity="subtitle-2",
language="eng",
subtitle_lines=("Lorem ipsum dolor sit amet.",),
),
],
duration_seconds=3,
video_encoder="libvpx-vp9",
video_encoder_options=("-b:v", "0", "-crf", "45"),
audio_encoder="libopus",
audio_encoder_options=("-b:a", "48k"),
subtitle_encoder="webvtt",
)
file_properties = FileProperties(context, str(media_path))
tracks = file_properties.getMediaDescriptor().getTrackDescriptors()
file_properties = FileProperties(context, str(media_path))
tracks = file_properties.getMediaDescriptor().getTrackDescriptors()
subtitle_codecs = [
track.getCodec()
for track in tracks
if track.getType() == TrackType.SUBTITLE
]
subtitle_codecs = [
track.getCodec()
for track in tracks
if track.getType() == TrackType.SUBTITLE
]
self.assertIn(TrackCodec.VP9, [track.getCodec() for track in tracks])
self.assertIn(TrackCodec.OPUS, [track.getCodec() for track in tracks])
self.assertTrue(subtitle_codecs)
self.assertTrue(all(codec == TrackCodec.WEBVTT for codec in subtitle_codecs))
self.assertIn(TrackCodec.VP9, [track.getCodec() for track in tracks])
self.assertIn(TrackCodec.OPUS, [track.getCodec() for track in tracks])
self.assertTrue(subtitle_codecs)
self.assertTrue(all(codec == TrackCodec.WEBVTT for codec in subtitle_codecs))
if __name__ == "__main__":

View File

@@ -15,6 +15,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.helper import LogLevel # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.metadata_editor import ( # noqa: E402
apply_metadata_edits,
@@ -33,6 +34,16 @@ class StaticConfig:
return {}
class NotificationCollector:
def __init__(self) -> None:
self.messages: list[str] = []
self.levels: list[LogLevel | None] = []
def __call__(self, message: str, level: LogLevel | None = None) -> None:
self.messages.append(message)
self.levels.append(level)
def make_context(*, dry_run: bool = False) -> dict:
return {
"logger": get_ffx_logger(),
@@ -151,7 +162,7 @@ class MetadataEditorTests(unittest.TestCase):
context = make_context(dry_run=True)
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
notifications = []
notifications = NotificationCollector()
expected_command = build_metadata_edit_command(
build_metadata_edit_context(context),
"/tmp/example.mkv",
@@ -170,12 +181,13 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv",
baseline_descriptor,
draft_descriptor,
loggingHandler = notifications.append,
loggingHandler = notifications,
)
mocked_execute.assert_not_called()
mocked_replace.assert_not_called()
self.assertEqual(["ffmpeg dry-run prepared."], notifications)
self.assertEqual(["ffmpeg dry-run prepared."], notifications.messages)
self.assertEqual([None], notifications.levels)
self.assertEqual(
{
"applied": False,
@@ -204,7 +216,7 @@ class MetadataEditorTests(unittest.TestCase):
context["verbosity"] = 1
baseline_descriptor = make_descriptor()
draft_descriptor = baseline_descriptor.clone(context=context)
notifications = []
notifications = NotificationCollector()
with (
patch("ffx.metadata_editor.create_temporary_output_path", return_value="/tmp/.edit.mkv"),
@@ -216,11 +228,12 @@ class MetadataEditorTests(unittest.TestCase):
"/tmp/example.mkv",
baseline_descriptor,
draft_descriptor,
loggingHandler = notifications.append,
loggingHandler = notifications,
)
self.assertEqual(1, len(notifications))
self.assertTrue(notifications[0].startswith("ffmpeg: ffmpeg "))
self.assertEqual(1, len(notifications.messages))
self.assertTrue(notifications.messages[0].startswith("ffmpeg: ffmpeg "))
self.assertEqual([LogLevel.DEBUG], notifications.levels)
if __name__ == "__main__":

View File

@@ -14,6 +14,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.helper import DIFF_ADDED_KEY # noqa: E402
from ffx.iso_language import IsoLanguage # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
@@ -200,6 +201,32 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertEqual("German Audio", descriptor.getTitle())
self.assertEqual("value", descriptor.getTags()["KEEP"])
def test_track_details_screen_preserves_attachment_format_for_attachment_tracks(self):
screen = object.__new__(TrackDetailsScreen)
screen.context = {"logger": get_ffx_logger()}
screen._TrackDetailsScreen__trackDescriptor = None
screen._TrackDetailsScreen__patternId = 5
screen._TrackDetailsScreen__index = 4
screen._TrackDetailsScreen__subIndex = 0
screen._TrackDetailsScreen__trackCodec = TrackCodec.UNKNOWN
screen._TrackDetailsScreen__attachmentFormat = AttachmentFormat.TTF
screen._TrackDetailsScreen__draftTrackTags = {"filename": "font.ttf", "mimetype": "font/ttf"}
widgets = {
"#type_select": FakeValueWidget(TrackType.ATTACHMENT),
"#audio_layout_select": FakeValueWidget(AudioLayout.LAYOUT_UNDEFINED),
"#language_select": FakeValueWidget(Select.NULL),
"#title_input": FakeInputWidget(""),
"#dispositions_selection_list": FakeSelectionListWidget(set()),
}
screen.query_one = lambda selector, _widget_type=None: widgets[selector]
descriptor = screen.getTrackDescriptorFromInput()
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_track_details_screen_auto_sets_localized_title_from_selected_language(self):
set_current_language("de")
screen = object.__new__(TrackDetailsScreen)
@@ -695,6 +722,33 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_update_tracks_blanks_irrelevant_attachment_fields(self):
attachment_track = TrackDescriptor(
index=4,
source_index=4,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "font.ttf", "mimetype": "font/ttf"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("4", row[0])
self.assertEqual(" ", row[3])
self.assertEqual(" ", row[7])
self.assertEqual(" ", row[8])
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor(
index=3,

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
import json
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
ASSETS_ROOT = Path(__file__).resolve().parents[1] / "assets"
class TrackDescriptorProbeTests(unittest.TestCase):
def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 4,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "AmazonEmberTanuki-Italic.ttf",
"mimetype": "font/ttf",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(AttachmentFormat.TTF, descriptor.getFormatDescriptor())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_attachment_without_codec_name_still_probes_as_unknown_when_not_font(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 9,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "cover.bin",
"mimetype": "application/octet-stream",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_media_descriptor_from_boruto_probe_json_handles_attachment_streams_without_codec_name(self):
probe_payload = json.loads(
(ASSETS_ROOT / "ffprobe.out.json").read_text(encoding="utf-8")
)
descriptor = MediaDescriptor.fromFfprobe(
{"logger": None},
probe_payload["format"],
probe_payload["streams"],
)
track_descriptors = descriptor.getTrackDescriptors()
attachment_tracks = descriptor.getAttachmentTracks()
self.assertEqual(14, len(track_descriptors))
self.assertEqual(10, len(attachment_tracks))
self.assertTrue(
all(track.getAttachmentFormat() == AttachmentFormat.TTF for track in attachment_tracks)
)
if __name__ == "__main__":
unittest.main()