2 Commits

Author SHA1 Message Date
Javanaut
eedcbaed0a Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev 2026-04-23 16:31:19 +02:00
Javanaut
653ce7b417 Copy audio and video flags 2026-04-23 16:30:15 +02:00
4 changed files with 278 additions and 32 deletions

View File

@@ -68,6 +68,14 @@ CUT_OPTION_HELP = (
+ "or --cut START,DURATION for an explicit start and duration. " + "or --cut START,DURATION for an explicit start and duration. "
+ "Omit to disable." + "Omit to disable."
) )
COPY_VIDEO_OPTION_HELP = (
"Copy video streams without re-encoding. Skips video encoder options "
+ "and video filters."
)
COPY_AUDIO_OPTION_HELP = (
"Copy audio streams without re-encoding. Skips audio encoder options "
+ "and audio filters."
)
def normalizeNicenessOption(ctx, param, value): def normalizeNicenessOption(ctx, param, value):
@@ -915,6 +923,8 @@ def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix') @click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option('-v', '--video-encoder', type=str, default=DEFAULT_VIDEO_ENCODER_LABEL, help=f"Target video encoder (vp9, av1, h264 or copy)", show_default=True) @click.option('-v', '--video-encoder', type=str, default=DEFAULT_VIDEO_ENCODER_LABEL, help=f"Target video encoder (vp9, av1, h264 or copy)", show_default=True)
@click.option('--copy-video', is_flag=True, default=False, help=COPY_VIDEO_OPTION_HELP)
@click.option('--copy-audio', is_flag=True, default=False, help=COPY_AUDIO_OPTION_HELP)
@click.option('-q', '--quality', type=str, default="", help=f"Quality settings to be used with VP9/H264 encoder") @click.option('-q', '--quality', type=str, default="", help=f"Quality settings to be used with VP9/H264 encoder")
@click.option('-p', '--preset', type=str, default="", help=f"Quality preset to be used with AV1 encoder") @click.option('-p', '--preset', type=str, default="", help=f"Quality preset to be used with AV1 encoder")
@@ -1011,6 +1021,8 @@ def convert(ctx,
paths, paths,
label, label,
video_encoder, video_encoder,
copy_video,
copy_audio,
quality, quality,
preset, preset,
stereo_bitrate, stereo_bitrate,
@@ -1090,9 +1102,12 @@ def convert(ctx,
context = ctx.obj context = ctx.obj
context['video_encoder'] = VideoEncoder.fromLabel(video_encoder) context['video_encoder'] = VideoEncoder.fromLabel(video_encoder)
context['copy_video'] = copy_video
context['copy_audio'] = copy_audio
copyVideoEffective = copy_video or context['video_encoder'] == VideoEncoder.COPY
# HINT: quick and dirty override for h264, todo improve # HINT: quick and dirty override for h264, todo improve
if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY): if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY) or copy_video or copy_audio:
targetFormat = '' targetFormat = ''
targetExtension = 'mkv' targetExtension = 'mkv'
else: else:
@@ -1225,36 +1240,54 @@ def convert(ctx,
tc = TmdbController() if context['use_tmdb'] else None tc = TmdbController() if context['use_tmdb'] else None
qualityKwargs = {QualityFilter.QUALITY_KEY: str(quality)} if copyVideoEffective and quality:
ctx.obj['logger'].warning("Ignoring quality settings because video is being copied")
qualityKwargs = {
QualityFilter.QUALITY_KEY: "" if copyVideoEffective else str(quality)
}
qf = QualityFilter(**qualityKwargs) qf = QualityFilter(**qualityKwargs)
if context['video_encoder'] == VideoEncoder.AV1 and preset: if context['video_encoder'] == VideoEncoder.AV1 and preset and not copyVideoEffective:
presetKwargs = {PresetFilter.PRESET_KEY: preset} presetKwargs = {PresetFilter.PRESET_KEY: preset}
PresetFilter(**presetKwargs) PresetFilter(**presetKwargs)
cf = None cf = None
# if crop != 'none': # if crop != 'none':
if crop == 'auto': videoFilterOptionsRequested = (
crop != 'none'
or deinterlace != 'none'
or denoise != 'none'
or denoise_strength
or denoise_patch_size
or denoise_chroma_patch_size
or denoise_research_window
or denoise_chroma_research_window
)
if copyVideoEffective and videoFilterOptionsRequested:
ctx.obj['logger'].warning("Ignoring video filter options because video is being copied")
if crop == 'auto' and not copyVideoEffective:
cropKwargs = {} cropKwargs = {}
cf = CropFilter(**cropKwargs) cf = CropFilter(**cropKwargs)
denoiseKwargs = {} denoiseKwargs = {}
if denoise_strength: if denoise_strength and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.STRENGTH_KEY] = denoise_strength denoiseKwargs[NlmeansFilter.STRENGTH_KEY] = denoise_strength
if denoise_patch_size: if denoise_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.PATCH_SIZE_KEY] = denoise_patch_size denoiseKwargs[NlmeansFilter.PATCH_SIZE_KEY] = denoise_patch_size
if denoise_chroma_patch_size: if denoise_chroma_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_PATCH_SIZE_KEY] = denoise_chroma_patch_size denoiseKwargs[NlmeansFilter.CHROMA_PATCH_SIZE_KEY] = denoise_chroma_patch_size
if denoise_research_window: if denoise_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.RESEARCH_WINDOW_KEY] = denoise_research_window denoiseKwargs[NlmeansFilter.RESEARCH_WINDOW_KEY] = denoise_research_window
if denoise_chroma_research_window: if denoise_chroma_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_RESEARCH_WINDOW_KEY] = denoise_chroma_research_window denoiseKwargs[NlmeansFilter.CHROMA_RESEARCH_WINDOW_KEY] = denoise_chroma_research_window
if denoise != 'none' or denoiseKwargs: if not copyVideoEffective and (denoise != 'none' or denoiseKwargs):
NlmeansFilter(**denoiseKwargs) NlmeansFilter(**denoiseKwargs)
if deinterlace != 'none': if deinterlace != 'none' and not copyVideoEffective:
DeinterlaceFilter() DeinterlaceFilter()
chainYield = list(qf.getChainYield()) chainYield = list(qf.getChainYield())

View File

@@ -172,6 +172,16 @@ class FfxController():
def generateAudioCopyTokens(self, subIndex): def generateAudioCopyTokens(self, subIndex):
return [f"-c:a:{int(subIndex)}", 'copy'] return [f"-c:a:{int(subIndex)}", 'copy']
def generateVideoCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.VIDEO):
return ["-c:v", "copy"]
return []
def generateAudioCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.AUDIO):
return ["-c:a", "copy"]
return []
def generateSubtitleCopyTokens(self, subIndex): def generateSubtitleCopyTokens(self, subIndex):
return [f"-c:s:{int(subIndex)}", 'copy'] return [f"-c:s:{int(subIndex)}", 'copy']
@@ -292,6 +302,12 @@ class FfxController():
return audioTokens return audioTokens
def generateAudioProcessingTokens(self):
if self.__context.get('copy_audio', False):
return self.generateAudioCopyAllTokens()
return self.generateAudioEncodingTokens()
def runJob(self, def runJob(self,
sourcePath, sourcePath,
targetPath, targetPath,
@@ -305,6 +321,7 @@ class FfxController():
videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9) videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9)
copyVideo = self.__context.get('copy_video', False) or videoEncoder == VideoEncoder.COPY
qualityFilters = [fy for fy in chainIteration if fy['identifier'] == 'quality'] qualityFilters = [fy for fy in chainIteration if fy['identifier'] == 'quality']
@@ -315,30 +332,35 @@ class FfxController():
deinterlaceFilters = [fy for fy in chainIteration if fy['identifier'] == 'bwdif'] deinterlaceFilters = [fy for fy in chainIteration if fy['identifier'] == 'bwdif']
if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']): if copyVideo:
self.__logger.info(f"Setting quality {quality} from command line") quality = None
elif currentPattern is not None and (quality := currentPattern.quality): self.__context['encoding_metadata_tags'] = {}
self.__logger.info(f"Setting quality {quality} from pattern")
elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
self.__logger.info(f"Setting quality {quality} from show")
else: else:
quality = (QualityFilter.DEFAULT_H264_QUALITY if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']):
if (videoEncoder == VideoEncoder.H264) self.__logger.info(f"Setting quality {quality} from command line")
else QualityFilter.DEFAULT_VP9_QUALITY) elif currentPattern is not None and (quality := currentPattern.quality):
self.__logger.info(f"Setting quality {quality} from default") self.__logger.info(f"Setting quality {quality} from pattern")
elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
self.__logger.info(f"Setting quality {quality} from show")
else:
quality = (QualityFilter.DEFAULT_H264_QUALITY
if (videoEncoder == VideoEncoder.H264)
else QualityFilter.DEFAULT_VP9_QUALITY)
self.__logger.info(f"Setting quality {quality} from default")
preset = presetFilters[0]['parameters']['preset'] if presetFilters else PresetFilter.DEFAULT_PRESET preset = presetFilters[0]['parameters']['preset'] if presetFilters else PresetFilter.DEFAULT_PRESET
self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags( if not copyVideo:
videoEncoder, self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags(
quality, videoEncoder,
preset, quality,
) preset,
)
filterParamTokens = [] filterParamTokens = []
if cropArguments: if cropArguments and not copyVideo:
cropParams = (f"crop=" cropParams = (f"crop="
+ f"{cropArguments[CropFilter.OUTPUT_WIDTH_KEY]}" + f"{cropArguments[CropFilter.OUTPUT_WIDTH_KEY]}"
@@ -348,8 +370,9 @@ class FfxController():
filterParamTokens.append(cropParams) filterParamTokens.append(cropParams)
filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else []) if not copyVideo:
filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else []) filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else [])
filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else [])
deinterlaceFilters deinterlaceFilters
@@ -380,6 +403,29 @@ class FfxController():
self.executeCommandSequence(commandSequence) self.executeCommandSequence(commandSequence)
return return
if copyVideo:
commandSequence = (commandTokens
+ self.__targetMediaDescriptor.getImportFileTokens()
+ self.__targetMediaDescriptor.getInputMappingTokens(sourceMediaDescriptor = self.__sourceMediaDescriptor)
+ self.__mdcs.generateDispositionTokens())
commandSequence += self.__mdcs.generateMetadataTokens()
commandSequence += self.generateVideoCopyAllTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
commandSequence += self.generateOutputTokens(targetPath,
targetFormat)
self.__logger.debug("FfxController.runJob(): Running command sequence")
if not self.__context['dry_run']:
self.executeCommandSequence(commandSequence)
return
if videoEncoder == VideoEncoder.AV1: if videoEncoder == VideoEncoder.AV1:
commandSequence = (commandTokens commandSequence = (commandTokens
@@ -396,7 +442,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG: if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateAV1Tokens(int(quality), int(preset)) commandSequence += self.generateAV1Tokens(int(quality), int(preset))
commandSequence += self.generateAudioEncodingTokens() commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']: if self.__context['perform_cut']:
commandSequence += self.generateCropTokens() commandSequence += self.generateCropTokens()
@@ -426,7 +472,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG: if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateH264Tokens(int(quality)) commandSequence += self.generateH264Tokens(int(quality))
commandSequence += self.generateAudioEncodingTokens() commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']: if self.__context['perform_cut']:
commandSequence += self.generateCropTokens() commandSequence += self.generateCropTokens()
@@ -485,7 +531,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG: if td.getCodec != TrackCodec.PNG:
commandSequence2 += self.generateVP9Pass2Tokens(int(quality)) commandSequence2 += self.generateVP9Pass2Tokens(int(quality))
commandSequence2 += self.generateAudioEncodingTokens() commandSequence2 += self.generateAudioProcessingTokens()
if self.__context['perform_cut']: if self.__context['perform_cut']:
commandSequence2 += self.generateCropTokens() commandSequence2 += self.generateCropTokens()

View File

@@ -263,6 +263,47 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"], result["modules"],
) )
def test_convert_copy_flags_parse_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import click
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.convert.make_context(
"convert",
["--copy-video", "--copy-audio"],
resilient_parsing=True,
)
help_output = ffx.cli.convert.get_help(click.Context(ffx.cli.convert))
print(json.dumps({{
"copy_video": context.params["copy_video"],
"copy_audio": context.params["copy_audio"],
"output": help_output,
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["copy_video"])
self.assertTrue(result["copy_audio"])
self.assertIn("--copy-video", result["output"])
self.assertIn("--copy-audio", result["output"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_edit_command_avoids_database_bootstrap(self): def test_edit_command_avoids_database_bootstrap(self):
result = self.run_python( result = self.run_python(
textwrap.dedent( textwrap.dedent(

View File

@@ -15,6 +15,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.ffx_controller import FfxController # noqa: E402 from ffx.ffx_controller import FfxController # noqa: E402
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402 from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402 from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402 from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -43,6 +44,8 @@ class FfxControllerTests(unittest.TestCase):
"video_encoder": video_encoder, "video_encoder": video_encoder,
"dry_run": False, "dry_run": False,
"perform_cut": False, "perform_cut": False,
"copy_video": False,
"copy_audio": False,
"bitrates": { "bitrates": {
"stereo": "112k", "stereo": "112k",
"ac3": "256k", "ac3": "256k",
@@ -75,6 +78,56 @@ class FfxControllerTests(unittest.TestCase):
) )
return descriptor, source_descriptor return descriptor, source_descriptor
def make_media_descriptors_with_audio(
self,
audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO,
) -> tuple[MediaDescriptor, MediaDescriptor]:
descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
source_descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
return descriptor, source_descriptor
def assert_token_pair(self, command: list[str], first: str, second: str):
self.assertTrue(
any(command[index:index + 2] == [first, second] for index in range(len(command) - 1)),
command,
)
def test_vp9_run_job_emits_file_level_encoding_quality_metadata(self): def test_vp9_run_job_emits_file_level_encoding_quality_metadata(self):
context = self.make_context(VideoEncoder.VP9) context = self.make_context(VideoEncoder.VP9)
target_descriptor, source_descriptor = self.make_media_descriptors() target_descriptor, source_descriptor = self.make_media_descriptors()
@@ -196,6 +249,79 @@ class FfxControllerTests(unittest.TestCase):
self.assertIn("ENCODING_QUALITY=19", commands[0]) self.assertIn("ENCODING_QUALITY=19", commands[0])
mocked_info.assert_any_call("Setting quality 19 from pattern") mocked_info.assert_any_call("Setting quality 19 from pattern")
def test_copy_video_uses_single_copy_command_without_video_encoding_options(self):
context = self.make_context(VideoEncoder.VP9)
context["copy_video"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio()
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 27},
},
{
"identifier": "nlmeans",
"parameters": {},
"tokens": ["nlmeans=s=2.0"],
},
],
cropArguments={
"output_width": 1280,
"output_height": 720,
"x_offset": 0,
"y_offset": 0,
},
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:v", "copy")
self.assertIn("libopus", commands[0])
self.assertNotIn("libvpx-vp9", commands[0])
self.assertNotIn("-pass", commands[0])
self.assertNotIn("-vf", commands[0])
self.assertFalse(any(token.startswith("ENCODING_QUALITY=") for token in commands[0]))
def test_copy_audio_uses_audio_copy_without_audio_encoding_options(self):
context = self.make_context(VideoEncoder.H264)
context["copy_audio"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio(
AudioLayout.LAYOUT_5_1
)
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 21},
}
],
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:a", "copy")
self.assertIn("libx264", commands[0])
self.assertNotIn("libopus", commands[0])
self.assertFalse(any(token.startswith("-b:a") for token in commands[0]))
self.assertFalse(any(token.startswith("-filter:a") for token in commands[0]))
def test_generate_h264_tokens_prefers_libx264_when_available(self): def test_generate_h264_tokens_prefers_libx264_when_available(self):
context = self.make_context(VideoEncoder.H264) context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors() target_descriptor, source_descriptor = self.make_media_descriptors()