Files
ffx/src/ffx/ffx_controller.py
Javanaut a24b6dedaa ff
2026-04-12 18:26:39 +02:00

472 lines
19 KiB
Python

import os, click
from logging import Logger
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.media_descriptor import MediaDescriptor
from ffx.audio_layout import AudioLayout
from ffx.track_type import TrackType
from ffx.track_codec import TrackCodec
from ffx.video_encoder import VideoEncoder
from ffx.process import executeProcess
from ffx.constants import (
DEFAULT_CONTAINER_EXTENSION,
DEFAULT_CONTAINER_FORMAT,
DEFAULT_VIDEO_ENCODER_LABEL,
DEFAULT_cut_start,
DEFAULT_cut_length,
FFMPEG_COMMAND_TOKENS,
FFMPEG_NULL_OUTPUT_TOKENS,
SUPPORTED_INPUT_FILE_EXTENSIONS,
)
from ffx.filter.quality_filter import QualityFilter
from ffx.filter.preset_filter import PresetFilter
from ffx.filter.crop_filter import CropFilter
from ffx.model.pattern import Pattern
class FfxController:
    """Assemble ffmpeg command lines for a target media layout and run them.

    The controller is created with the shared ``context`` dict, the desired
    (target) MediaDescriptor and, optionally, the descriptor of the source
    file. Token generator methods return lists of command line arguments;
    ``runJob`` combines them and hands the result to ``executeProcess``.
    """

    COMMAND_TOKENS = list(FFMPEG_COMMAND_TOKENS)
    NULL_TOKENS = list(FFMPEG_NULL_OUTPUT_TOKENS)  # -f null /dev/null

    # Name of the log file that is removed before the first VP9 pass.
    TEMP_FILE_NAME = "ffmpeg2pass-0.log"

    DEFAULT_VIDEO_ENCODER = DEFAULT_VIDEO_ENCODER_LABEL
    DEFAULT_FILE_FORMAT = DEFAULT_CONTAINER_FORMAT
    DEFAULT_FILE_EXTENSION = DEFAULT_CONTAINER_EXTENSION

    INPUT_FILE_EXTENSIONS = list(SUPPORTED_INPUT_FILE_EXTENSIONS)

    CHANNEL_MAP_5_1 = 'FL-FL|FR-FR|FC-FC|LFE-LFE|SL-BL|SR-BR:5.1'

    # SIGNATURE_TAGS = {'RECODED_WITH': 'FFX'}

    def __init__(self,
                 context : dict,
                 targetMediaDescriptor : MediaDescriptor,
                 sourceMediaDescriptor : MediaDescriptor = None):
        """Store the descriptors and derive the change set between them.

        Args:
            context: Shared settings dict; must contain a ``'logger'`` entry.
            targetMediaDescriptor: Descriptor of the desired output layout.
            sourceMediaDescriptor: Descriptor of the input file, if known.
        """
        self.__context = context
        self.__targetMediaDescriptor = targetMediaDescriptor
        self.__sourceMediaDescriptor = sourceMediaDescriptor

        self.__mdcs = MediaDescriptorChangeSet(context,
                                               targetMediaDescriptor,
                                               sourceMediaDescriptor)

        self.__logger: Logger = context['logger']

    def executeCommandSequence(self, commandSequence):
        """Run a command and return ``(out, err, rc)``.

        Raises:
            click.ClickException: If the return code is truthy (non-zero).
        """
        out, err, rc = executeProcess(commandSequence, context=self.__context)
        if rc:
            raise click.ClickException(f"Command resulted in error: rc={rc} error={err}")
        return out, err, rc

    def generateAV1Tokens(self, quality, preset, subIndex : int = 0):
        """Return libsvtav1 encoder options for video stream ``subIndex``."""
        return [f"-c:v:{int(subIndex)}", 'libsvtav1',
                '-svtav1-params', f"crf={quality}:preset={preset}:tune=0:enable-overlays=1:scd=1:scm=0",
                '-pix_fmt', 'yuv420p10le']

    # -c:v libx264 -preset slow -crf 17
    def generateH264Tokens(self, quality, subIndex : int = 0):
        """Return libx264 encoder options (preset ``slow``) for video stream ``subIndex``."""
        return [f"-c:v:{int(subIndex)}", 'libx264',
                "-preset", "slow",
                '-crf', str(quality)]

    # -c:v:0 libvpx-vp9 -row-mt 1 -crf 32 -pass 1 -speed 4 -frame-parallel 0 -g 9999 -aq-mode 0
    def generateVP9Pass1Tokens(self, quality, subIndex : int = 0):
        """Return libvpx-vp9 options for the first pass of a two-pass encode."""
        return [f"-c:v:{int(subIndex)}",
                'libvpx-vp9',
                '-row-mt', '1',
                '-crf', str(quality),
                '-pass', '1',
                '-speed', '4',
                '-frame-parallel', '0',
                '-g', '9999',
                '-aq-mode', '0']

    # -c:v:0 libvpx-vp9 -row-mt 1 -crf 32 -pass 2 -frame-parallel 0 -g 9999 -aq-mode 0 -auto-alt-ref 1 -lag-in-frames 25
    def generateVP9Pass2Tokens(self, quality, subIndex : int = 0):
        """Return libvpx-vp9 options for the second pass of a two-pass encode."""
        return [f"-c:v:{int(subIndex)}",
                'libvpx-vp9',
                '-row-mt', '1',
                '-crf', str(quality),
                '-pass', '2',
                '-frame-parallel', '0',
                '-g', '9999',
                '-aq-mode', '0',
                '-auto-alt-ref', '1',
                '-lag-in-frames', '25']

    def generateVideoCopyTokens(self, subIndex):
        """Return the option pair that stream-copies video stream ``subIndex``."""
        return [f"-c:v:{int(subIndex)}",
                'copy']

    def generateAudioCopyTokens(self, subIndex):
        """Return the option pair that stream-copies audio stream ``subIndex``."""
        return [f"-c:a:{int(subIndex)}", 'copy']

    def generateSubtitleCopyTokens(self, subIndex):
        """Return the option pair that stream-copies subtitle stream ``subIndex``."""
        return [f"-c:s:{int(subIndex)}", 'copy']

    def generateAttachmentCopyTokens(self, subIndex):
        """Return the option pair that stream-copies attachment stream ``subIndex``."""
        return [f"-c:t:{int(subIndex)}", 'copy']

    def generateCopyTokens(self):
        """Return copy options for every video, audio, subtitle and attachment track.

        Video, audio and subtitle tracks are taken from the target descriptor.
        Attachment tracks are taken from the source descriptor when one was
        given, otherwise from the target descriptor.
        """
        copyTokens = []

        perTypeGenerators = (
            (TrackType.VIDEO, self.generateVideoCopyTokens),
            (TrackType.AUDIO, self.generateAudioCopyTokens),
            (TrackType.SUBTITLE, self.generateSubtitleCopyTokens),
        )
        for trackType, generateTokens in perTypeGenerators:
            for trackDescriptor in self.__targetMediaDescriptor.getTrackDescriptors(trackType=trackType):
                copyTokens += generateTokens(trackDescriptor.getSubIndex())

        attachmentDescriptors = (
            self.__sourceMediaDescriptor.getTrackDescriptors(trackType=TrackType.ATTACHMENT)
            if self.__sourceMediaDescriptor is not None
            else self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.ATTACHMENT)
        )
        for trackDescriptor in attachmentDescriptors:
            copyTokens += self.generateAttachmentCopyTokens(trackDescriptor.getSubIndex())

        return copyTokens

    def generateCropTokens(self):
        """Return the ``-ss``/``-t`` options that cut the output in time.

        Uses ``cut_start`` and ``cut_length`` from the context when both are
        present, otherwise the module defaults.
        """
        if 'cut_start' in self.__context and 'cut_length' in self.__context:
            cropStart = int(self.__context['cut_start'])
            cropLength = int(self.__context['cut_length'])
        else:
            cropStart = DEFAULT_cut_start
            cropLength = DEFAULT_cut_length

        return ['-ss', str(cropStart), '-t', str(cropLength)]

    def generateOutputTokens(self, filePathBase, format = '', ext = ''):
        """Return the trailing output arguments.

        Args:
            filePathBase: Output path without extension.
            format: Container format passed via ``-f``; omitted when empty.
            ext: Extension appended as ``.ext``; omitted when empty.
        """
        self.__logger.debug(f"FfxController.generateOutputTokens(): base='{filePathBase}' format='{format}' ext='{ext}'")

        suffix = ('.' + str(ext)) if ext else ''
        outputFilePath = f"{filePathBase}{suffix}"

        if format:
            return ['-f', format, outputFilePath]
        return [outputFilePath]

    def generateEncodingMetadataTags(self, videoEncoder: VideoEncoder, quality, preset) -> dict:
        """Return metadata tags describing the encoding settings.

        ``ENCODING_QUALITY`` is set for AV1, H264 and VP9; ``ENCODING_PRESET``
        additionally for AV1. Any other encoder yields an empty dict.
        """
        metadataTags = {}

        if videoEncoder in (VideoEncoder.AV1, VideoEncoder.H264, VideoEncoder.VP9):
            metadataTags["ENCODING_QUALITY"] = str(quality)

        if videoEncoder == VideoEncoder.AV1:
            metadataTags["ENCODING_PRESET"] = str(preset)

        return metadataTags

    def generateAudioEncodingTokens(self):
        """Generates ffmpeg options for audio streams including channel remapping, codec and bitrate.

        Every target audio track whose layout is one of the known layouts is
        encoded with libopus; the bitrate is read from
        ``context['bitrates']``. Tracks with any other layout get no tokens.
        The stream index counts all audio tracks in descriptor order.
        """
        channelMap51 = f"channelmap={FfxController.CHANNEL_MAP_5_1}"

        # (layout, channelmap filter or None, key into context['bitrates'])
        encodingRules = (
            (AudioLayout.LAYOUT_6_1, 'channelmap=channel_layout=6.1', 'dts'),
            (AudioLayout.LAYOUT_5_1, channelMap51, 'ac3'),
            (AudioLayout.LAYOUT_STEREO, None, 'stereo'),
            (AudioLayout.LAYOUT_6CH, channelMap51, 'ac3'),
            # -ac 5 ?
            (AudioLayout.LAYOUT_5_0, 'channelmap=channel_layout=5.0', 'ac3'),
        )

        audioTokens = []
        targetAudioTrackDescriptors = self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.AUDIO)

        for trackSubIndex, trackDescriptor in enumerate(targetAudioTrackDescriptors):
            trackAudioLayout = trackDescriptor.getAudioLayout()

            for layout, channelFilter, bitrateKey in encodingRules:
                if trackAudioLayout == layout:
                    audioTokens += [f"-c:a:{trackSubIndex}", 'libopus']
                    if channelFilter is not None:
                        audioTokens += [f"-filter:a:{trackSubIndex}", channelFilter]
                    audioTokens += [f"-b:a:{trackSubIndex}",
                                    self.__context['bitrates'][bitrateKey]]

        return audioTokens

    @staticmethod
    def _selectFilters(chainIteration, identifier):
        """Return the filter entries of ``chainIteration`` with the given identifier."""
        return [fy for fy in chainIteration if fy['identifier'] == identifier]

    def _resolveQuality(self, qualityFilters, currentPattern, currentShowDescriptor, videoEncoder):
        """Pick the quality from command line, pattern, show or encoder default (in that order)."""
        if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']):
            self.__logger.info(f"Setting quality {quality} from command line")
        elif currentPattern is not None and (quality := currentPattern.quality):
            self.__logger.info(f"Setting quality {quality} from pattern")
        elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
            self.__logger.info(f"Setting quality {quality} from show")
        else:
            # NOTE(review): AV1 also falls back to the VP9 default here - confirm this is intended.
            quality = (QualityFilter.DEFAULT_H264_QUALITY
                       if (videoEncoder == VideoEncoder.H264)
                       else QualityFilter.DEFAULT_VP9_QUALITY)
            self.__logger.info(f"Setting quality {quality} from default")
        return quality

    def _generateCommandPrefix(self, commandTokens):
        """Return base command + import, mapping, disposition and metadata tokens as a new list."""
        commandSequence = (commandTokens
                           + self.__targetMediaDescriptor.getImportFileTokens()
                           + self.__targetMediaDescriptor.getInputMappingTokens(sourceMediaDescriptor = self.__sourceMediaDescriptor)
                           + self.__mdcs.generateDispositionTokens())
        # Optional tokens
        commandSequence += self.__mdcs.generateMetadataTokens()
        return commandSequence

    def _generateVideoEncodingTokens(self, tokenFactory):
        """Return ``tokenFactory()`` once per target video track that is not a PNG thumbnail."""
        videoTokens = []
        for td in self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.VIDEO):
            #HINT: Attached thumbnails are not supported by .webm container format
            # getCodec must be *called*; comparing the bound method itself
            # against the enum member is always unequal.
            if td.getCodec() != TrackCodec.PNG:
                # NOTE(review): the track's sub index is not forwarded, so the
                # factory's default stream index is addressed - confirm.
                videoTokens += tokenFactory()
        return videoTokens

    def _generateCutTokens(self):
        """Return the cut tokens when ``context['perform_cut']`` is set, else an empty list."""
        if self.__context['perform_cut']:
            return self.generateCropTokens()
        return []

    def _runSequence(self, commandSequence, logMessage):
        """Log ``logMessage`` and execute the command unless ``context['dry_run']`` is set."""
        self.__logger.debug(logMessage)
        if not self.__context['dry_run']:
            self.executeCommandSequence(commandSequence)

    def _runEncodePass(self, commandTokens, filterTokens, tokenFactory,
                       targetPath, targetFormat, logMessage):
        """Build and run one full encode (video + audio) writing to ``targetPath``."""
        commandSequence = self._generateCommandPrefix(commandTokens)
        commandSequence += filterTokens
        commandSequence += self._generateVideoEncodingTokens(tokenFactory)
        commandSequence += self.generateAudioEncodingTokens()
        commandSequence += self._generateCutTokens()
        commandSequence += self.generateOutputTokens(targetPath,
                                                     targetFormat)
        self._runSequence(commandSequence, logMessage)

    def runJob(self,
               sourcePath,
               targetPath,
               targetFormat: str = '',
               chainIteration: list = None,
               cropArguments: dict = None,
               currentPattern: Pattern = None,
               currentShowDescriptor = None):
        """Convert ``sourcePath`` to ``targetPath`` with the encoder from the context.

        The encoder is read from ``context['video_encoder']`` (VP9 when
        absent). COPY stream-copies all tracks, AV1 and H264 run a single
        encode, VP9 runs two passes. Commands are only executed when
        ``context['dry_run']`` is falsy. The chosen quality/preset are stored
        in ``context['encoding_metadata_tags']``.

        Args:
            sourcePath: Input file passed to ``-i``.
            targetPath: Output file path.
            targetFormat: Container format for ``-f``; omitted when empty.
            chainIteration: Filter entries (dicts with ``'identifier'`` and,
                depending on the filter, ``'parameters'`` or ``'tokens'``).
                ``None`` means no filters.
            cropArguments: Crop geometry keyed by the ``CropFilter`` key
                constants; falsy means no cropping.
            currentPattern: Optional pattern providing a ``quality``.
            currentShowDescriptor: Optional show descriptor providing
                ``getQuality()``.

        Raises:
            click.ClickException: If an executed command fails.
        """
        # None instead of mutable [] / {} defaults shared between calls.
        if chainIteration is None:
            chainIteration = []

        videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9)

        qualityFilters = self._selectFilters(chainIteration, 'quality')
        presetFilters = self._selectFilters(chainIteration, 'preset')
        denoiseFilters = self._selectFilters(chainIteration, 'nlmeans')
        deinterlaceFilters = self._selectFilters(chainIteration, 'bwdif')

        quality = self._resolveQuality(qualityFilters,
                                       currentPattern,
                                       currentShowDescriptor,
                                       videoEncoder)

        preset = presetFilters[0]['parameters']['preset'] if presetFilters else PresetFilter.DEFAULT_PRESET

        self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags(
            videoEncoder,
            quality,
            preset,
        )

        filterParamTokens = []

        if cropArguments:
            cropParams = (f"crop="
                          + f"{cropArguments[CropFilter.OUTPUT_WIDTH_KEY]}"
                          + f":{cropArguments[CropFilter.OUTPUT_HEIGHT_KEY]}"
                          + f":{cropArguments[CropFilter.OFFSET_X_KEY]}"
                          + f":{cropArguments[CropFilter.OFFSET_Y_KEY]}")
            filterParamTokens.append(cropParams)

        filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else [])
        filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else [])

        filterTokens = ['-vf', ', '.join(filterParamTokens)] if filterParamTokens else []

        # New list: the class-level COMMAND_TOKENS must stay untouched.
        commandTokens = FfxController.COMMAND_TOKENS + ['-i', sourcePath]

        if videoEncoder == VideoEncoder.COPY:
            commandSequence = self._generateCommandPrefix(commandTokens)
            commandSequence += self.generateCopyTokens()
            commandSequence += self._generateCutTokens()
            commandSequence += self.generateOutputTokens(targetPath,
                                                         targetFormat)
            self._runSequence(commandSequence,
                              "FfxController.runJob(): Running command sequence")
            return

        if videoEncoder == VideoEncoder.AV1:
            self._runEncodePass(commandTokens,
                                filterTokens,
                                lambda: self.generateAV1Tokens(int(quality), int(preset)),
                                targetPath,
                                targetFormat,
                                "FfxController.runJob(): Running command sequence")

        if videoEncoder == VideoEncoder.H264:
            self._runEncodePass(commandTokens,
                                filterTokens,
                                lambda: self.generateH264Tokens(int(quality)),
                                targetPath,
                                targetFormat,
                                "FfxController.runJob(): Running command sequence")

        if videoEncoder == VideoEncoder.VP9:
            commandSequence1 = (commandTokens
                                + self.__targetMediaDescriptor.getInputMappingTokens(only_video=True))

            #NOTE: Filters would need to run on the first pass as well, since
            #      the bitrate required for the second pass is determined and
            #      recorded there.
            # TODO: Results seem to be slightly better with filters omitted
            #       from the first pass; confirm or find better filter
            #       settings for 2-pass.
            commandSequence1 += self._generateVideoEncodingTokens(
                lambda: self.generateVP9Pass1Tokens(int(quality)))
            commandSequence1 += self._generateCutTokens()
            commandSequence1 += FfxController.NULL_TOKENS

            # Drop a stale pass log so the second pass cannot pick it up.
            # NOTE(review): this also happens in dry-run mode.
            if os.path.exists(FfxController.TEMP_FILE_NAME):
                os.remove(FfxController.TEMP_FILE_NAME)

            self._runSequence(commandSequence1,
                              "FfxController.runJob(): Running command sequence 1")

            self._runEncodePass(commandTokens,
                                filterTokens,
                                lambda: self.generateVP9Pass2Tokens(int(quality)),
                                targetPath,
                                targetFormat,
                                "FfxController.runJob(): Running command sequence 2")

    def createEmptyFile(self,
                        path: str = 'empty.mkv',
                        sizeX: int = 1280,
                        sizeY: int = 720,
                        rate: int = 25,
                        length: int = 10):
        """Create a file with a black video stream and a silent stereo audio stream.

        Args:
            path: Output file path.
            sizeX: Frame width passed to the ``color`` source.
            sizeY: Frame height passed to the ``color`` source.
            rate: Frame rate passed to the ``color`` source.
            length: Value passed to ``-t`` (output duration).

        Raises:
            click.ClickException: If the command fails.
        """
        # Concatenate into a new list; extending COMMAND_TOKENS in place would
        # leak these arguments into every later command built from it.
        commandTokens = FfxController.COMMAND_TOKENS + [
            '-f',
            'lavfi',
            '-i',
            f"color=size={sizeX}x{sizeY}:rate={rate}:color=black",
            '-f',
            'lavfi',
            '-i',
            'anullsrc=channel_layout=stereo:sample_rate=44100',
            '-t',
            str(length),
            path,
        ]

        self.executeCommandSequence(commandTokens)