from __future__ import annotations from dataclasses import dataclass, field import json import logging import os from pathlib import Path import subprocess import sys from typing import Mapping REPO_ROOT = Path(__file__).resolve().parents[2] SRC_ROOT = REPO_ROOT / "src" if str(SRC_ROOT) not in sys.path: sys.path.insert(0, str(SRC_ROOT)) from ffx.audio_layout import AudioLayout from ffx.database import databaseContext from ffx.pattern_controller import PatternController from ffx.show_controller import ShowController from ffx.show_descriptor import ShowDescriptor from ffx.track_controller import TrackController from ffx.track_descriptor import TrackDescriptor from ffx.track_disposition import TrackDisposition from ffx.track_type import TrackType class StaticConfig: def __init__(self, data: dict | None = None): self._data = data or {} def getData(self): return self._data @dataclass(frozen=True) class SourceTrackSpec: track_type: TrackType identity: str | None = None language: str | None = None title: str | None = None extra_tags: Mapping[str, str] = field(default_factory=dict) dispositions: tuple[TrackDisposition, ...] = () subtitle_lines: tuple[str, ...] = ("subtitle line",) attachment_name: str = "fixture.ttf" @dataclass(frozen=True) class PatternTrackSpec: index: int source_index: int track_type: TrackType tags: Mapping[str, str] = field(default_factory=dict) dispositions: tuple[TrackDisposition, ...] = () audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO def make_logger(name: str) -> logging.Logger: logger = logging.getLogger(name) logger.handlers = [] logger.setLevel(logging.DEBUG) logger.propagate = False logger.addHandler(logging.NullHandler()) return logger def build_controller_context(database_path: Path) -> dict: return { "logger": make_logger(f"ffx-test-db-{database_path.stem}"), "config": StaticConfig(), "database": databaseContext(str(database_path)), } def dispose_controller_context(context: dict) -> None: context["database"]["engine"].dispose() def write_vtt(path: Path, lines: tuple[str, ...]) -> Path: body = ["WEBVTT", ""] for index, line in enumerate(lines): start_ms = index * 600 end_ms = start_ms + 500 body.extend( [ f"{start_ms // 3600000:02d}:{(start_ms // 60000) % 60:02d}:{(start_ms // 1000) % 60:02d}.{start_ms % 1000:03d} --> " + f"{end_ms // 3600000:02d}:{(end_ms // 60000) % 60:02d}:{(end_ms // 1000) % 60:02d}.{end_ms % 1000:03d}", line, "", ] ) path.write_text("\n".join(body), encoding="utf-8") return path def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrackSpec], duration_seconds: int = 1) -> Path: output_path = workdir / filename has_video = any(track.track_type == TrackType.VIDEO for track in tracks) has_audio = any(track.track_type == TrackType.AUDIO for track in tracks) command = ["ffmpeg", "-y"] input_indices: dict[str, int] = {} next_input_index = 0 if has_video: command += ["-f", "lavfi", "-i", "color=size=96x54:rate=2:color=black"] input_indices["video"] = next_input_index next_input_index += 1 if has_audio: command += ["-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=48000"] input_indices["audio"] = next_input_index next_input_index += 1 subtitle_input_indices: list[int] = [] subtitle_counter = 0 for track in tracks: if track.track_type == TrackType.SUBTITLE: subtitle_path = write_vtt( workdir / f"{output_path.stem}_subtitle_{subtitle_counter}.vtt", track.subtitle_lines, ) command += ["-i", str(subtitle_path)] subtitle_input_indices.append(next_input_index) next_input_index += 1 subtitle_counter += 1 map_tokens: list[str] = [] metadata_tokens: list[str] = [] disposition_tokens: list[str] = [] attachment_tokens: list[str] = [] per_type_subindex: dict[TrackType, int] = {} subtitle_input_cursor = 0 attachment_subindex = 0 for track in tracks: if track.track_type == TrackType.VIDEO: map_tokens += ["-map", f"{input_indices['video']}:v:0"] stream_group = "v" elif track.track_type == TrackType.AUDIO: map_tokens += ["-map", f"{input_indices['audio']}:a:0"] stream_group = "a" elif track.track_type == TrackType.SUBTITLE: map_tokens += ["-map", f"{subtitle_input_indices[subtitle_input_cursor]}:s:0"] subtitle_input_cursor += 1 stream_group = "s" elif track.track_type == TrackType.ATTACHMENT: attachment_path = workdir / track.attachment_name attachment_path.write_bytes(b"dummy font bytes") attachment_tokens += [ "-attach", str(attachment_path), f"-metadata:s:t:{attachment_subindex}", "mimetype=application/x-truetype-font", f"-metadata:s:t:{attachment_subindex}", f"filename={attachment_path.name}", ] attachment_subindex += 1 continue else: raise ValueError(f"Unsupported track type {track.track_type}") subindex = per_type_subindex.get(track.track_type, 0) per_type_subindex[track.track_type] = subindex + 1 tags = {} if track.identity is not None: tags["THIS_IS"] = track.identity if track.language is not None: tags["language"] = track.language if track.title is not None: tags["title"] = track.title tags.update(track.extra_tags) for key, value in tags.items(): metadata_tokens += [f"-metadata:s:{stream_group}:{subindex}", f"{key}={value}"] if track.dispositions: disposition_tokens += [ f"-disposition:{stream_group}:{subindex}", "+".join(disposition.label() for disposition in track.dispositions), ] command += map_tokens command += metadata_tokens command += disposition_tokens command += [ "-c:v", "libx264", "-preset", "ultrafast", "-crf", "35", "-pix_fmt", "yuv420p", "-c:a", "aac", "-b:a", "48k", "-c:s", "webvtt", "-t", str(duration_seconds), "-shortest", ] command += attachment_tokens command += [str(output_path)] completed = subprocess.run(command, cwd=workdir, capture_output=True, text=True) if completed.returncode != 0: raise AssertionError(f"ffmpeg fixture creation failed\nSTDOUT:\n{completed.stdout}\nSTDERR:\n{completed.stderr}") return output_path def add_show_and_pattern(context: dict, filename_pattern: str, show_id: int = 1) -> int: show_descriptor = ShowDescriptor( id=show_id, name="Bundle Test Show", year=2000, ) ShowController(context).updateShow(show_descriptor) pattern_id = PatternController(context).addPattern( { "show_id": show_id, "pattern": filename_pattern, } ) if not pattern_id: raise AssertionError("Failed to create pattern in test database") return pattern_id def add_pattern_tracks(context: dict, pattern_id: int, track_specs: list[PatternTrackSpec]) -> None: track_controller = TrackController(context) for track in track_specs: kwargs = { TrackDescriptor.INDEX_KEY: track.index, TrackDescriptor.SOURCE_INDEX_KEY: track.source_index, TrackDescriptor.TRACK_TYPE_KEY: track.track_type, TrackDescriptor.TAGS_KEY: dict(track.tags), TrackDescriptor.DISPOSITION_SET_KEY: set(track.dispositions), } if track.track_type == TrackType.AUDIO: kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = track.audio_layout track_controller.addTrack(TrackDescriptor(**kwargs), pattern_id) def prepare_pattern_database(database_path: Path, filename_pattern: str, track_specs: list[PatternTrackSpec], show_id: int = 1) -> None: context = build_controller_context(database_path) try: pattern_id = add_show_and_pattern(context, filename_pattern, show_id=show_id) add_pattern_tracks(context, pattern_id, track_specs) finally: dispose_controller_context(context) def run_ffx_convert(workdir: Path, home_dir: Path, database_path: Path, *args: str) -> subprocess.CompletedProcess[str]: env = os.environ.copy() env["HOME"] = str(home_dir) existing_pythonpath = env.get("PYTHONPATH", "") env["PYTHONPATH"] = str(SRC_ROOT) if not existing_pythonpath else f"{SRC_ROOT}{os.pathsep}{existing_pythonpath}" command = [ sys.executable, "-m", "ffx", "--database-file", str(database_path), "convert", *args, ] return subprocess.run(command, cwd=workdir, env=env, capture_output=True, text=True) def ffprobe_json(path: Path) -> dict: completed = subprocess.run( [ "ffprobe", "-hide_banner", "-show_streams", "-show_format", "-of", "json", str(path), ], capture_output=True, text=True, ) if completed.returncode != 0: raise AssertionError(f"ffprobe failed for {path}\nSTDERR:\n{completed.stderr}") return json.loads(completed.stdout) def stream_tags(stream: dict) -> dict[str, str]: return {str(key): str(value) for key, value in stream.get("tags", {}).items()} def get_tag(stream: dict, key: str) -> str | None: tags = stream_tags(stream) for candidate in (key, key.lower(), key.upper()): if candidate in tags: return tags[candidate] return None def extract_first_subtitle_text(workdir: Path, media_path: Path) -> str: extracted_path = workdir / f"{media_path.stem}.subtitle.vtt" completed = subprocess.run( [ "ffmpeg", "-y", "-i", str(media_path), "-map", "0:s:0", "-c", "copy", str(extracted_path), ], cwd=workdir, capture_output=True, text=True, ) if completed.returncode != 0: raise AssertionError(f"Subtitle extraction failed\nSTDERR:\n{completed.stderr}") return extracted_path.read_text(encoding="utf-8") def expected_output_path(workdir: Path, source_filename: str) -> Path: return workdir / f"out_{source_filename}"