Files
ffx/src/ffx/cli.py
2026-04-15 00:03:17 +02:00

1545 lines
57 KiB
Python
Executable File

#! /usr/bin/python3
from __future__ import annotations
import os, sys, click, time, shutil, subprocess
from typing import TYPE_CHECKING
# Allow direct execution via `python src/ffx/cli.py` by preferring the package
# root on sys.path.
if __package__ in (None, ''):
script_dir = os.path.dirname(__file__)
package_root = os.path.dirname(os.path.dirname(__file__))
sys.path = [p for p in sys.path if os.path.abspath(p) != os.path.abspath(script_dir)]
sys.path.insert(0, package_root)
from ffx.constants import (
DEFAULT_AC3_BANDWIDTH,
DEFAULT_CROPDETECT_DURATION_SECONDS,
DEFAULT_CROPDETECT_SEEK_SECONDS,
DEFAULT_cut_length,
DEFAULT_cut_start,
DEFAULT_CONTAINER_EXTENSION,
DEFAULT_CONTAINER_FORMAT,
DEFAULT_DTS_BANDWIDTH,
DEFAULT_STEREO_BANDWIDTH,
DEFAULT_VIDEO_ENCODER_LABEL,
FFMPEG_COMMAND_TOKENS,
SUPPORTED_INPUT_FILE_EXTENSIONS,
VERSION,
)
if TYPE_CHECKING:
from ffx.media_descriptor import MediaDescriptor
from ffx.track_descriptor import TrackDescriptor
LIGHTWEIGHT_COMMANDS = {None, 'version', 'help', 'setup', 'configure_workstation', 'upgrade', 'rename'}
CONFIG_ONLY_COMMANDS = {'edit'}
CPU_OPTION_HELP = (
"Limit CPU for started processes. Use an absolute cpulimit value such as 200 "
+ "(about 2 cores), or use a percentage such as 25% for a share of present cores. "
+ "Omit to disable; 0 also disables."
)
SUBTITLE_DIRECTORY_OPTION_HELP = (
"Load subtitles from here. When omitted and --subtitle-prefix is set, "
+ "FFX uses the configured subtitlesDirectory base path plus the prefix as a subdirectory."
)
SUBTITLE_PREFIX_OPTION_HELP = (
"Subtitle filename prefix. Requires --subtitle-directory, or a configured "
+ "subtitlesDirectory base path that contains a matching <prefix>/ subdirectory."
)
UNMUX_OUTPUT_DIRECTORY_OPTION_HELP = (
"Write extracted streams here. When omitted together with --subtitles-only and "
+ "--label, FFX uses the configured subtitlesDirectory base path plus the label."
)
CROPDETECT_SEEK_OPTION_HELP = (
"Start crop detection this many seconds into the input. "
+ "Useful for skipping logos, intros, or black frames."
)
CROPDETECT_DURATION_OPTION_HELP = (
"Analyze this many seconds for crop detection. "
+ "Shorter windows are faster; longer windows are usually steadier."
)
DEFAULT_CUT_OPTION_VALUE = f"{DEFAULT_cut_start},{DEFAULT_cut_length}"
CUT_OPTION_HELP = (
"Cut output in seconds. "
+ f"Use --cut for the default {DEFAULT_CUT_OPTION_VALUE}, "
+ "--cut DURATION to cut from 0 for DURATION seconds, "
+ "or --cut START,DURATION for an explicit start and duration. "
+ "Omit to disable."
)
def normalizeNicenessOption(ctx, param, value):
from ffx.process import normalizeNiceness
try:
return normalizeNiceness(value)
except ValueError as ex:
raise click.BadParameter(str(ex)) from ex
def normalizeCpuOption(ctx, param, value):
from ffx.process import normalizeCpuPercent
try:
return normalizeCpuPercent(value)
except ValueError as ex:
raise click.BadParameter(str(ex)) from ex
def parseCutOptionValue(value) -> tuple[int, int] | None:
if value is None:
return None
cutValue = str(value).strip()
if not cutValue:
raise ValueError(
"Cut value must be DURATION or START,DURATION, or use --cut without a value."
)
cutTokens = [token.strip() for token in cutValue.split(',')]
try:
if len(cutTokens) == 1:
cutStart = 0
cutLength = int(cutTokens[0])
elif len(cutTokens) == 2:
cutStart = int(cutTokens[0])
cutLength = int(cutTokens[1])
else:
raise ValueError
except ValueError as ex:
raise ValueError(
"Cut value must be DURATION or START,DURATION, or use --cut without a value."
) from ex
if cutStart < 0:
raise ValueError("Cut start must be 0 or greater.")
if cutLength <= 0:
raise ValueError("Cut duration must be greater than 0.")
return cutStart, cutLength
def normalizeCutOption(ctx, param, value):
try:
return parseCutOptionValue(value)
except ValueError as ex:
raise click.BadParameter(str(ex)) from ex
def resolveSubtitleImportOptions(context, subtitleDirectory, subtitlePrefix):
resolvedSubtitlePrefix = str(subtitlePrefix).strip()
resolvedSubtitleDirectory = (
os.path.expanduser(str(subtitleDirectory).strip())
if subtitleDirectory
else ''
)
if not resolvedSubtitlePrefix:
return False, resolvedSubtitleDirectory, resolvedSubtitlePrefix
if resolvedSubtitleDirectory:
return True, resolvedSubtitleDirectory, resolvedSubtitlePrefix
configuredSubtitlesBaseDirectory = context['config'].getSubtitlesDirectoryPath()
if not configuredSubtitlesBaseDirectory:
raise click.ClickException(
"Subtitle prefix was set but no --subtitle-directory was provided and "
+ "no subtitlesDirectory default is configured in ffx.json."
)
resolvedSubtitleDirectory = os.path.join(
configuredSubtitlesBaseDirectory,
resolvedSubtitlePrefix,
)
if not os.path.isdir(resolvedSubtitleDirectory):
raise click.ClickException(
"Subtitle prefix was set but the resolved subtitle directory does not exist: "
+ resolvedSubtitleDirectory
)
return True, resolvedSubtitleDirectory, resolvedSubtitlePrefix
def resolveUnmuxOutputDirectory(context, outputDirectory, subtitlesOnly, label):
resolvedOutputDirectory = (
os.path.expanduser(str(outputDirectory).strip())
if outputDirectory
else ''
)
resolvedLabel = str(label).strip()
if resolvedOutputDirectory or not subtitlesOnly or not resolvedLabel:
return resolvedOutputDirectory, False
configuredSubtitlesBaseDirectory = context['config'].getSubtitlesDirectoryPath()
if not configuredSubtitlesBaseDirectory:
raise click.ClickException(
"Subtitles-only unmux with --label requires --output-directory or a configured "
+ "subtitlesDirectory default in ffx.json."
)
return os.path.join(configuredSubtitlesBaseDirectory, resolvedLabel), True
def resolveIndicatorDigitLengths(context=None, showDescriptor=None):
from ffx.show_descriptor import ShowDescriptor
defaultDigitLengths = ShowDescriptor.getDefaultDigitLengths(context)
if showDescriptor is None:
return (
defaultDigitLengths[ShowDescriptor.INDICATOR_SEASON_DIGITS_KEY],
defaultDigitLengths[ShowDescriptor.INDICATOR_EPISODE_DIGITS_KEY],
)
return (
int(showDescriptor.getIndicatorSeasonDigits()),
int(showDescriptor.getIndicatorEpisodeDigits()),
)
def buildRenameTargetFilename(
sourcePath,
prefix,
seasonOverride=None,
suffix='',
indicatorSeasonDigits=None,
indicatorEpisodeDigits=None,
):
from ffx.file_properties import FileProperties
from ffx.show_descriptor import ShowDescriptor
sourceFilename = os.path.basename(sourcePath)
seasonEpisodeValues = FileProperties.extractSeasonEpisodeValues(sourceFilename)
if seasonEpisodeValues is None:
return None
sourceSeason, sourceEpisode = seasonEpisodeValues
resolvedSeason = int(seasonOverride) if seasonOverride is not None else (
int(sourceSeason) if sourceSeason is not None else 1
)
resolvedIndicatorSeasonDigits = (
int(indicatorSeasonDigits)
if indicatorSeasonDigits is not None
else ShowDescriptor.DEFAULT_INDICATOR_SEASON_DIGITS
)
resolvedIndicatorEpisodeDigits = (
int(indicatorEpisodeDigits)
if indicatorEpisodeDigits is not None
else ShowDescriptor.DEFAULT_INDICATOR_EPISODE_DIGITS
)
_sourceBasename, sourceExtension = os.path.splitext(sourceFilename)
targetFilenameTokens = [
str(prefix).strip(),
f"s{resolvedSeason:0{resolvedIndicatorSeasonDigits}d}e{int(sourceEpisode):0{resolvedIndicatorEpisodeDigits}d}",
]
resolvedSuffix = str(suffix).strip()
if resolvedSuffix:
targetFilenameTokens.append(resolvedSuffix)
return f"{'_'.join(targetFilenameTokens)}{sourceExtension}"
@click.group()
@click.pass_context
@click.option('--language', 'app_language', type=str, default='', help='Set application language')
@click.option('--database-file', type=str, default='', help='Path to database file')
@click.option(
'--debug',
is_flag=True,
default=False,
help='Enable debug-only TUI diagnostics such as the log pane',
)
@click.option('-v', '--verbose', type=int, default=0, help='Set verbosity of output')
@click.option("--dry-run", is_flag=True, default=False)
def ffx(ctx, app_language, database_file, debug, verbose, dry_run):
"""FFX"""
ctx.obj = {}
if ctx.resilient_parsing:
return
from ffx.i18n import (
read_configured_language,
resolve_application_language,
set_current_language,
)
resolvedLanguage = resolve_application_language(
cli_language=app_language,
config_language=read_configured_language(),
)
set_current_language(resolvedLanguage)
ctx.obj['language'] = resolvedLanguage
ctx.obj['debug'] = bool(debug)
if ctx.invoked_subcommand in LIGHTWEIGHT_COMMANDS:
ctx.obj['dry_run'] = dry_run
ctx.obj['verbosity'] = verbose
return
from ffx.configuration_controller import ConfigurationController
from ffx.logging_utils import configure_ffx_logger
ctx.obj['config'] = ConfigurationController()
ctx.obj['dry_run'] = dry_run
ctx.obj['verbosity'] = verbose
ctx.obj['debug'] = bool(debug)
ctx.obj['language'] = resolve_application_language(
cli_language=app_language,
config_language=ctx.obj['config'].getLanguage(),
)
set_current_language(ctx.obj['language'])
# Critical 50
# Error 40
# Warning 30
# Info 20
# Debug 10
fileLogVerbosity = max(40 - verbose * 10, 10)
consoleLogVerbosity = max(20 - verbose * 10, 10)
ctx.obj['logger'] = configure_ffx_logger(
ctx.obj['config'].getLogFilePath(),
fileLogVerbosity,
consoleLogVerbosity,
)
if ctx.invoked_subcommand in CONFIG_ONLY_COMMANDS:
return
from ffx.database import databaseContext
ctx.obj['database'] = databaseContext(
databasePath=database_file
if database_file
else ctx.obj['config'].getDatabaseFilePath()
)
# Define a subcommand
@ffx.command()
def version():
click.echo(VERSION)
# Another subcommand
@ffx.command()
def help():
click.echo(f"ffx {VERSION}\n")
click.echo("Maintenance commands: setup, configure_workstation, upgrade")
click.echo("Media commands: shows, inspect, edit, convert, rename, unmux, cropdetect")
click.echo("Use 'ffx --help' or 'ffx <command> --help' for full command help.")
def getRepoRootPath():
currentFilePath = os.path.abspath(__file__)
return os.path.dirname(os.path.dirname(os.path.dirname(currentFilePath)))
def getConfigureWorkstationScriptPath():
return os.path.join(getRepoRootPath(), 'tools', 'configure_workstation.sh')
def getSetupScriptPath():
return os.path.join(getRepoRootPath(), 'tools', 'setup.sh')
def getBundleVenvDirectory():
return os.path.join(os.path.expanduser('~'), '.local', 'share', 'ffx.venv')
def getBundlePipPath():
return os.path.join(getBundleVenvDirectory(), 'bin', 'pip')
def getBundleRepoPath():
return getRepoRootPath()
def getTrackedGitChanges(repoPath):
completed = subprocess.run(
['git', 'status', '--porcelain', '--untracked-files=no'],
cwd=repoPath,
capture_output=True,
text=True,
)
if completed.returncode != 0:
commandLabel = 'git status --porcelain --untracked-files=no'
errorOutput = completed.stderr.strip() or completed.stdout.strip()
raise click.ClickException(
f"Unable to inspect bundle repository state using '{commandLabel}': {errorOutput}"
)
return [line for line in completed.stdout.splitlines() if line.strip()]
def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
if not os.path.isfile(scriptPath):
raise click.ClickException(f"{missingDescription} not found at {scriptPath}")
commandSequence = ['bash', scriptPath] + list(commandArgs)
if ctx.obj.get('dry_run', False):
click.echo(' '.join(commandSequence))
return
completed = subprocess.run(commandSequence)
ctx.exit(completed.returncode)
def runTuiApp(ctx) -> None:
from ffx.ffx_app import FfxApp
from ffx.logging_utils import set_ffx_console_logging_enabled
logger = ctx.obj.get('logger')
set_ffx_console_logging_enabled(logger, enabled=False)
try:
app = FfxApp(ctx.obj)
app.run()
finally:
set_ffx_console_logging_enabled(logger, enabled=True)
@ffx.command(name='setup')
@click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify bundle-setup readiness')
@click.option('--with-tests', is_flag=True, default=False, help='Also install or verify Python test packages in the bundle venv')
@click.argument('setup_args', nargs=-1, type=click.UNPROCESSED)
def setup(ctx, check, with_tests, setup_args):
"""Prepare or repair the FFX bundle virtualenv and shell alias."""
commandArgs = []
if check:
commandArgs.append('--check')
if with_tests:
commandArgs.append('--with-tests')
commandArgs += list(setup_args)
runScriptWrapper(ctx, getSetupScriptPath(), "Bundle setup script", commandArgs)
@ffx.command(name='configure_workstation')
@click.pass_context
@click.option('--check', is_flag=True, default=False, help='Only verify workstation-configuration readiness')
@click.argument('configure_args', nargs=-1, type=click.UNPROCESSED)
def configure_workstation(ctx, check, configure_args):
"""Prepare workstation dependencies and local config after bundle install."""
commandArgs = []
if check:
commandArgs.append('--check')
commandArgs += list(configure_args)
runScriptWrapper(
ctx,
getConfigureWorkstationScriptPath(),
"Workstation configuration script",
commandArgs,
)
@ffx.command(name='upgrade')
@click.pass_context
@click.option('--branch', type=str, default='', help='Checkout this branch before pulling')
def upgrade(ctx, branch):
bundleRepoPath = getBundleRepoPath()
bundlePipPath = getBundlePipPath()
if not os.path.isdir(bundleRepoPath):
raise click.ClickException(f"Bundle repository not found at {bundleRepoPath}")
if not os.path.isfile(bundlePipPath):
raise click.ClickException(f"Bundle pip not found at {bundlePipPath}")
commandSequences = []
trackedChanges = getTrackedGitChanges(bundleRepoPath)
if trackedChanges:
click.echo("Tracked local changes detected in the bundle repository:")
for trackedChange in trackedChanges:
click.echo(f" {trackedChange}")
shouldReset = click.confirm(
"Discard these tracked changes with 'git reset --hard HEAD' before upgrade?",
default=False,
)
if not shouldReset:
raise click.ClickException(
"Upgrade aborted because tracked local changes are present."
)
commandSequences.append(['git', 'reset', '--hard', 'HEAD'])
if branch:
commandSequences += [
['git', 'fetch', 'origin', branch],
['git', 'checkout', '-B', branch, 'FETCH_HEAD'],
]
else:
commandSequences.append(['git', 'pull'])
commandSequences += [
[bundlePipPath, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[bundlePipPath, 'install', '--editable', '.'],
]
if ctx.obj.get('dry_run', False):
for commandSequence in commandSequences:
click.echo(f"(cd {bundleRepoPath} && {' '.join(commandSequence)})")
return
for commandSequence in commandSequences:
completed = subprocess.run(commandSequence, cwd=bundleRepoPath)
if completed.returncode != 0:
ctx.exit(completed.returncode)
@ffx.command()
@click.pass_context
@click.option('--shift', is_flag=True, default=False, help='Print resolved season-shift mapping for each file instead of opening the TUI')
@click.argument('filenames', nargs=-1)
def inspect(ctx, shift, filenames):
if not filenames:
raise click.ClickException("At least one filename is required.")
if shift:
from ffx.file_properties import FileProperties
from ffx.shifted_season_controller import ShiftedSeasonController
shiftedSeasonController = ShiftedSeasonController(ctx.obj)
for filename in filenames:
fileProperties = FileProperties(ctx.obj, filename)
season = fileProperties.getSeason()
episode = fileProperties.getEpisode()
if season == -1 or episode == -1:
click.echo(f"{filename}: no season/episode recognized")
continue
currentPattern = fileProperties.getPattern()
shiftedSeason, shiftedEpisode, sourceLabel = shiftedSeasonController.resolveShiftSeason(
fileProperties.getShowId(),
season=season,
episode=episode,
patternId=currentPattern.getId() if currentPattern is not None else None,
)
if shiftedSeason == season and shiftedEpisode == episode:
click.echo(f"{filename}: none")
else:
click.echo(
f"{filename}: {season}/{episode} -> {shiftedSeason}/{shiftedEpisode} from {sourceLabel}"
)
return
if len(filenames) != 1:
raise click.ClickException("Inspect without --shift requires exactly one filename.")
ctx.obj['command'] = 'inspect'
ctx.obj['arguments'] = {}
ctx.obj['arguments']['filename'] = filenames[0]
runTuiApp(ctx)
@ffx.command()
@click.pass_context
@click.argument('filename', nargs=1)
def edit(ctx, filename):
if not os.path.isfile(filename):
raise click.ClickException(f"File not found: {filename}")
ctx.obj['command'] = 'edit'
ctx.obj['arguments'] = {'filename': filename}
ctx.obj['use_pattern'] = False
ctx.obj['no_signature'] = True
ctx.obj['apply_metadata_cleanup'] = True
ctx.obj['apply_metadata_normalization'] = True
ctx.obj['resource_limits'] = ctx.obj.get('resource_limits', {})
runTuiApp(ctx)
@ffx.command()
@click.pass_context
@click.argument('paths', nargs=-1)
@click.option('--prefix', type=str, required=True, help='Required target filename prefix')
@click.option('--season', type=int, default=None, help='Override target season index')
@click.option('--suffix', type=str, default='', help='Optional target filename suffix')
@click.option('--dry-run', is_flag=True, default=False, help='Only print planned renames')
def rename(ctx, paths, prefix, season, suffix, dry_run):
"""Rename matching episode files in place."""
from ffx.configuration_controller import ConfigurationController
resolvedPrefix = str(prefix).strip()
resolvedSuffix = str(suffix).strip()
effectiveDryRun = bool(ctx.obj.get('dry_run', False) or dry_run)
renameContext = {
'config': ctx.obj.get('config') or ConfigurationController(),
}
indicatorSeasonDigits, indicatorEpisodeDigits = resolveIndicatorDigitLengths(renameContext)
if not resolvedPrefix:
raise click.ClickException("Rename prefix must not be empty.")
processedCount = 0
for sourcePath in paths:
if not os.path.isfile(sourcePath):
continue
targetFilename = buildRenameTargetFilename(
sourcePath,
resolvedPrefix,
seasonOverride=season,
suffix=resolvedSuffix,
indicatorSeasonDigits=indicatorSeasonDigits,
indicatorEpisodeDigits=indicatorEpisodeDigits,
)
if targetFilename is None:
continue
sourceFilename = os.path.basename(sourcePath)
targetPath = os.path.join(os.path.dirname(sourcePath), targetFilename)
click.echo(f"{sourceFilename} -> {targetFilename}")
processedCount += 1
if effectiveDryRun or os.path.abspath(sourcePath) == os.path.abspath(targetPath):
continue
if os.path.exists(targetPath):
raise click.ClickException(f"Target file already exists: {targetPath}")
shutil.move(sourcePath, targetPath)
if processedCount == 0:
click.echo("No matching files found.")
def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix, targetDirectory = ''):
from ffx.track_codec import TrackCodec
from ffx.track_type import TrackType
# executable and input file
commandTokens = list(FFMPEG_COMMAND_TOKENS) + ['-i', sourcePath]
trackType = trackDescriptor.getType()
trackCodec = trackDescriptor.getCodec()
targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix
# mapping
commandTokens += ['-map', f"0:{trackType.indicator()}:{trackDescriptor.getSubIndex()}"]
if trackType == TrackType.VIDEO and trackCodec == TrackCodec.H265:
commandTokens += ['-c:v', 'copy', '-bsf:v', 'hevc_mp4toannexb']
else:
commandTokens += ['-c', 'copy']
# output format
codecFormat = trackCodec.format()
if codecFormat is not None:
commandTokens += ['-f', codecFormat]
# output filename
commandTokens += [f"{targetPathBase}.{trackCodec.extension()}"]
return commandTokens
@ffx.command()
@click.pass_context
@click.argument('paths', nargs=-1)
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option("-o", "--output-directory", type=str, default='', help=UNMUX_OUTPUT_DIRECTORY_OPTION_HELP)
@click.option("-s", "--subtitles-only", is_flag=True, default=False)
@click.option(
'--nice',
type=int,
default=None,
callback=normalizeNicenessOption,
show_default='disabled',
help='Adjust niceness of started processes (-20..19). Omit to disable; 99 also disables.',
)
@click.option(
'--cpu',
type=str,
default=None,
callback=normalizeCpuOption,
show_default='disabled',
help=CPU_OPTION_HELP,
)
def unmux(ctx,
paths,
label,
output_directory,
subtitles_only,
nice,
cpu):
from ffx.file_properties import FileProperties
from ffx.process import executeProcess
from ffx.shifted_season_controller import ShiftedSeasonController
from ffx.track_disposition import TrackDisposition
from ffx.track_type import TrackType
existingSourcePaths = [p for p in paths if os.path.isfile(p)]
ctx.obj['logger'].debug(f"\nUnmuxing {len(existingSourcePaths)} files")
ctx.obj['resource_limits'] = {}
ctx.obj['resource_limits']['niceness'] = nice
ctx.obj['resource_limits']['cpu_limit'] = cpu
ctx.obj['resource_limits']['cpu_percent'] = cpu
output_directory, create_output_directory = resolveUnmuxOutputDirectory(
ctx.obj,
output_directory,
subtitles_only,
label,
)
if create_output_directory and existingSourcePaths and not ctx.obj.get('dry_run', False):
os.makedirs(output_directory, exist_ok=True)
shiftedSeasonController = ShiftedSeasonController(ctx.obj)
for sourcePath in existingSourcePaths:
fp = FileProperties(ctx.obj, sourcePath)
try:
sourceMediaDescriptor = fp.getMediaDescriptor()
currentPattern = fp.getPattern()
currentShowDescriptor = (
currentPattern.getShowDescriptor(ctx.obj) if currentPattern is not None else None
)
indicatorSeasonDigits, indicatorEpisodeDigits = resolveIndicatorDigitLengths(
ctx.obj,
currentShowDescriptor,
)
season, episode = shiftedSeasonController.shiftSeason(
fp.getShowId(),
season=fp.getSeason(),
episode=fp.getEpisode(),
patternId=currentPattern.getId() if currentPattern is not None else None,
)
#TODO: Recognition für alle Formate anpassen
targetLabel = label if label else fp.getFileBasename()
targetIndicator = (
f"_S{season:0{indicatorSeasonDigits}d}E{episode:0{indicatorEpisodeDigits}d}"
if label and season != -1 and episode != -1
else ''
)
if label and not targetIndicator:
ctx.obj['logger'].warning(f"Skipping file {fp.getFilename()}: Label set but no indicator recognized")
continue
else:
ctx.obj['logger'].info(f"\nUnmuxing file {fp.getFilename()}\n")
# for trackDescriptor in sourceMediaDescriptor.getAllTrackDescriptors():
for trackDescriptor in sourceMediaDescriptor.getTrackDescriptors():
if trackDescriptor.getType() == TrackType.SUBTITLE or not subtitles_only:
# SEASON_EPISODE_STREAM_LANGUAGE_DISPOSITIONS_MATCH = '[sS]([0-9]+)[eE]([0-9]+)_([0-9]+)_([a-z]{3})(?:_([A-Z]{3}))*'
targetPrefix = f"{targetLabel}{targetIndicator}_{trackDescriptor.getIndex()}_{trackDescriptor.getLanguage().threeLetter()}"
td: TrackDisposition
for td in sorted(trackDescriptor.getDispositionSet(), key=lambda d: d.index()):
targetPrefix += f"_{td.indicator()}"
unmuxSequence = getUnmuxSequence(trackDescriptor, sourcePath, targetPrefix, targetDirectory = output_directory)
if unmuxSequence:
if not ctx.obj['dry_run']:
#TODO #425: Codec Enum
ctx.obj['logger'].info(f"Unmuxing stream {trackDescriptor.getIndex()} into file {targetPrefix}.{trackDescriptor.getCodec().extension()}")
ctx.obj['logger'].debug(f"Executing unmuxing sequence")
out, err, rc = executeProcess(unmuxSequence, context = ctx.obj)
if rc:
ctx.obj['logger'].error(f"Unmuxing of stream {trackDescriptor.getIndex()} failed with error ({rc}) {err}")
else:
ctx.obj['logger'].warning(f"Skipping stream with unknown codec")
except Exception as ex:
ctx.obj['logger'].warning(f"Skipping File {sourcePath} ({ex})")
@ffx.command()
@click.pass_context
@click.argument('paths', nargs=-1)
@click.option(
'--nice',
type=int,
default=None,
callback=normalizeNicenessOption,
show_default='disabled',
help='Adjust niceness of started processes (-20..19). Omit to disable; 99 also disables.',
)
@click.option(
'--cpu',
type=str,
default=None,
callback=normalizeCpuOption,
show_default='disabled',
help=CPU_OPTION_HELP,
)
@click.option(
'--crop-seek',
type=click.IntRange(min=0),
default=DEFAULT_CROPDETECT_SEEK_SECONDS,
show_default=True,
help=CROPDETECT_SEEK_OPTION_HELP,
)
@click.option(
'--crop-duration',
type=click.IntRange(min=1),
default=DEFAULT_CROPDETECT_DURATION_SECONDS,
show_default=True,
help=CROPDETECT_DURATION_OPTION_HELP,
)
def cropdetect(ctx,
paths,
nice,
cpu,
crop_seek,
crop_duration):
from ffx.file_properties import FileProperties
existingSourcePaths = [p for p in paths if os.path.isfile(p)]
ctx.obj['logger'].debug(f"\nUnmuxing {len(existingSourcePaths)} files")
ctx.obj['resource_limits'] = {}
ctx.obj['resource_limits']['niceness'] = nice
ctx.obj['resource_limits']['cpu_limit'] = cpu
ctx.obj['resource_limits']['cpu_percent'] = cpu
ctx.obj['cropdetect'] = {
'seek_seconds': crop_seek,
'duration_seconds': crop_duration,
}
for sourcePath in existingSourcePaths:
try:
fp = FileProperties(ctx.obj, sourcePath)
cropParams = fp.findCropArguments()
click.echo(cropParams)
except Exception as ex:
ctx.obj['logger'].warning(f"Skipping File {sourcePath} ({ex})")
@ffx.command()
@click.pass_context
def shows(ctx):
ctx.obj['command'] = 'shows'
runTuiApp(ctx)
def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
from ffx.i18n import t
from ffx.track_disposition import TrackDisposition
from ffx.track_type import TrackType
# Check for multiple default or forced dispositions if not set by user input or database requirements
#
# Query user for the correct sub indices, then configure flags in track descriptors associated with media descriptor accordingly.
# The correct tokens should then be created by
if len([v for v in mediaDescriptor.getVideoTracks() if v.getDispositionFlag(TrackDisposition.DEFAULT)]) > 1:
if context['no_prompt']:
raise click.ClickException(t('More than one default video stream detected and no prompt set'))
defaultVideoTrackSubIndex = click.prompt(t("More than one default video stream detected! Please select stream"), type=int)
mediaDescriptor.setDefaultSubTrack(TrackType.VIDEO, defaultVideoTrackSubIndex)
if len([v for v in mediaDescriptor.getVideoTracks() if v.getDispositionFlag(TrackDisposition.FORCED)]) > 1:
if context['no_prompt']:
raise click.ClickException(t('More than one forced video stream detected and no prompt set'))
forcedVideoTrackSubIndex = click.prompt(t("More than one forced video stream detected! Please select stream"), type=int)
mediaDescriptor.setForcedSubTrack(TrackType.VIDEO, forcedVideoTrackSubIndex)
if len([a for a in mediaDescriptor.getAudioTracks() if a.getDispositionFlag(TrackDisposition.DEFAULT)]) > 1:
if context['no_prompt']:
raise click.ClickException(t('More than one default audio stream detected and no prompt set'))
defaultAudioTrackSubIndex = click.prompt(t("More than one default audio stream detected! Please select stream"), type=int)
mediaDescriptor.setDefaultSubTrack(TrackType.AUDIO, defaultAudioTrackSubIndex)
if len([a for a in mediaDescriptor.getAudioTracks() if a.getDispositionFlag(TrackDisposition.FORCED)]) > 1:
if context['no_prompt']:
raise click.ClickException(t('More than one forced audio stream detected and no prompt set'))
forcedAudioTrackSubIndex = click.prompt(t("More than one forced audio stream detected! Please select stream"), type=int)
mediaDescriptor.setForcedSubTrack(TrackType.AUDIO, forcedAudioTrackSubIndex)
if len([s for s in mediaDescriptor.getSubtitleTracks() if s.getDispositionFlag(TrackDisposition.DEFAULT)]) > 1:
if context['no_prompt']:
raise click.ClickException(t('More than one default subtitle stream detected and no prompt set'))
defaultSubtitleTrackSubIndex = click.prompt(t("More than one default subtitle stream detected! Please select stream"), type=int)
mediaDescriptor.setDefaultSubTrack(TrackType.SUBTITLE, defaultSubtitleTrackSubIndex)
if len([s for s in mediaDescriptor.getSubtitleTracks() if s.getDispositionFlag(TrackDisposition.FORCED)]) > 1:
if context['no_prompt']:
raise click.ClickException(t('More than one forced subtitle stream detected and no prompt set'))
forcedSubtitleTrackSubIndex = click.prompt(t("More than one forced subtitle stream detected! Please select stream"), type=int)
mediaDescriptor.setForcedSubTrack(TrackType.SUBTITLE, forcedSubtitleTrackSubIndex)
@ffx.command()
@click.pass_context
@click.argument('paths', nargs=-1)
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option('-v', '--video-encoder', type=str, default=DEFAULT_VIDEO_ENCODER_LABEL, help=f"Target video encoder (vp9, av1, h264 or copy)", show_default=True)
@click.option('-q', '--quality', type=str, default="", help=f"Quality settings to be used with VP9/H264 encoder")
@click.option('-p', '--preset', type=str, default="", help=f"Quality preset to be used with AV1 encoder")
@click.option('-a', '--stereo-bitrate', type=int, default=DEFAULT_STEREO_BANDWIDTH, help=f"Bitrate in kbit/s to be used to encode stereo audio streams", show_default=True)
@click.option('--ac3', type=int, default=DEFAULT_AC3_BANDWIDTH, help=f"Bitrate in kbit/s to be used to encode 5.1 audio streams", show_default=True)
@click.option('--dts', type=int, default=DEFAULT_DTS_BANDWIDTH, help=f"Bitrate in kbit/s to be used to encode 6.1 audio streams", show_default=True)
@click.option('--subtitle-directory', type=str, default='', help=SUBTITLE_DIRECTORY_OPTION_HELP)
@click.option('--subtitle-prefix', type=str, default='', help=SUBTITLE_PREFIX_OPTION_HELP)
@click.option('--language', type=str, multiple=True, help='Set stream language. Use format <stream index>:<3 letter iso code>')
@click.option('--title', type=str, multiple=True, help='Set stream title. Use format <stream index>:<title>')
@click.option('--default-video', type=int, default=-1, help='Index of default video stream')
@click.option('--forced-video', type=int, default=-1, help='Index of forced video stream')
@click.option('--default-audio', type=int, default=-1, help='Index of default audio stream')
@click.option('--forced-audio', type=int, default=-1, help='Index of forced audio stream')
@click.option('--default-subtitle', type=int, default=-1, help='Index of default subtitle stream')
@click.option('--forced-subtitle', type=int, default=-1, help='Index of forced subtitle stream')
@click.option('--rearrange-streams', type=str, default="", help='Rearrange output streams order. Use format comma separated integers')
@click.option("--crop", is_flag=False, flag_value="auto", default="none")
@click.option(
'--crop-seek',
type=click.IntRange(min=0),
default=DEFAULT_CROPDETECT_SEEK_SECONDS,
show_default=True,
help='When --crop auto is used, start crop detection this many seconds into the input.',
)
@click.option(
'--crop-duration',
type=click.IntRange(min=1),
default=DEFAULT_CROPDETECT_DURATION_SECONDS,
show_default=True,
help='When --crop auto is used, analyze this many seconds for crop detection.',
)
@click.option(
"--cut",
type=str,
metavar="DURATION|START,DURATION",
is_flag=False,
flag_value=DEFAULT_CUT_OPTION_VALUE,
default=None,
callback=normalizeCutOption,
help=CUT_OPTION_HELP,
)
@click.option("--output-directory", type=str, default='')
@click.option("--deinterlace", is_flag=False, flag_value="default", default="none")
@click.option("--denoise", is_flag=False, flag_value="default", default="none")
@click.option("--denoise-use-hw", is_flag=True, default=False)
@click.option('--denoise-strength', type=str, default='', help='Denoising strength, more blurring vs more details.')
@click.option('--denoise-patch-size', type=str, default='', help='Subimage size to apply filtering on luminosity plane. Reduces broader noise patterns but costly.')
@click.option('--denoise-chroma-patch-size', type=str, default='', help='Subimage size to apply filtering on chroma planes.')
@click.option('--denoise-research-window', type=str, default='', help='Range to search for comparable patches on luminosity plane. Better filtering but costly.')
@click.option('--denoise-chroma-research-window', type=str, default='', help='Range to search for comparable patches on chroma planes.')
@click.option('--show', type=int, default=-1, help='Set TMDB show identifier')
@click.option('--season', type=int, default=-1, help='Set season of show')
@click.option('--episode', type=int, default=-1, help='Set episode of show')
@click.option("--no-tmdb", is_flag=True, default=False)
@click.option("--no-pattern", is_flag=True, default=False)
@click.option("--dont-pass-dispositions", is_flag=True, default=False)
@click.option("--no-prompt", is_flag=True, default=False)
@click.option("--no-signature", is_flag=True, default=False)
@click.option("--keep-mkvmerge-metadata", is_flag=True, default=False)
@click.option(
'--nice',
type=int,
default=None,
callback=normalizeNicenessOption,
show_default='disabled',
help='Adjust niceness of started processes (-20..19). Omit to disable; 99 also disables.',
)
@click.option(
'--cpu',
type=str,
default=None,
callback=normalizeCpuOption,
show_default='disabled',
help=CPU_OPTION_HELP,
)
@click.option('--rename-only', is_flag=True, default=False, help='Only renaming and moving, no recoding')
def convert(ctx,
paths,
label,
video_encoder,
quality,
preset,
stereo_bitrate,
ac3,
dts,
subtitle_directory,
subtitle_prefix,
language,
title,
default_video,
forced_video,
default_audio,
forced_audio,
default_subtitle,
forced_subtitle,
rearrange_streams,
crop,
crop_seek,
crop_duration,
cut,
output_directory,
deinterlace,
denoise,
denoise_use_hw,
denoise_strength,
denoise_patch_size,
denoise_chroma_patch_size,
denoise_research_window,
denoise_chroma_research_window,
show,
season,
episode,
no_tmdb,
no_pattern,
dont_pass_dispositions,
no_prompt,
no_signature,
keep_mkvmerge_metadata,
nice,
cpu,
rename_only):
"""Batch conversion of audiovideo files in format suitable for web playback, e.g. jellyfin
Files found under PATHS will be converted according to parameters.
Filename extensions will be changed appropriately.
Suffices will we appended to filename in case of multiple created files
or if the filename has not changed."""
from ffx.ffx_controller import FfxController
from ffx.file_properties import FileProperties
from ffx.filter.crop_filter import CropFilter
from ffx.filter.deinterlace_filter import DeinterlaceFilter
from ffx.filter.nlmeans_filter import NlmeansFilter
from ffx.filter.preset_filter import PresetFilter
from ffx.filter.quality_filter import QualityFilter
from ffx.helper import filterFilename, getEpisodeFileBasename, substituteTmdbFilename
from ffx.shifted_season_controller import ShiftedSeasonController
from ffx.show_controller import ShowController
from ffx.show_descriptor import ShowDescriptor
from ffx.tmdb_controller import TmdbController
from ffx.track_codec import TrackCodec
from ffx.track_disposition import TrackDisposition
from ffx.video_encoder import VideoEncoder
startTime = time.perf_counter()
context = ctx.obj
context['video_encoder'] = VideoEncoder.fromLabel(video_encoder)
# HINT: quick and dirty override for h264, todo improve
if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY):
targetFormat = ''
targetExtension = 'mkv'
else:
targetFormat = DEFAULT_CONTAINER_FORMAT
targetExtension = DEFAULT_CONTAINER_EXTENSION
context['use_tmdb'] = not no_tmdb
context['use_pattern'] = not no_pattern
context['no_prompt'] = no_prompt
context['no_signature'] = no_signature
context['keep_mkvmerge_metadata'] = keep_mkvmerge_metadata
context['resource_limits'] = {}
context['resource_limits']['niceness'] = nice
context['resource_limits']['cpu_limit'] = cpu
context['resource_limits']['cpu_percent'] = cpu
context['cropdetect'] = {
'seek_seconds': crop_seek,
'duration_seconds': crop_duration,
}
(
context['import_subtitles'],
resolvedSubtitleDirectory,
resolvedSubtitlePrefix,
) = resolveSubtitleImportOptions(
context,
subtitle_directory,
subtitle_prefix,
)
if context['import_subtitles']:
context['subtitle_directory'] = resolvedSubtitleDirectory
context['subtitle_prefix'] = resolvedSubtitlePrefix
existingSourcePaths = [p for p in paths if os.path.isfile(p) and p.split('.')[-1] in SUPPORTED_INPUT_FILE_EXTENSIONS]
# CLI Overrides
cliOverrides = {}
if language:
cliOverrides['languages'] = {}
for overLang in language:
olTokens = overLang.split(':')
if len(olTokens) == 2:
try:
cliOverrides['languages'][int(olTokens[0])] = olTokens[1]
except ValueError:
ctx.obj['logger'].warning(f"Ignoring non-integer language index {olTokens[0]}")
continue
if title:
cliOverrides['titles'] = {}
for overTitle in title:
otTokens = overTitle.split(':')
if len(otTokens) == 2:
try:
cliOverrides['titles'][int(otTokens[0])] = otTokens[1]
except ValueError:
ctx.obj['logger'].warning(f"Ignoring non-integer title index {otTokens[0]}")
continue
if default_video != -1:
cliOverrides['default_video'] = default_video
if forced_video != -1:
cliOverrides['forced_video'] = forced_video
if default_audio != -1:
cliOverrides['default_audio'] = default_audio
if forced_audio != -1:
cliOverrides['forced_audio'] = forced_audio
if default_subtitle != -1:
cliOverrides['default_subtitle'] = default_subtitle
if forced_subtitle != -1:
cliOverrides['forced_subtitle'] = forced_subtitle
if show != -1 or season != -1 or episode != -1:
if len(existingSourcePaths) > 1:
context['logger'].warning(f"Ignoring TMDB show, season, episode overrides, not supported for multiple source files")
else:
cliOverrides['tmdb'] = {}
if show != -1:
cliOverrides['tmdb']['show'] = show
if season != -1:
cliOverrides['tmdb']['season'] = season
if episode != -1:
cliOverrides['tmdb']['episode'] = episode
if cliOverrides:
context['overrides'] = cliOverrides
if rearrange_streams:
try:
cliOverrides['stream_order'] = [int(si) for si in rearrange_streams.split(",")]
except ValueError as ve:
errorMessage = "Non-integer in rearrange stream parameter"
ctx.obj['logger'].error(errorMessage)
raise click.Abort()
ctx.obj['logger'].debug(f"\nVideo encoder: {video_encoder}")
context['bitrates'] = {}
context['bitrates']['stereo'] = str(stereo_bitrate) if str(stereo_bitrate).endswith('k') else f"{stereo_bitrate}k"
context['bitrates']['ac3'] = str(ac3) if str(ac3).endswith('k') else f"{ac3}k"
context['bitrates']['dts'] = str(dts) if str(dts).endswith('k') else f"{dts}k"
ctx.obj['logger'].debug(f"Stereo bitrate: {context['bitrates']['stereo']}")
ctx.obj['logger'].debug(f"AC3 bitrate: {context['bitrates']['ac3']}")
ctx.obj['logger'].debug(f"DTS bitrate: {context['bitrates']['dts']}")
#->
# Process cut parameters
context['perform_cut'] = (cut is not None)
if context['perform_cut']:
context['cut_start'], context['cut_length'] = cut
click.echo(
f"Cutting enabled: start {context['cut_start']} s, duration {context['cut_length']} s."
)
ctx.obj['logger'].debug(
f"Cut start={context['cut_start']} length={context['cut_length']}"
)
tc = TmdbController() if context['use_tmdb'] else None
qualityKwargs = {QualityFilter.QUALITY_KEY: str(quality)}
qf = QualityFilter(**qualityKwargs)
if context['video_encoder'] == VideoEncoder.AV1 and preset:
presetKwargs = {PresetFilter.PRESET_KEY: preset}
PresetFilter(**presetKwargs)
cf = None
# if crop != 'none':
if crop == 'auto':
cropKwargs = {}
cf = CropFilter(**cropKwargs)
denoiseKwargs = {}
if denoise_strength:
denoiseKwargs[NlmeansFilter.STRENGTH_KEY] = denoise_strength
if denoise_patch_size:
denoiseKwargs[NlmeansFilter.PATCH_SIZE_KEY] = denoise_patch_size
if denoise_chroma_patch_size:
denoiseKwargs[NlmeansFilter.CHROMA_PATCH_SIZE_KEY] = denoise_chroma_patch_size
if denoise_research_window:
denoiseKwargs[NlmeansFilter.RESEARCH_WINDOW_KEY] = denoise_research_window
if denoise_chroma_research_window:
denoiseKwargs[NlmeansFilter.CHROMA_RESEARCH_WINDOW_KEY] = denoise_chroma_research_window
if denoise != 'none' or denoiseKwargs:
NlmeansFilter(**denoiseKwargs)
if deinterlace != 'none':
DeinterlaceFilter()
chainYield = list(qf.getChainYield())
ctx.obj['logger'].info(f"\nRunning {len(existingSourcePaths) * len(chainYield)} jobs")
jobIndex = 0
showController = ShowController(context)
for sourcePath in existingSourcePaths:
# Separate basedir, basename and extension for current source file
sourceDirectory = os.path.dirname(sourcePath)
sourceFilename = os.path.basename(sourcePath)
sourcePathTokens = sourceFilename.split('.')
sourceFileBasename = '.'.join(sourcePathTokens[:-1])
sourceFilenameExtension = sourcePathTokens[-1]
ctx.obj['logger'].info(f"\nProcessing file {sourcePath}")
targetSuffices = {}
mediaFileProperties = FileProperties(context, sourcePath)
# if not cf is None:
#
cropArguments = {} if cf is None else mediaFileProperties.findCropArguments()
#
# ctx.obj['logger'].info(f"\nSetting crop arguments: ouput width: {cropArguments[CropFilter.OUTPUT_WIDTH_KEY]} "
# + f"height: {cropArguments[CropFilter.OUTPUT_HEIGHT_KEY]} "
# + f"offset x: {cropArguments[CropFilter.OFFSET_X_KEY]} "
# + f"y: {cropArguments[CropFilter.OFFSET_Y_KEY]}")
#
# cf.setArguments(**cropArguments)
ssc = ShiftedSeasonController(context)
matchedShowId = mediaFileProperties.getShowId()
#HINT: -1 if not set
if 'tmdb' in cliOverrides.keys() and 'season' in cliOverrides['tmdb']:
showSeason = cliOverrides['tmdb']['season']
else:
showSeason = mediaFileProperties.getSeason()
if 'tmdb' in cliOverrides.keys() and 'episode' in cliOverrides['tmdb']:
showEpisode = cliOverrides['tmdb']['episode']
else:
showEpisode = mediaFileProperties.getEpisode()
ctx.obj['logger'].debug(f"Season={showSeason} Episode={showEpisode}")
sourceMediaDescriptor = mediaFileProperties.getMediaDescriptor()
if ([smd for smd in sourceMediaDescriptor.getSubtitleTracks()
if smd.getCodec() == TrackCodec.ASS]
and [amd for amd in sourceMediaDescriptor.getAttachmentTracks()
if amd.getCodec() == TrackCodec.TTF]):
targetFormat = ''
targetExtension = 'mkv'
#HINT: This is None if the filename did not match anything in database
currentPattern = mediaFileProperties.getPattern() if context['use_pattern'] else None
ctx.obj['logger'].debug(f"Pattern matching: {'No' if currentPattern is None else 'Yes'}")
# Setup FfxController accordingly depending on pattern matching is enabled and a pattern was matched
if currentPattern is None:
checkUniqueDispositions(context, sourceMediaDescriptor)
currentShowDescriptor = None
if context['import_subtitles']:
sourceMediaDescriptor.importSubtitles(context['subtitle_directory'],
context['subtitle_prefix'],
showSeason,
showEpisode)
if cliOverrides:
sourceMediaDescriptor.applyOverrides(cliOverrides)
fc = FfxController(context, sourceMediaDescriptor)
else:
targetMediaDescriptor = currentPattern.getMediaDescriptor(ctx.obj)
checkUniqueDispositions(context, targetMediaDescriptor)
currentShowDescriptor = currentPattern.getShowDescriptor(ctx.obj)
# Check if source and target track descriptors match
sourceTrackDescriptorList = sourceMediaDescriptor.getTrackDescriptors()
targetTrackDescriptorList = targetMediaDescriptor.getTrackDescriptors()
for ttd in targetTrackDescriptorList:
tti = ttd.getIndex()
ttsi = ttd.getSourceIndex()
stList = [st for st in sourceTrackDescriptorList if st.getIndex() == ttsi]
std = stList[0] if stList else None
if std is None:
raise click.ClickException(f"Target track #{tti} refering to non-existent source track #{ttsi}")
ttType = ttd.getType()
stType = std.getType()
if ttType != stType:
raise click.ClickException(f"Target track #{tti} type ({ttType.label()}) not matching source track #{ttsi} type ({stType.label()})")
if context['import_subtitles']:
targetMediaDescriptor.importSubtitles(context['subtitle_directory'],
context['subtitle_prefix'],
showSeason,
showEpisode,
preserve_dispositions=True)
# ctx.obj['logger'].debug(f"tmd subindices: {[t.getIndex() for t in targetMediaDescriptor.getAllTrackDescriptors()]} {[t.getSubIndex() for t in targetMediaDescriptor.getAllTrackDescriptors()]} {[t.getDispositionFlag(TrackDisposition.DEFAULT) for t in targetMediaDescriptor.getAllTrackDescriptors()]}")
ctx.obj['logger'].debug(f"tmd subindices: {[t.getIndex() for t in targetMediaDescriptor.getTrackDescriptors()]} {[t.getSubIndex() for t in targetMediaDescriptor.getTrackDescriptors()]} {[t.getDispositionFlag(TrackDisposition.DEFAULT) for t in targetMediaDescriptor.getTrackDescriptors()]}")
if cliOverrides:
targetMediaDescriptor.applyOverrides(cliOverrides)
# ctx.obj['logger'].debug(f"tmd subindices: {[t.getIndex() for t in targetMediaDescriptor.getAllTrackDescriptors()]} {[t.getSubIndex() for t in targetMediaDescriptor.getAllTrackDescriptors()]} {[t.getDispositionFlag(TrackDisposition.DEFAULT) for t in targetMediaDescriptor.getAllTrackDescriptors()]}")
ctx.obj['logger'].debug(f"tmd subindices: {[t.getIndex() for t in targetMediaDescriptor.getTrackDescriptors()]} {[t.getSubIndex() for t in targetMediaDescriptor.getTrackDescriptors()]} {[t.getDispositionFlag(TrackDisposition.DEFAULT) for t in targetMediaDescriptor.getTrackDescriptors()]}")
ctx.obj['logger'].debug(f"Input mapping tokens (2nd pass): {targetMediaDescriptor.getInputMappingTokens()}")
fc = FfxController(context, targetMediaDescriptor, sourceMediaDescriptor)
qualityShowId = (
cliOverrides['tmdb']['show']
if 'tmdb' in cliOverrides.keys() and 'show' in cliOverrides['tmdb']
else matchedShowId
)
if currentShowDescriptor is None and qualityShowId != -1:
currentShowDescriptor = showController.getShowDescriptor(qualityShowId)
defaultDigitLengths = ShowDescriptor.getDefaultDigitLengths(context)
indexSeasonDigits = currentShowDescriptor.getIndexSeasonDigits() if not currentPattern is None else defaultDigitLengths[ShowDescriptor.INDEX_SEASON_DIGITS_KEY]
indexEpisodeDigits = currentShowDescriptor.getIndexEpisodeDigits() if not currentPattern is None else defaultDigitLengths[ShowDescriptor.INDEX_EPISODE_DIGITS_KEY]
indicatorSeasonDigits = currentShowDescriptor.getIndicatorSeasonDigits() if not currentPattern is None else defaultDigitLengths[ShowDescriptor.INDICATOR_SEASON_DIGITS_KEY]
indicatorEpisodeDigits = currentShowDescriptor.getIndicatorEpisodeDigits() if not currentPattern is None else defaultDigitLengths[ShowDescriptor.INDICATOR_EPISODE_DIGITS_KEY]
showIdForShift = (
cliOverrides['tmdb']['show']
if 'tmdb' in cliOverrides.keys() and 'show' in cliOverrides['tmdb']
else matchedShowId
)
patternIdForShift = currentPattern.getId() if currentPattern is not None else None
hasExplicitTargetSeasonOrEpisode = (
'tmdb' in cliOverrides.keys()
and (
'season' in cliOverrides['tmdb']
or 'episode' in cliOverrides['tmdb']
)
)
# Shift season and episode if defined for the matched pattern or show
if (
not hasExplicitTargetSeasonOrEpisode
and showSeason != -1
and showEpisode != -1
):
shiftedShowSeason, shiftedShowEpisode = ssc.shiftSeason(
showIdForShift,
season=showSeason,
episode=showEpisode,
patternId=patternIdForShift,
)
else:
shiftedShowSeason = showSeason
shiftedShowEpisode = showEpisode
# Assemble target filename accordingly depending on TMDB lookup is enabled
#HINT: -1 if not set
showId = (
cliOverrides['tmdb']['show']
if 'tmdb' in cliOverrides.keys() and 'show' in cliOverrides['tmdb']
else (-1 if currentShowDescriptor is None else currentShowDescriptor.getId())
)
if context['use_tmdb'] and showId != -1 and shiftedShowSeason != -1 and shiftedShowEpisode != -1:
ctx.obj['logger'].debug(f"Querying TMDB for show_id={showId} season={shiftedShowSeason} episode{shiftedShowEpisode}")
if currentPattern is None:
sName, showYear = tc.getShowNameAndYear(showId)
showName = filterFilename(sName)
showFilenamePrefix = f"{showName} ({str(showYear)})"
else:
showFilenamePrefix = currentShowDescriptor.getFilenamePrefix()
tmdbEpisodeResult = tc.queryEpisode(showId, shiftedShowSeason, shiftedShowEpisode)
ctx.obj['logger'].debug(f"tmdbEpisodeResult={tmdbEpisodeResult}")
if tmdbEpisodeResult:
substitutedEpisodeName = filterFilename(substituteTmdbFilename(tmdbEpisodeResult['name']))
sourceFileBasename = getEpisodeFileBasename(showFilenamePrefix,
substitutedEpisodeName,
shiftedShowSeason,
shiftedShowEpisode,
indexSeasonDigits,
indexEpisodeDigits,
indicatorSeasonDigits,
indicatorEpisodeDigits,
context=ctx.obj)
if label:
if shiftedShowSeason > -1 and shiftedShowEpisode > -1:
targetSuffices['se'] = f"S{shiftedShowSeason:0{indicatorSeasonDigits}d}E{shiftedShowEpisode:0{indicatorEpisodeDigits}d}"
elif shiftedShowEpisode > -1:
targetSuffices['se'] = f"E{shiftedShowEpisode:0{indicatorEpisodeDigits}d}"
else:
if 'se' in targetSuffices.keys():
del targetSuffices['se']
ctx.obj['logger'].debug(f"fileBasename={sourceFileBasename}")
for chainIteration in chainYield:
ctx.obj['logger'].debug(f"\nchain iteration: {chainIteration}\n")
chainVariant = '-'.join([fy['variant'] for fy in chainIteration])
ctx.obj['logger'].debug(f"\nRunning job {jobIndex} file={sourcePath} variant={chainVariant}")
jobIndex += 1
ctx.obj['logger'].debug(f"label={label if label else 'Falsy'}")
ctx.obj['logger'].debug(f"sourceFileBasename={sourceFileBasename}")
# targetFileBasename = sourceFileBasename if context['use_tmdb'] and not label else label
targetFileBasename = (label or sourceFileBasename) if context['use_tmdb'] else sourceFileBasename
targetFilenameTokens = [targetFileBasename]
if 'se' in targetSuffices.keys():
targetFilenameTokens += [targetSuffices['se']]
for filterYield in chainIteration:
targetFilenameTokens += filterYield['suffices']
targetFilename = f"{'_'.join(targetFilenameTokens)}.{sourceFilenameExtension if rename_only else targetExtension}"
if sourceFilename == targetFilename:
targetFilename = f"out_{targetFilename}"
targetPath = os.path.join(output_directory, targetFilename) if output_directory else targetFilename
ctx.obj['logger'].info(f"Creating file {targetFilename}")
if rename_only:
shutil.move(sourcePath, targetPath)
else:
fc.runJob(sourcePath,
targetPath,
targetFormat,
chainIteration,
cropArguments,
currentPattern,
currentShowDescriptor)
endTime = time.perf_counter()
ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}")
if __name__ == '__main__':
ffx()