347 lines
11 KiB
Python
347 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
import subprocess
|
|
import sys
|
|
from typing import Mapping
|
|
|
|
|
|
# Repository root, resolved two directory levels above this file
# (assumes this helper lives at <repo>/<dir>/<file> — TODO confirm layout).
REPO_ROOT = Path(__file__).resolve().parents[2]
# Location of the importable "ffx" package sources.
SRC_ROOT = REPO_ROOT / "src"

# Make the in-repo sources importable without installing the package;
# the guard avoids duplicate sys.path entries on repeated imports.
if str(SRC_ROOT) not in sys.path:
    sys.path.insert(0, str(SRC_ROOT))
|
|
|
|
|
|
from ffx.audio_layout import AudioLayout
|
|
from ffx.database import databaseContext
|
|
from ffx.pattern_controller import PatternController
|
|
from ffx.show_controller import ShowController
|
|
from ffx.show_descriptor import ShowDescriptor
|
|
from ffx.track_descriptor import TrackDescriptor
|
|
from ffx.track_disposition import TrackDisposition
|
|
from ffx.track_type import TrackType
|
|
|
|
|
|
class StaticConfig:
|
|
def __init__(self, data: dict | None = None):
|
|
self._data = data or {}
|
|
|
|
def getData(self):
|
|
return self._data
|
|
|
|
|
|
@dataclass(frozen=True)
class SourceTrackSpec:
    """Declarative description of one track in a synthesized media fixture.

    create_source_fixture() turns a list of these into ffmpeg inputs,
    per-stream metadata, dispositions, and attachments.
    """

    # Kind of stream to generate (video/audio/subtitle/attachment).
    track_type: TrackType
    # When set, written as a THIS_IS stream tag so tests can identify tracks.
    identity: str | None = None
    # When set, written as the stream's "language" tag.
    language: str | None = None
    # When set, written as the stream's "title" tag.
    title: str | None = None
    # Extra key=value stream tags, applied after (and able to override) the above.
    extra_tags: Mapping[str, str] = field(default_factory=dict)
    # Dispositions applied to the generated stream via -disposition.
    dispositions: tuple[TrackDisposition, ...] = ()
    # Cue texts used when this spec describes a subtitle track.
    subtitle_lines: tuple[str, ...] = ("subtitle line",)
    # File name of the dummy font file when this spec is an attachment track.
    attachment_name: str = "fixture.ttf"
|
|
|
|
|
|
@dataclass(frozen=True)
class PatternTrackSpec:
    """Track row to persist in the pattern database.

    prepare_pattern_database() converts each spec into a TrackDescriptor
    keyword mapping.
    """

    # Stored under TrackDescriptor.INDEX_KEY.
    index: int
    # Stored under TrackDescriptor.SOURCE_INDEX_KEY.
    source_index: int
    # Track kind this row applies to.
    track_type: TrackType
    # Stream tags persisted with the descriptor (copied into a plain dict).
    tags: Mapping[str, str] = field(default_factory=dict)
    # Dispositions persisted as a set on the descriptor.
    dispositions: tuple[TrackDisposition, ...] = ()
    # Channel layout; only persisted for AUDIO tracks.
    audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO
|
|
|
|
|
|
def make_logger(name: str) -> logging.Logger:
    """Return an isolated, silent DEBUG-level logger for tests.

    Existing handlers are dropped, propagation to the root logger is
    disabled, and a NullHandler swallows all records.
    """
    test_logger = logging.getLogger(name)
    test_logger.handlers = []
    test_logger.propagate = False
    test_logger.setLevel(logging.DEBUG)
    test_logger.addHandler(logging.NullHandler())
    return test_logger
|
|
|
|
|
|
def build_controller_context(database_path: Path) -> dict:
    """Assemble the logger/config/database mapping the controllers expect."""
    context_logger = make_logger(f"ffx-test-db-{database_path.stem}")
    context = {
        "logger": context_logger,
        "config": StaticConfig(),
        "database": databaseContext(str(database_path)),
    }
    return context
|
|
|
|
|
|
def dispose_controller_context(context: dict) -> None:
    """Release the database engine held by a controller context."""
    engine = context["database"]["engine"]
    engine.dispose()
|
|
|
|
|
|
def write_vtt(
    path: Path,
    lines: tuple[str, ...],
    *,
    cue_spacing_ms: int = 600,
    cue_duration_ms: int = 500,
) -> Path:
    """Write a minimal WEBVTT file with one cue per entry in *lines*.

    Cue *i* starts at ``i * cue_spacing_ms`` and lasts ``cue_duration_ms``
    milliseconds; the keyword-only defaults reproduce the original fixed
    600 ms / 500 ms fixture timing, so existing callers are unaffected.

    Returns *path* so calls can be chained.
    """

    def _timestamp(ms: int) -> str:
        # WEBVTT timestamp: HH:MM:SS.mmm
        return f"{ms // 3600000:02d}:{(ms // 60000) % 60:02d}:{(ms // 1000) % 60:02d}.{ms % 1000:03d}"

    # Header line plus the blank line WEBVTT requires before the first cue.
    body = ["WEBVTT", ""]
    for index, line in enumerate(lines):
        start_ms = index * cue_spacing_ms
        end_ms = start_ms + cue_duration_ms
        body.extend(
            [
                f"{_timestamp(start_ms)} --> {_timestamp(end_ms)}",
                line,
                "",  # blank line terminates the cue
            ]
        )
    path.write_text("\n".join(body), encoding="utf-8")
    return path
|
|
|
|
|
|
def create_source_fixture(
    workdir: Path,
    filename: str,
    tracks: list[SourceTrackSpec],
    duration_seconds: int = 1,
    *,
    video_encoder: str = "libx264",
    video_encoder_options: tuple[str, ...] = (
        "-preset",
        "ultrafast",
        "-crf",
        "35",
        "-pix_fmt",
        "yuv420p",
    ),
    audio_encoder: str = "aac",
    audio_encoder_options: tuple[str, ...] = ("-b:a", "48k"),
    subtitle_encoder: str = "webvtt",
) -> Path:
    """Generate a small media file in *workdir* containing the given tracks.

    Builds a single ffmpeg command: one synthetic lavfi input is shared by
    all video tracks (black frames) and one by all audio tracks (silence);
    each subtitle track gets its own generated VTT input; attachment tracks
    become -attach options. Per-track tags and dispositions are applied via
    -metadata:s / -disposition options.

    Raises AssertionError (with ffmpeg's output) if the command fails.
    Returns the path of the created file.
    """
    output_path = workdir / filename

    has_video = any(track.track_type == TrackType.VIDEO for track in tracks)
    has_audio = any(track.track_type == TrackType.AUDIO for track in tracks)

    command = ["ffmpeg", "-y"]

    # ffmpeg input index bookkeeping: inputs are numbered in the order the
    # -i options are appended below.
    input_indices: dict[str, int] = {}
    next_input_index = 0

    if has_video:
        # Tiny synthetic video source shared by every video track.
        command += ["-f", "lavfi", "-i", "color=size=96x54:rate=2:color=black"]
        input_indices["video"] = next_input_index
        next_input_index += 1

    if has_audio:
        # Silent stereo source shared by every audio track.
        command += ["-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=48000"]
        input_indices["audio"] = next_input_index
        next_input_index += 1

    # Each subtitle track needs its own input file (written next to the output).
    subtitle_input_indices: list[int] = []
    subtitle_counter = 0
    for track in tracks:
        if track.track_type == TrackType.SUBTITLE:
            subtitle_path = write_vtt(
                workdir / f"{output_path.stem}_subtitle_{subtitle_counter}.vtt",
                track.subtitle_lines,
            )
            command += ["-i", str(subtitle_path)]
            subtitle_input_indices.append(next_input_index)
            next_input_index += 1
            subtitle_counter += 1

    # Option groups collected per track, then appended to the command in a
    # fixed order (maps, metadata, dispositions; attachments go last).
    map_tokens: list[str] = []
    metadata_tokens: list[str] = []
    disposition_tokens: list[str] = []
    attachment_tokens: list[str] = []

    # Output-side stream numbering is per type (v:0, v:1, a:0, ...).
    per_type_subindex: dict[TrackType, int] = {}
    subtitle_input_cursor = 0
    attachment_subindex = 0

    for track in tracks:
        if track.track_type == TrackType.VIDEO:
            map_tokens += ["-map", f"{input_indices['video']}:v:0"]
            stream_group = "v"
        elif track.track_type == TrackType.AUDIO:
            map_tokens += ["-map", f"{input_indices['audio']}:a:0"]
            stream_group = "a"
        elif track.track_type == TrackType.SUBTITLE:
            # Consume subtitle inputs in the same order they were added above.
            map_tokens += ["-map", f"{subtitle_input_indices[subtitle_input_cursor]}:s:0"]
            subtitle_input_cursor += 1
            stream_group = "s"
        elif track.track_type == TrackType.ATTACHMENT:
            # Attachments are not mapped streams: write a dummy file and
            # attach it with the metadata ffmpeg requires for fonts.
            attachment_path = workdir / track.attachment_name
            attachment_path.write_bytes(b"dummy font bytes")
            attachment_tokens += [
                "-attach",
                str(attachment_path),
                f"-metadata:s:t:{attachment_subindex}",
                "mimetype=application/x-truetype-font",
                f"-metadata:s:t:{attachment_subindex}",
                f"filename={attachment_path.name}",
            ]
            attachment_subindex += 1
            # Attachments carry no tags/dispositions; skip the shared tail.
            continue
        else:
            raise ValueError(f"Unsupported track type {track.track_type}")

        subindex = per_type_subindex.get(track.track_type, 0)
        per_type_subindex[track.track_type] = subindex + 1

        # Assemble this stream's tags; extra_tags can override the fields.
        tags = {}
        if track.identity is not None:
            tags["THIS_IS"] = track.identity
        if track.language is not None:
            tags["language"] = track.language
        if track.title is not None:
            tags["title"] = track.title
        tags.update(track.extra_tags)

        for key, value in tags.items():
            metadata_tokens += [f"-metadata:s:{stream_group}:{subindex}", f"{key}={value}"]

        if track.dispositions:
            disposition_tokens += [
                f"-disposition:{stream_group}:{subindex}",
                "+".join(disposition.label() for disposition in track.dispositions),
            ]

    command += map_tokens
    command += metadata_tokens
    command += disposition_tokens
    if has_video:
        command += ["-c:v", video_encoder] + list(video_encoder_options)

    if has_audio:
        command += ["-c:a", audio_encoder] + list(audio_encoder_options)

    if subtitle_input_indices:
        command += ["-c:s", subtitle_encoder]

    # Cap the output length; -shortest stops when the shortest stream ends.
    command += [
        "-t",
        str(duration_seconds),
        "-shortest",
    ]
    command += attachment_tokens
    command += [str(output_path)]

    completed = subprocess.run(command, cwd=workdir, capture_output=True, text=True)
    if completed.returncode != 0:
        raise AssertionError(f"ffmpeg fixture creation failed\nSTDOUT:\n{completed.stdout}\nSTDERR:\n{completed.stderr}")

    return output_path
|
|
|
|
|
|
def add_show(context: dict, show_id: int = 1) -> None:
    """Persist the canonical test show record via the ShowController."""
    descriptor = ShowDescriptor(
        id=show_id,
        name="Bundle Test Show",
        year=2000,
    )
    controller = ShowController(context)
    controller.updateShow(descriptor)
|
|
|
|
|
|
def prepare_pattern_database(database_path: Path, filename_pattern: str, track_specs: list[PatternTrackSpec], show_id: int = 1) -> None:
    """Create a test database holding a show and one filename pattern.

    Opens a controller context against *database_path*, stores the test
    show, converts *track_specs* into TrackDescriptor instances, and saves
    them as a pattern schema under *filename_pattern*.

    Raises AssertionError if savePatternSchema reports no pattern id.
    """
    context = build_controller_context(database_path)
    try:
        add_show(context, show_id=show_id)
        track_descriptors = []
        for track in track_specs:
            kwargs = {
                TrackDescriptor.INDEX_KEY: track.index,
                TrackDescriptor.SOURCE_INDEX_KEY: track.source_index,
                TrackDescriptor.TRACK_TYPE_KEY: track.track_type,
                TrackDescriptor.TAGS_KEY: dict(track.tags),
                TrackDescriptor.DISPOSITION_SET_KEY: set(track.dispositions),
            }
            # The audio layout key is only supplied for audio tracks.
            if track.track_type == TrackType.AUDIO:
                kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = track.audio_layout
            track_descriptors.append(TrackDescriptor(**kwargs))

        pattern_id = PatternController(context).savePatternSchema(
            {
                "show_id": show_id,
                "pattern": filename_pattern,
            },
            trackDescriptors=track_descriptors,
        )
        if not pattern_id:
            raise AssertionError("Failed to create pattern in test database")
    finally:
        # Always release the database engine, even if setup failed.
        dispose_controller_context(context)
|
|
|
|
|
|
def run_ffx_convert(workdir: Path, home_dir: Path, database_path: Path, *args: str) -> subprocess.CompletedProcess[str]:
    """Run ``python -m ffx --database-file ... convert`` and capture output.

    HOME is pointed at *home_dir* and SRC_ROOT is prepended to PYTHONPATH
    so the child process imports the in-repo ffx package.
    """
    env = os.environ.copy()
    env["HOME"] = str(home_dir)
    existing_pythonpath = env.get("PYTHONPATH", "")
    if existing_pythonpath:
        env["PYTHONPATH"] = f"{SRC_ROOT}{os.pathsep}{existing_pythonpath}"
    else:
        env["PYTHONPATH"] = str(SRC_ROOT)

    command = [
        sys.executable,
        "-m",
        "ffx",
        "--database-file",
        str(database_path),
        "convert",
        *args,
    ]
    return subprocess.run(command, cwd=workdir, env=env, capture_output=True, text=True)
|
|
|
|
|
|
def ffprobe_json(path: Path) -> dict:
    """Probe *path* with ffprobe and return its streams/format info as a dict.

    Raises AssertionError (with ffprobe's stderr) on a non-zero exit code.
    """
    probe_command = [
        "ffprobe",
        "-hide_banner",
        "-show_streams",
        "-show_format",
        "-of",
        "json",
        str(path),
    ]
    completed = subprocess.run(probe_command, capture_output=True, text=True)
    if completed.returncode != 0:
        raise AssertionError(f"ffprobe failed for {path}\nSTDERR:\n{completed.stderr}")
    return json.loads(completed.stdout)
|
|
|
|
|
|
def stream_tags(stream: dict) -> dict[str, str]:
    """Return the stream's tag mapping with keys and values coerced to str."""
    raw_tags = stream.get("tags", {})
    return {str(tag_key): str(tag_value) for tag_key, tag_value in raw_tags.items()}
|
|
|
|
|
|
def get_tag(stream: dict, key: str) -> str | None:
|
|
tags = stream_tags(stream)
|
|
for candidate in (key, key.lower(), key.upper()):
|
|
if candidate in tags:
|
|
return tags[candidate]
|
|
return None
|
|
|
|
|
|
def extract_first_subtitle_text(workdir: Path, media_path: Path) -> str:
    """Demux the first subtitle stream of *media_path* and return its text.

    The stream is stream-copied into a .subtitle.vtt file next to the
    media; raises AssertionError (with ffmpeg's stderr) on failure.
    """
    extracted_path = workdir / f"{media_path.stem}.subtitle.vtt"
    extract_command = [
        "ffmpeg",
        "-y",
        "-i",
        str(media_path),
        "-map",
        "0:s:0",
        "-c",
        "copy",
        str(extracted_path),
    ]
    completed = subprocess.run(
        extract_command,
        cwd=workdir,
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        raise AssertionError(f"Subtitle extraction failed\nSTDERR:\n{completed.stderr}")
    return extracted_path.read_text(encoding="utf-8")
|
|
|
|
|
|
def expected_output_path(workdir: Path, source_filename: str) -> Path:
    """Return the path where the convert command writes its output file."""
    output_name = f"out_{source_filename}"
    return workdir / output_name