Tidy up logging and rework tests from scratch

This commit is contained in:
Javanaut
2026-04-09 12:46:24 +02:00
parent f9c8b8ac5e
commit 60ae58500a
84 changed files with 1283 additions and 187 deletions

337
tests/support/ffx_bundle.py Normal file
View File

@@ -0,0 +1,337 @@
from __future__ import annotations
from dataclasses import dataclass, field
import json
import logging
import os
from pathlib import Path
import subprocess
import sys
from typing import Mapping
REPO_ROOT = Path(__file__).resolve().parents[2]
SRC_ROOT = REPO_ROOT / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.audio_layout import AudioLayout
from ffx.database import databaseContext
from ffx.pattern_controller import PatternController
from ffx.show_controller import ShowController
from ffx.show_descriptor import ShowDescriptor
from ffx.track_controller import TrackController
from ffx.track_descriptor import TrackDescriptor
from ffx.track_disposition import TrackDisposition
from ffx.track_type import TrackType
class StaticConfig:
def __init__(self, data: dict | None = None):
self._data = data or {}
def getData(self):
return self._data
@dataclass(frozen=True)
class SourceTrackSpec:
track_type: TrackType
identity: str | None = None
language: str | None = None
title: str | None = None
extra_tags: Mapping[str, str] = field(default_factory=dict)
dispositions: tuple[TrackDisposition, ...] = ()
subtitle_lines: tuple[str, ...] = ("subtitle line",)
attachment_name: str = "fixture.ttf"
@dataclass(frozen=True)
class PatternTrackSpec:
index: int
source_index: int
track_type: TrackType
tags: Mapping[str, str] = field(default_factory=dict)
dispositions: tuple[TrackDisposition, ...] = ()
audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO
def make_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
logger.handlers = []
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(logging.NullHandler())
return logger
def build_controller_context(database_path: Path) -> dict:
return {
"logger": make_logger(f"ffx-test-db-{database_path.stem}"),
"config": StaticConfig(),
"database": databaseContext(str(database_path)),
}
def dispose_controller_context(context: dict) -> None:
context["database"]["engine"].dispose()
def write_vtt(path: Path, lines: tuple[str, ...]) -> Path:
body = ["WEBVTT", ""]
for index, line in enumerate(lines):
start_ms = index * 600
end_ms = start_ms + 500
body.extend(
[
f"{start_ms // 3600000:02d}:{(start_ms // 60000) % 60:02d}:{(start_ms // 1000) % 60:02d}.{start_ms % 1000:03d} --> "
+ f"{end_ms // 3600000:02d}:{(end_ms // 60000) % 60:02d}:{(end_ms // 1000) % 60:02d}.{end_ms % 1000:03d}",
line,
"",
]
)
path.write_text("\n".join(body), encoding="utf-8")
return path
def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrackSpec], duration_seconds: int = 1) -> Path:
output_path = workdir / filename
has_video = any(track.track_type == TrackType.VIDEO for track in tracks)
has_audio = any(track.track_type == TrackType.AUDIO for track in tracks)
command = ["ffmpeg", "-y"]
input_indices: dict[str, int] = {}
next_input_index = 0
if has_video:
command += ["-f", "lavfi", "-i", "color=size=96x54:rate=2:color=black"]
input_indices["video"] = next_input_index
next_input_index += 1
if has_audio:
command += ["-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=48000"]
input_indices["audio"] = next_input_index
next_input_index += 1
subtitle_input_indices: list[int] = []
subtitle_counter = 0
for track in tracks:
if track.track_type == TrackType.SUBTITLE:
subtitle_path = write_vtt(
workdir / f"{output_path.stem}_subtitle_{subtitle_counter}.vtt",
track.subtitle_lines,
)
command += ["-i", str(subtitle_path)]
subtitle_input_indices.append(next_input_index)
next_input_index += 1
subtitle_counter += 1
map_tokens: list[str] = []
metadata_tokens: list[str] = []
disposition_tokens: list[str] = []
attachment_tokens: list[str] = []
per_type_subindex: dict[TrackType, int] = {}
subtitle_input_cursor = 0
attachment_subindex = 0
for track in tracks:
if track.track_type == TrackType.VIDEO:
map_tokens += ["-map", f"{input_indices['video']}:v:0"]
stream_group = "v"
elif track.track_type == TrackType.AUDIO:
map_tokens += ["-map", f"{input_indices['audio']}:a:0"]
stream_group = "a"
elif track.track_type == TrackType.SUBTITLE:
map_tokens += ["-map", f"{subtitle_input_indices[subtitle_input_cursor]}:s:0"]
subtitle_input_cursor += 1
stream_group = "s"
elif track.track_type == TrackType.ATTACHMENT:
attachment_path = workdir / track.attachment_name
attachment_path.write_bytes(b"dummy font bytes")
attachment_tokens += [
"-attach",
str(attachment_path),
f"-metadata:s:t:{attachment_subindex}",
"mimetype=application/x-truetype-font",
f"-metadata:s:t:{attachment_subindex}",
f"filename={attachment_path.name}",
]
attachment_subindex += 1
continue
else:
raise ValueError(f"Unsupported track type {track.track_type}")
subindex = per_type_subindex.get(track.track_type, 0)
per_type_subindex[track.track_type] = subindex + 1
tags = {}
if track.identity is not None:
tags["THIS_IS"] = track.identity
if track.language is not None:
tags["language"] = track.language
if track.title is not None:
tags["title"] = track.title
tags.update(track.extra_tags)
for key, value in tags.items():
metadata_tokens += [f"-metadata:s:{stream_group}:{subindex}", f"{key}={value}"]
if track.dispositions:
disposition_tokens += [
f"-disposition:{stream_group}:{subindex}",
"+".join(disposition.label() for disposition in track.dispositions),
]
command += map_tokens
command += metadata_tokens
command += disposition_tokens
command += [
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-crf",
"35",
"-pix_fmt",
"yuv420p",
"-c:a",
"aac",
"-b:a",
"48k",
"-c:s",
"webvtt",
"-t",
str(duration_seconds),
"-shortest",
]
command += attachment_tokens
command += [str(output_path)]
completed = subprocess.run(command, cwd=workdir, capture_output=True, text=True)
if completed.returncode != 0:
raise AssertionError(f"ffmpeg fixture creation failed\nSTDOUT:\n{completed.stdout}\nSTDERR:\n{completed.stderr}")
return output_path
def add_show_and_pattern(context: dict, filename_pattern: str, show_id: int = 1) -> int:
show_descriptor = ShowDescriptor(
id=show_id,
name="Bundle Test Show",
year=2000,
)
ShowController(context).updateShow(show_descriptor)
pattern_id = PatternController(context).addPattern(
{
"show_id": show_id,
"pattern": filename_pattern,
}
)
if not pattern_id:
raise AssertionError("Failed to create pattern in test database")
return pattern_id
def add_pattern_tracks(context: dict, pattern_id: int, track_specs: list[PatternTrackSpec]) -> None:
track_controller = TrackController(context)
for track in track_specs:
kwargs = {
TrackDescriptor.INDEX_KEY: track.index,
TrackDescriptor.SOURCE_INDEX_KEY: track.source_index,
TrackDescriptor.TRACK_TYPE_KEY: track.track_type,
TrackDescriptor.TAGS_KEY: dict(track.tags),
TrackDescriptor.DISPOSITION_SET_KEY: set(track.dispositions),
}
if track.track_type == TrackType.AUDIO:
kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = track.audio_layout
track_controller.addTrack(TrackDescriptor(**kwargs), pattern_id)
def prepare_pattern_database(database_path: Path, filename_pattern: str, track_specs: list[PatternTrackSpec], show_id: int = 1) -> None:
context = build_controller_context(database_path)
try:
pattern_id = add_show_and_pattern(context, filename_pattern, show_id=show_id)
add_pattern_tracks(context, pattern_id, track_specs)
finally:
dispose_controller_context(context)
def run_ffx_convert(workdir: Path, home_dir: Path, database_path: Path, *args: str) -> subprocess.CompletedProcess[str]:
env = os.environ.copy()
env["HOME"] = str(home_dir)
existing_pythonpath = env.get("PYTHONPATH", "")
env["PYTHONPATH"] = str(SRC_ROOT) if not existing_pythonpath else f"{SRC_ROOT}{os.pathsep}{existing_pythonpath}"
command = [
sys.executable,
"-m",
"ffx",
"--database-file",
str(database_path),
"convert",
*args,
]
return subprocess.run(command, cwd=workdir, env=env, capture_output=True, text=True)
def ffprobe_json(path: Path) -> dict:
completed = subprocess.run(
[
"ffprobe",
"-hide_banner",
"-show_streams",
"-show_format",
"-of",
"json",
str(path),
],
capture_output=True,
text=True,
)
if completed.returncode != 0:
raise AssertionError(f"ffprobe failed for {path}\nSTDERR:\n{completed.stderr}")
return json.loads(completed.stdout)
def stream_tags(stream: dict) -> dict[str, str]:
return {str(key): str(value) for key, value in stream.get("tags", {}).items()}
def get_tag(stream: dict, key: str) -> str | None:
tags = stream_tags(stream)
for candidate in (key, key.lower(), key.upper()):
if candidate in tags:
return tags[candidate]
return None
def extract_first_subtitle_text(workdir: Path, media_path: Path) -> str:
extracted_path = workdir / f"{media_path.stem}.subtitle.vtt"
completed = subprocess.run(
[
"ffmpeg",
"-y",
"-i",
str(media_path),
"-map",
"0:s:0",
"-c",
"copy",
str(extracted_path),
],
cwd=workdir,
capture_output=True,
text=True,
)
if completed.returncode != 0:
raise AssertionError(f"Subtitle extraction failed\nSTDERR:\n{completed.stderr}")
return extracted_path.read_text(encoding="utf-8")
def expected_output_path(workdir: Path, source_filename: str) -> Path:
return workdir / f"out_{source_filename}"