from __future__ import annotations import click from pathlib import Path import sys import unittest from unittest.mock import patch from types import SimpleNamespace SRC_ROOT = Path(__file__).resolve().parents[2] / "src" if str(SRC_ROOT) not in sys.path: sys.path.insert(0, str(SRC_ROOT)) from ffx.ffx_controller import FfxController # noqa: E402 from ffx.audio_layout import AudioLayout # noqa: E402 from ffx.logging_utils import get_ffx_logger # noqa: E402 from ffx.media_descriptor import MediaDescriptor # noqa: E402 from ffx.show_descriptor import ShowDescriptor # noqa: E402 from ffx.track_codec import TrackCodec # noqa: E402 from ffx.track_descriptor import TrackDescriptor # noqa: E402 from ffx.track_type import TrackType # noqa: E402 from ffx.video_encoder import VideoEncoder # noqa: E402 class StaticConfig: def __init__(self, data: dict | None = None): self._data = data or {} def getData(self): return self._data class FfxControllerTests(unittest.TestCase): def tearDown(self): FfxController.isFfmpegEncoderAvailable.cache_clear() def make_context(self, video_encoder: VideoEncoder) -> dict: return { "logger": get_ffx_logger(), "config": StaticConfig(), "video_encoder": video_encoder, "dry_run": False, "perform_cut": False, "copy_video": False, "copy_audio": False, "bitrates": { "stereo": "112k", "ac3": "256k", "dts": "320k", }, } def make_media_descriptors(self) -> tuple[MediaDescriptor, MediaDescriptor]: descriptor = MediaDescriptor( track_descriptors=[ TrackDescriptor( index=0, source_index=0, sub_index=0, track_type=TrackType.VIDEO, codec_name=TrackCodec.H264, ) ] ) source_descriptor = MediaDescriptor( track_descriptors=[ TrackDescriptor( index=0, source_index=0, sub_index=0, track_type=TrackType.VIDEO, codec_name=TrackCodec.H264, ) ] ) return descriptor, source_descriptor def make_media_descriptors_with_audio( self, audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO, ) -> tuple[MediaDescriptor, MediaDescriptor]: descriptor = MediaDescriptor( track_descriptors=[ TrackDescriptor( index=0, source_index=0, sub_index=0, track_type=TrackType.VIDEO, codec_name=TrackCodec.H264, ), TrackDescriptor( index=1, source_index=1, sub_index=0, track_type=TrackType.AUDIO, codec_name=TrackCodec.AAC, audio_layout=audio_layout, ), ] ) source_descriptor = MediaDescriptor( track_descriptors=[ TrackDescriptor( index=0, source_index=0, sub_index=0, track_type=TrackType.VIDEO, codec_name=TrackCodec.H264, ), TrackDescriptor( index=1, source_index=1, sub_index=0, track_type=TrackType.AUDIO, codec_name=TrackCodec.AAC, audio_layout=audio_layout, ), ] ) return descriptor, source_descriptor def assert_token_pair(self, command: list[str], first: str, second: str): self.assertTrue( any(command[index:index + 2] == [first, second] for index in range(len(command) - 1)), command, ) def test_vp9_run_job_emits_file_level_encoding_quality_metadata(self): context = self.make_context(VideoEncoder.VP9) target_descriptor, source_descriptor = self.make_media_descriptors() controller = FfxController(context, target_descriptor, source_descriptor) commands = [] with ( patch.object( controller, "executeCommandSequence", side_effect=lambda command: commands.append(command) or ("", "", 0), ), patch("ffx.ffx_controller.os.path.exists", return_value=False), ): controller.runJob( "input.mkv", "output.webm", targetFormat="webm", chainIteration=[ { "identifier": "quality", "parameters": {"quality": 27}, } ], ) self.assertEqual(2, len(commands)) self.assertIn("-metadata:g", commands[1]) self.assertIn("ENCODING_QUALITY=27", commands[1]) self.assertFalse( any(token.startswith("ENCODING_PRESET=") for token in commands[1]) ) def test_av1_run_job_emits_file_level_quality_and_preset_metadata(self): context = self.make_context(VideoEncoder.AV1) target_descriptor, source_descriptor = self.make_media_descriptors() controller = FfxController(context, target_descriptor, source_descriptor) commands = [] with patch.object( controller, "executeCommandSequence", side_effect=lambda command: commands.append(command) or ("", "", 0), ): controller.runJob( "input.mkv", "output.webm", targetFormat="webm", chainIteration=[ { "identifier": "quality", "parameters": {"quality": 29}, }, { "identifier": "preset", "parameters": {"preset": 7}, }, ], ) self.assertEqual(1, len(commands)) self.assertIn("-metadata:g", commands[0]) self.assertIn("ENCODING_QUALITY=29", commands[0]) self.assertIn("ENCODING_PRESET=7", commands[0]) def test_run_job_uses_show_quality_when_pattern_quality_is_unset(self): context = self.make_context(VideoEncoder.H264) target_descriptor, source_descriptor = self.make_media_descriptors() controller = FfxController(context, target_descriptor, source_descriptor) commands = [] show_descriptor = ShowDescriptor(id=1, name="Show", year=2024, quality=23) pattern = SimpleNamespace(quality=0) with ( patch.object( controller, "executeCommandSequence", side_effect=lambda command: commands.append(command) or ("", "", 0), ), patch.object(context["logger"], "info") as mocked_info, ): controller.runJob( "input.mkv", "output.mkv", chainIteration=[], currentPattern=pattern, currentShowDescriptor=show_descriptor, ) self.assertEqual(1, len(commands)) self.assertIn("ENCODING_QUALITY=23", commands[0]) mocked_info.assert_any_call("Setting quality 23 from show") def test_run_job_prefers_pattern_quality_over_show_quality(self): context = self.make_context(VideoEncoder.H264) target_descriptor, source_descriptor = self.make_media_descriptors() controller = FfxController(context, target_descriptor, source_descriptor) commands = [] show_descriptor = ShowDescriptor(id=1, name="Show", year=2024, quality=23) pattern = SimpleNamespace(quality=19) with ( patch.object( controller, "executeCommandSequence", side_effect=lambda command: commands.append(command) or ("", "", 0), ), patch.object(context["logger"], "info") as mocked_info, ): controller.runJob( "input.mkv", "output.mkv", chainIteration=[], currentPattern=pattern, currentShowDescriptor=show_descriptor, ) self.assertEqual(1, len(commands)) self.assertIn("ENCODING_QUALITY=19", commands[0]) mocked_info.assert_any_call("Setting quality 19 from pattern") def test_copy_video_uses_single_copy_command_without_video_encoding_options(self): context = self.make_context(VideoEncoder.VP9) context["copy_video"] = True target_descriptor, source_descriptor = self.make_media_descriptors_with_audio() controller = FfxController(context, target_descriptor, source_descriptor) commands = [] with patch.object( controller, "executeCommandSequence", side_effect=lambda command: commands.append(command) or ("", "", 0), ): controller.runJob( "input.mkv", "output.mkv", chainIteration=[ { "identifier": "quality", "parameters": {"quality": 27}, }, { "identifier": "nlmeans", "parameters": {}, "tokens": ["nlmeans=s=2.0"], }, ], cropArguments={ "output_width": 1280, "output_height": 720, "x_offset": 0, "y_offset": 0, }, ) self.assertEqual(1, len(commands)) self.assert_token_pair(commands[0], "-c:v", "copy") self.assertIn("libopus", commands[0]) self.assertNotIn("libvpx-vp9", commands[0]) self.assertNotIn("-pass", commands[0]) self.assertNotIn("-vf", commands[0]) self.assertFalse(any(token.startswith("ENCODING_QUALITY=") for token in commands[0])) def test_copy_audio_uses_audio_copy_without_audio_encoding_options(self): context = self.make_context(VideoEncoder.H264) context["copy_audio"] = True target_descriptor, source_descriptor = self.make_media_descriptors_with_audio( AudioLayout.LAYOUT_5_1 ) controller = FfxController(context, target_descriptor, source_descriptor) commands = [] with patch.object( controller, "executeCommandSequence", side_effect=lambda command: commands.append(command) or ("", "", 0), ): controller.runJob( "input.mkv", "output.mkv", chainIteration=[ { "identifier": "quality", "parameters": {"quality": 21}, } ], ) self.assertEqual(1, len(commands)) self.assert_token_pair(commands[0], "-c:a", "copy") self.assertIn("libx264", commands[0]) self.assertNotIn("libopus", commands[0]) self.assertFalse(any(token.startswith("-b:a") for token in commands[0])) self.assertFalse(any(token.startswith("-filter:a") for token in commands[0])) def test_generate_h264_tokens_prefers_libx264_when_available(self): context = self.make_context(VideoEncoder.H264) target_descriptor, source_descriptor = self.make_media_descriptors() controller = FfxController(context, target_descriptor, source_descriptor) with patch.object( FfxController, "getSupportedSoftwareH264Encoder", return_value="libx264", ): tokens = controller.generateH264Tokens(23) self.assertEqual( ["-c:v:0", "libx264", "-preset", "slow", "-crf", "23"], tokens, ) def test_generate_h264_tokens_falls_back_to_libopenh264_and_logs_warning(self): context = self.make_context(VideoEncoder.H264) target_descriptor, source_descriptor = self.make_media_descriptors() controller = FfxController(context, target_descriptor, source_descriptor) with ( patch.object( FfxController, "getSupportedSoftwareH264Encoder", return_value="libopenh264", ), patch.object(context["logger"], "warning") as mocked_warning, ): tokens = controller.generateH264Tokens(23) self.assertEqual( ["-c:v:0", "libopenh264", "-pix_fmt", "yuv420p"], tokens, ) mocked_warning.assert_called_once_with( "libx264 encoder unavailable; falling back to libopenh264 for H.264 encoding." ) def test_generate_h264_tokens_raises_when_no_supported_software_encoder_exists(self): context = self.make_context(VideoEncoder.H264) target_descriptor, source_descriptor = self.make_media_descriptors() controller = FfxController(context, target_descriptor, source_descriptor) with patch.object( FfxController, "getSupportedSoftwareH264Encoder", return_value=None, ): with self.assertRaisesRegex( click.ClickException, "no supported software H.264 encoder is available", ): controller.generateH264Tokens(23) if __name__ == "__main__": unittest.main()