10 Commits

Author SHA1 Message Date
Javanaut
12be6e985a v0.4.2 2026-04-24 13:39:57 +02:00
Javanaut
12310942ae Fix inspect attachment subtracks 2026-04-24 10:34:43 +02:00
Javanaut
f913cb4fe3 ff 2026-04-24 08:49:48 +02:00
Javanaut
0a153280e3 ff 2026-04-24 08:49:30 +02:00
Javanaut
6ca0cd54b0 addendum 2026-04-23 22:16:03 +02:00
Javanaut
502a822bb4 prep 0.4.1 2026-04-23 22:09:36 +02:00
Javanaut
6cc21b5f36 Adds diagnostics/remedy system 2026-04-23 20:32:49 +02:00
Javanaut
0034f8ca97 ff 2026-04-23 16:37:47 +02:00
Javanaut
eedcbaed0a Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev 2026-04-23 16:31:19 +02:00
Javanaut
653ce7b417 Copy audio and video flags 2026-04-23 16:30:15 +02:00
24 changed files with 1533 additions and 87 deletions

View File

@@ -99,6 +99,19 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History ## Version History
### 0.4.2
- pattern details now show an inline `Show: <quality>` hint next to the quality field when the pattern itself has no stored quality but the selected show does
- inspect stream tables now show attachment format labels like `TTF` in the codec column and keep attachment language cells blank instead of showing an undefined language
- ffmpeg damaged-MP3 diagnostics now recognize additional corruption lines such as `invalid new backstep`, keeping them grouped under the `warn-corrupt-mpeg-audio` review summary
### 0.4.1
- `convert` now supports `--copy-video` and `--copy-audio` to keep the selected stream type in copy mode without applying the corresponding reencode flags, filters, or formatting options
- ffmpeg conversions now monitor diagnostics while the process is running, retry unset AVI packet timestamps once with `-fflags +genpts`, and stop early when a file should be skipped instead of waiting for the full job to finish
- end-of-run convert summaries now list only ffmpeg findings that still require review, including named remedy identifiers such as `warn-corrupt-mpeg-audio`
- `upgrade` now finishes by reporting the installed FFX version together with the active bundle branch
### 0.3.1 ### 0.3.1
- debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows - debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows

View File

@@ -69,15 +69,3 @@
## Delete When ## Delete When
- Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance. - Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance.
## Missing Timestamps
Detect ffmpeg warning "Timestamps are unset in a packet for stream 0. This is deprecated and will stop working in the future. Fix your code to set the timestamps properly" and try autofix by -fflags +genpts -> Warning if fails -> Error. Check if flags collide with anything.
<!--
## Source Formats
-->

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "ffx" name = "ffx"
description = "FFX recoding and metadata managing tool" description = "FFX recoding and metadata managing tool"
version = "0.3.1" version = "0.4.2"
license = {file = "LICENSE.md"} license = {file = "LICENSE.md"}
dependencies = [ dependencies = [
"requests", "requests",

View File

@@ -68,6 +68,14 @@ CUT_OPTION_HELP = (
+ "or --cut START,DURATION for an explicit start and duration. " + "or --cut START,DURATION for an explicit start and duration. "
+ "Omit to disable." + "Omit to disable."
) )
COPY_VIDEO_OPTION_HELP = (
"Copy video streams without re-encoding. Skips video encoder options "
+ "and video filters."
)
COPY_AUDIO_OPTION_HELP = (
"Copy audio streams without re-encoding. Skips audio encoder options "
+ "and audio filters."
)
def normalizeNicenessOption(ctx, param, value): def normalizeNicenessOption(ctx, param, value):
@@ -385,6 +393,41 @@ def getTrackedGitChanges(repoPath):
return [line for line in completed.stdout.splitlines() if line.strip()] return [line for line in completed.stdout.splitlines() if line.strip()]
def getCurrentGitBranch(repoPath):
completed = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
cwd=repoPath,
capture_output=True,
text=True,
)
if completed.returncode != 0:
commandLabel = 'git rev-parse --abbrev-ref HEAD'
errorOutput = completed.stderr.strip() or completed.stdout.strip()
raise click.ClickException(
f"Unable to inspect bundle repository branch using '{commandLabel}': {errorOutput}"
)
return completed.stdout.strip() or "unknown"
def getBundleVersion(repoPath):
constantsPath = os.path.join(repoPath, 'src', 'ffx', 'constants.py')
try:
with open(constantsPath, encoding='utf-8') as constantsFile:
for line in constantsFile:
strippedLine = line.strip()
if strippedLine.startswith('VERSION=') or strippedLine.startswith('VERSION ='):
return strippedLine.split('=', 1)[1].strip().strip('"\'')
except OSError as ex:
raise click.ClickException(
f"Unable to inspect bundle version from {constantsPath}: {ex}"
) from ex
raise click.ClickException(f"Unable to inspect bundle version from {constantsPath}")
def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs): def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
if not os.path.isfile(scriptPath): if not os.path.isfile(scriptPath):
raise click.ClickException(f"{missingDescription} not found at {scriptPath}") raise click.ClickException(f"{missingDescription} not found at {scriptPath}")
@@ -507,6 +550,10 @@ def upgrade(ctx, branch):
if completed.returncode != 0: if completed.returncode != 0:
ctx.exit(completed.returncode) ctx.exit(completed.returncode)
upgradedBranch = getCurrentGitBranch(bundleRepoPath)
upgradedVersion = getBundleVersion(bundleRepoPath)
click.echo(f"Updated FFX to version {upgradedVersion} from branch {upgradedBranch}.")
@ffx.command() @ffx.command()
@click.pass_context @click.pass_context
@@ -915,6 +962,8 @@ def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix') @click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option('-v', '--video-encoder', type=str, default=DEFAULT_VIDEO_ENCODER_LABEL, help=f"Target video encoder (vp9, av1, h264 or copy)", show_default=True) @click.option('-v', '--video-encoder', type=str, default=DEFAULT_VIDEO_ENCODER_LABEL, help=f"Target video encoder (vp9, av1, h264 or copy)", show_default=True)
@click.option('--copy-video', is_flag=True, default=False, help=COPY_VIDEO_OPTION_HELP)
@click.option('--copy-audio', is_flag=True, default=False, help=COPY_AUDIO_OPTION_HELP)
@click.option('-q', '--quality', type=str, default="", help=f"Quality settings to be used with VP9/H264 encoder") @click.option('-q', '--quality', type=str, default="", help=f"Quality settings to be used with VP9/H264 encoder")
@click.option('-p', '--preset', type=str, default="", help=f"Quality preset to be used with AV1 encoder") @click.option('-p', '--preset', type=str, default="", help=f"Quality preset to be used with AV1 encoder")
@@ -1011,6 +1060,8 @@ def convert(ctx,
paths, paths,
label, label,
video_encoder, video_encoder,
copy_video,
copy_audio,
quality, quality,
preset, preset,
stereo_bitrate, stereo_bitrate,
@@ -1070,6 +1121,11 @@ def convert(ctx,
Suffices will we appended to filename in case of multiple created files Suffices will we appended to filename in case of multiple created files
or if the filename has not changed.""" or if the filename has not changed."""
from ffx.ffx_controller import FfxController from ffx.ffx_controller import FfxController
from ffx.diagnostics import (
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
from ffx.file_properties import FileProperties from ffx.file_properties import FileProperties
from ffx.filter.crop_filter import CropFilter from ffx.filter.crop_filter import CropFilter
from ffx.filter.deinterlace_filter import DeinterlaceFilter from ffx.filter.deinterlace_filter import DeinterlaceFilter
@@ -1090,9 +1146,12 @@ def convert(ctx,
context = ctx.obj context = ctx.obj
context['video_encoder'] = VideoEncoder.fromLabel(video_encoder) context['video_encoder'] = VideoEncoder.fromLabel(video_encoder)
context['copy_video'] = copy_video
context['copy_audio'] = copy_audio
copyVideoEffective = copy_video or context['video_encoder'] == VideoEncoder.COPY
# HINT: quick and dirty override for h264, todo improve # HINT: quick and dirty override for h264, todo improve
if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY): if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY) or copy_video or copy_audio:
targetFormat = '' targetFormat = ''
targetExtension = 'mkv' targetExtension = 'mkv'
else: else:
@@ -1225,36 +1284,54 @@ def convert(ctx,
tc = TmdbController() if context['use_tmdb'] else None tc = TmdbController() if context['use_tmdb'] else None
qualityKwargs = {QualityFilter.QUALITY_KEY: str(quality)} if copyVideoEffective and quality:
ctx.obj['logger'].warning("Ignoring quality settings because video is being copied")
qualityKwargs = {
QualityFilter.QUALITY_KEY: "" if copyVideoEffective else str(quality)
}
qf = QualityFilter(**qualityKwargs) qf = QualityFilter(**qualityKwargs)
if context['video_encoder'] == VideoEncoder.AV1 and preset: if context['video_encoder'] == VideoEncoder.AV1 and preset and not copyVideoEffective:
presetKwargs = {PresetFilter.PRESET_KEY: preset} presetKwargs = {PresetFilter.PRESET_KEY: preset}
PresetFilter(**presetKwargs) PresetFilter(**presetKwargs)
cf = None cf = None
# if crop != 'none': # if crop != 'none':
if crop == 'auto': videoFilterOptionsRequested = (
crop != 'none'
or deinterlace != 'none'
or denoise != 'none'
or denoise_strength
or denoise_patch_size
or denoise_chroma_patch_size
or denoise_research_window
or denoise_chroma_research_window
)
if copyVideoEffective and videoFilterOptionsRequested:
ctx.obj['logger'].warning("Ignoring video filter options because video is being copied")
if crop == 'auto' and not copyVideoEffective:
cropKwargs = {} cropKwargs = {}
cf = CropFilter(**cropKwargs) cf = CropFilter(**cropKwargs)
denoiseKwargs = {} denoiseKwargs = {}
if denoise_strength: if denoise_strength and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.STRENGTH_KEY] = denoise_strength denoiseKwargs[NlmeansFilter.STRENGTH_KEY] = denoise_strength
if denoise_patch_size: if denoise_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.PATCH_SIZE_KEY] = denoise_patch_size denoiseKwargs[NlmeansFilter.PATCH_SIZE_KEY] = denoise_patch_size
if denoise_chroma_patch_size: if denoise_chroma_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_PATCH_SIZE_KEY] = denoise_chroma_patch_size denoiseKwargs[NlmeansFilter.CHROMA_PATCH_SIZE_KEY] = denoise_chroma_patch_size
if denoise_research_window: if denoise_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.RESEARCH_WINDOW_KEY] = denoise_research_window denoiseKwargs[NlmeansFilter.RESEARCH_WINDOW_KEY] = denoise_research_window
if denoise_chroma_research_window: if denoise_chroma_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_RESEARCH_WINDOW_KEY] = denoise_chroma_research_window denoiseKwargs[NlmeansFilter.CHROMA_RESEARCH_WINDOW_KEY] = denoise_chroma_research_window
if denoise != 'none' or denoiseKwargs: if not copyVideoEffective and (denoise != 'none' or denoiseKwargs):
NlmeansFilter(**denoiseKwargs) NlmeansFilter(**denoiseKwargs)
if deinterlace != 'none': if deinterlace != 'none' and not copyVideoEffective:
DeinterlaceFilter() DeinterlaceFilter()
chainYield = list(qf.getChainYield()) chainYield = list(qf.getChainYield())
@@ -1528,18 +1605,30 @@ def convert(ctx,
if rename_only: if rename_only:
shutil.move(sourcePath, targetPath) shutil.move(sourcePath, targetPath)
else: else:
fc.runJob(sourcePath, try:
targetPath, fc.runJob(sourcePath,
targetFormat, targetPath,
chainIteration, targetFormat,
cropArguments, chainIteration,
currentPattern, cropArguments,
currentShowDescriptor) currentPattern,
currentShowDescriptor)
except FfmpegSkipFileWarning:
if os.path.exists(targetPath):
os.remove(targetPath)
continue
endTime = time.perf_counter() endTime = time.perf_counter()
ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}") ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}")
unremediedIssues = getUnremediedIssues(context)
if unremediedIssues:
ctx.obj['logger'].warning("\nFiles with ffmpeg findings that require review:")
for summaryLine in iterUnremediedIssueSummaryLines(context):
ctx.obj['logger'].warning(summaryLine)
else:
ctx.obj['logger'].info("All files converted with no issues.")
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,4 +1,4 @@
VERSION='0.3.1' VERSION='0.4.2'
DATABASE_VERSION = 3 DATABASE_VERSION = 3
DEFAULT_QUALITY = 32 DEFAULT_QUALITY = 32

View File

@@ -0,0 +1,24 @@
from .base import FfmpegRemedy, FfmpegRemedyDecision, FfmpegSkipFileWarning
from .monitor import FfmpegCommandRunner, FfmpegDiagnosticMonitor
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import (
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from dataclasses import dataclass
class FfmpegSkipFileWarning(Exception):
pass
@dataclass(frozen=True)
class FfmpegRemedyDecision:
stop_process: bool = False
retry_input_tokens: tuple[str, ...] = ()
skip_file: bool = False
console_warning: str = ""
summary_identifier: str = ""
unremedied_issue_identifier: str = ""
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
class FfmpegRemedy:
identifier = "ffmpeg-remedy"
harmless = False
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
raise NotImplementedError

View File

@@ -0,0 +1,222 @@
from __future__ import annotations
import re
from ffx.logging_utils import get_ffx_logger
from ffx.process import executeProcess
from .base import FfmpegSkipFileWarning, FfmpegRemedy
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import recordUnremediedIssue
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
UNHANDLED_DIAGNOSTIC_PATTERNS = (
re.compile(r"\bwarning\b", re.IGNORECASE),
re.compile(r"\berror\b", re.IGNORECASE),
re.compile(r"\bfailed\b", re.IGNORECASE),
re.compile(r"\binvalid\b", re.IGNORECASE),
re.compile(r"\bmissing\b", re.IGNORECASE),
re.compile(r"\bcorrupt\b", re.IGNORECASE),
re.compile(r"\boverflow\b", re.IGNORECASE),
re.compile(r"\bdeprecated\b", re.IGNORECASE),
)
class FfmpegDiagnosticMonitor:
def __init__(
self,
context: dict | None,
command_sequence: list[str],
*,
remedies: list[FfmpegRemedy] | None = None,
emittedWarnings: set[str] | None = None,
):
self.context = context or {}
self.command_sequence = list(command_sequence)
self.logger = self.context.get("logger", get_ffx_logger())
self.source_path = str(self.context.get("current_source_path", "")).strip()
self.remedies = remedies or [
RetryWithGeneratedPtsRemedy(),
WarnCorruptMpegAudioRemedy(),
]
self._emittedWarnings = emittedWarnings if emittedWarnings is not None else set()
self.retry_input_tokens: tuple[str, ...] = ()
self.skip_file = False
self.skip_file_message = ""
def describe_source(self) -> str:
return self.source_path if self.source_path else "current file"
def command_contains_tokens(self, tokens: tuple[str, ...]) -> bool:
tokenCount = len(tokens)
if tokenCount == 0:
return True
return any(
tuple(self.command_sequence[index:index + tokenCount]) == tuple(tokens)
for index in range(len(self.command_sequence) - tokenCount + 1)
)
def emitConsoleWarning(self, warningMessage: str) -> None:
if warningMessage and warningMessage not in self._emittedWarnings:
self.logger.warning(warningMessage)
self._emittedWarnings.add(warningMessage)
def recordUnremediedIssue(self, issueIdentifier: str, issueLine: str) -> None:
isFirstIssueForFile = recordUnremediedIssue(
self.context,
self.describe_source(),
issueIdentifier,
)
if not isFirstIssueForFile:
return
self.emitConsoleWarning(
f"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ f"{self.describe_source()}. FFX will continue, but review the output "
+ f"file. First unhandled line: {issueLine}"
)
def lineLooksLikeUnhandledDiagnostic(self, line: str) -> bool:
return any(pattern.search(line) for pattern in UNHANDLED_DIAGNOSTIC_PATTERNS)
def getUnhandledDiagnosticIdentifier(self, line: str) -> str:
loweredLine = str(line).lower()
if any(token in loweredLine for token in ("error", "failed", "invalid", "missing", "corrupt", "overflow")):
return "unhandled-error"
if any(token in loweredLine for token in ("warning", "deprecated")):
return "unhandled-warning"
return "unhandled-diagnostic"
def getSummaryIdentifier(
self,
remedy: FfmpegRemedy,
decision,
) -> str:
explicitIdentifier = str(decision.summary_identifier).strip()
if explicitIdentifier:
return explicitIdentifier
remedyIdentifier = str(getattr(remedy, "identifier", "")).strip()
if remedyIdentifier and remedyIdentifier != FfmpegRemedy.identifier:
return remedyIdentifier
return str(decision.unremedied_issue_identifier).strip()
def shouldRecordSummary(
self,
remedy: FfmpegRemedy,
decision,
) -> bool:
if getattr(remedy, "harmless", False):
return False
if decision.retry_requested and not decision.skip_file:
return False
return bool(self.getSummaryIdentifier(remedy, decision))
def handle_stderr_line(self, line: str) -> bool:
strippedLine = str(line).strip()
if not strippedLine:
return False
for remedy in self.remedies:
decision = remedy.inspect_line(strippedLine, self)
if decision is None:
continue
self.emitConsoleWarning(decision.console_warning)
if decision.retry_requested:
self.retry_input_tokens = tuple(decision.retry_input_tokens)
if self.shouldRecordSummary(remedy, decision):
recordUnremediedIssue(
self.context,
self.describe_source(),
self.getSummaryIdentifier(remedy, decision),
)
if decision.skip_file:
self.skip_file = True
self.skip_file_message = (
decision.console_warning
or f"Skipping file {self.describe_source()} because ffmpeg reported a fatal diagnostic."
)
return bool(decision.stop_process)
if self.lineLooksLikeUnhandledDiagnostic(strippedLine):
self.recordUnremediedIssue(
self.getUnhandledDiagnosticIdentifier(strippedLine),
strippedLine,
)
return False
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
def insertFfmpegInputOptions(
commandSequence: list[str],
extraTokens: tuple[str, ...],
) -> list[str]:
if not extraTokens:
return list(commandSequence)
if not commandSequence:
return list(extraTokens)
return [commandSequence[0]] + list(extraTokens) + list(commandSequence[1:])
class FfmpegCommandRunner:
def __init__(
self,
context: dict | None,
*,
remedies: list[FfmpegRemedy] | None = None,
):
self.__context = context or {}
self.__remedies = remedies
def execute(
self,
commandSequence: list[str],
*,
directory: str = None,
timeoutSeconds: float = None,
):
emittedWarnings: set[str] = set()
attemptCommandSequence = list(commandSequence)
while True:
monitor = FfmpegDiagnosticMonitor(
self.__context,
attemptCommandSequence,
remedies=self.__remedies,
emittedWarnings=emittedWarnings,
)
out, err, rc = executeProcess(
attemptCommandSequence,
directory=directory,
context=self.__context,
timeoutSeconds=timeoutSeconds,
stderrLineHandler=monitor.handle_stderr_line,
)
if monitor.retry_requested:
attemptCommandSequence = insertFfmpegInputOptions(
attemptCommandSequence,
monitor.retry_input_tokens,
)
continue
if monitor.skip_file:
raise FfmpegSkipFileWarning(monitor.skip_file_message)
return out, err, rc

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class RetryWithGeneratedPtsRemedy(FfmpegRemedy):
identifier = "retry-with-generated-pts"
RETRY_INPUT_TOKENS = ("-fflags", "+genpts")
TIMESTAMP_UNSET_PATTERN = re.compile(
r"Timestamps are unset in a packet for stream \d+"
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if self.TIMESTAMP_UNSET_PATTERN.search(line) is None:
return None
if session.command_contains_tokens(self.RETRY_INPUT_TOKENS):
return FfmpegRemedyDecision(
stop_process=True,
skip_file=True,
console_warning=(
f"Skipping file {session.describe_source()}: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
),
unremedied_issue_identifier="timestamp-unset-after-genpts",
)
return FfmpegRemedyDecision(
stop_process=True,
retry_input_tokens=self.RETRY_INPUT_TOKENS,
console_warning=(
f"ffmpeg reported unset packet timestamps for {session.describe_source()}. "
+ "Stopping early and retrying with -fflags +genpts."
),
)

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import os
DIAGNOSTICS_STATE_KEY = "diagnostics_state"
UNREMEDIED_ISSUES_KEY = "unremedied_issues"
def getDiagnosticsState(context: dict | None) -> dict:
if context is None:
return {UNREMEDIED_ISSUES_KEY: {}}
if DIAGNOSTICS_STATE_KEY not in context:
context[DIAGNOSTICS_STATE_KEY] = {
UNREMEDIED_ISSUES_KEY: {},
}
return context[DIAGNOSTICS_STATE_KEY]
def recordUnremediedIssue(
context: dict | None,
sourcePath: str,
identifier: str,
) -> bool:
if not sourcePath:
return False
diagnosticsState = getDiagnosticsState(context)
unremediedIssues = diagnosticsState[UNREMEDIED_ISSUES_KEY]
issueList = unremediedIssues.setdefault(sourcePath, [])
strippedIdentifier = str(identifier).strip()
if not strippedIdentifier or strippedIdentifier in issueList:
return False
issueList.append(strippedIdentifier)
return True
def getUnremediedIssues(context: dict | None) -> dict[str, list[str]]:
diagnosticsState = getDiagnosticsState(context)
return diagnosticsState.get(UNREMEDIED_ISSUES_KEY, {})
def iterUnremediedIssueSummaryLines(context: dict | None) -> list[str]:
summaryLines = []
unremediedIssues = getUnremediedIssues(context)
for sourcePath in sorted(unremediedIssues.keys()):
identifiers = unremediedIssues[sourcePath]
summaryLines.append(f"{os.path.basename(sourcePath)}: {', '.join(identifiers)}")
return summaryLines

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class WarnCorruptMpegAudioRemedy(FfmpegRemedy):
identifier = "warn-corrupt-mpeg-audio"
PATTERNS = (
re.compile(r"\[mp3float @ .*\] invalid block type", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] invalid new backstep -?\d+", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] Header missing"),
re.compile(r"\[mp3float @ .*\] overread, skip ", re.IGNORECASE),
re.compile(r"Error while decoding MPEG audio frame\."),
re.compile(
r"Error submitting packet to decoder: Invalid data found when processing input"
),
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if not any(pattern.search(line) for pattern in self.PATTERNS):
return None
return FfmpegRemedyDecision(
console_warning=(
f"ffmpeg reported damaged MPEG audio frames while converting "
+ f"{session.describe_source()}. FFX will continue, but the output "
+ "audio may contain gaps or glitches."
),
)

View File

@@ -0,0 +1,27 @@
from .diagnostics import (
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegRemedy,
FfmpegRemedyDecision,
FfmpegSkipFileWarning,
RetryWithGeneratedPtsRemedy,
WarnCorruptMpegAudioRemedy,
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
from logging import Logger from logging import Logger
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.diagnostics import FfmpegCommandRunner
from ffx.media_descriptor import MediaDescriptor from ffx.media_descriptor import MediaDescriptor
from ffx.audio_layout import AudioLayout from ffx.audio_layout import AudioLayout
@@ -63,6 +64,7 @@ class FfxController():
self.__logger: Logger = context['logger'] self.__logger: Logger = context['logger']
self.__warnedH264Fallback = False self.__warnedH264Fallback = False
self.__ffmpegCommandRunner = FfmpegCommandRunner(context)
@staticmethod @staticmethod
@@ -100,7 +102,13 @@ class FfxController():
def executeCommandSequence(self, commandSequence): def executeCommandSequence(self, commandSequence):
out, err, rc = executeProcess(commandSequence, context=self.__context) if commandSequence and str(commandSequence[0]).strip() == "ffmpeg":
out, err, rc = self.__ffmpegCommandRunner.execute(
commandSequence,
timeoutSeconds=None,
)
else:
out, err, rc = executeProcess(commandSequence, context=self.__context)
if rc: if rc:
raise click.ClickException(f"Command resulted in error: rc={rc} error={err}") raise click.ClickException(f"Command resulted in error: rc={rc} error={err}")
return out, err, rc return out, err, rc
@@ -172,6 +180,16 @@ class FfxController():
def generateAudioCopyTokens(self, subIndex): def generateAudioCopyTokens(self, subIndex):
return [f"-c:a:{int(subIndex)}", 'copy'] return [f"-c:a:{int(subIndex)}", 'copy']
def generateVideoCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.VIDEO):
return ["-c:v", "copy"]
return []
def generateAudioCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.AUDIO):
return ["-c:a", "copy"]
return []
def generateSubtitleCopyTokens(self, subIndex): def generateSubtitleCopyTokens(self, subIndex):
return [f"-c:s:{int(subIndex)}", 'copy'] return [f"-c:s:{int(subIndex)}", 'copy']
@@ -292,6 +310,12 @@ class FfxController():
return audioTokens return audioTokens
def generateAudioProcessingTokens(self):
if self.__context.get('copy_audio', False):
return self.generateAudioCopyAllTokens()
return self.generateAudioEncodingTokens()
def runJob(self, def runJob(self,
sourcePath, sourcePath,
targetPath, targetPath,
@@ -305,6 +329,8 @@ class FfxController():
videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9) videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9)
self.__context['current_source_path'] = sourcePath
copyVideo = self.__context.get('copy_video', False) or videoEncoder == VideoEncoder.COPY
qualityFilters = [fy for fy in chainIteration if fy['identifier'] == 'quality'] qualityFilters = [fy for fy in chainIteration if fy['identifier'] == 'quality']
@@ -315,30 +341,35 @@ class FfxController():
deinterlaceFilters = [fy for fy in chainIteration if fy['identifier'] == 'bwdif'] deinterlaceFilters = [fy for fy in chainIteration if fy['identifier'] == 'bwdif']
if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']): if copyVideo:
self.__logger.info(f"Setting quality {quality} from command line") quality = None
elif currentPattern is not None and (quality := currentPattern.quality): self.__context['encoding_metadata_tags'] = {}
self.__logger.info(f"Setting quality {quality} from pattern")
elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
self.__logger.info(f"Setting quality {quality} from show")
else: else:
quality = (QualityFilter.DEFAULT_H264_QUALITY if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']):
if (videoEncoder == VideoEncoder.H264) self.__logger.info(f"Setting quality {quality} from command line")
else QualityFilter.DEFAULT_VP9_QUALITY) elif currentPattern is not None and (quality := currentPattern.quality):
self.__logger.info(f"Setting quality {quality} from default") self.__logger.info(f"Setting quality {quality} from pattern")
elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
self.__logger.info(f"Setting quality {quality} from show")
else:
quality = (QualityFilter.DEFAULT_H264_QUALITY
if (videoEncoder == VideoEncoder.H264)
else QualityFilter.DEFAULT_VP9_QUALITY)
self.__logger.info(f"Setting quality {quality} from default")
preset = presetFilters[0]['parameters']['preset'] if presetFilters else PresetFilter.DEFAULT_PRESET preset = presetFilters[0]['parameters']['preset'] if presetFilters else PresetFilter.DEFAULT_PRESET
self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags( if not copyVideo:
videoEncoder, self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags(
quality, videoEncoder,
preset, quality,
) preset,
)
filterParamTokens = [] filterParamTokens = []
if cropArguments: if cropArguments and not copyVideo:
cropParams = (f"crop=" cropParams = (f"crop="
+ f"{cropArguments[CropFilter.OUTPUT_WIDTH_KEY]}" + f"{cropArguments[CropFilter.OUTPUT_WIDTH_KEY]}"
@@ -348,8 +379,9 @@ class FfxController():
filterParamTokens.append(cropParams) filterParamTokens.append(cropParams)
filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else []) if not copyVideo:
filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else []) filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else [])
filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else [])
deinterlaceFilters deinterlaceFilters
@@ -380,6 +412,29 @@ class FfxController():
self.executeCommandSequence(commandSequence) self.executeCommandSequence(commandSequence)
return return
if copyVideo:
commandSequence = (commandTokens
+ self.__targetMediaDescriptor.getImportFileTokens()
+ self.__targetMediaDescriptor.getInputMappingTokens(sourceMediaDescriptor = self.__sourceMediaDescriptor)
+ self.__mdcs.generateDispositionTokens())
commandSequence += self.__mdcs.generateMetadataTokens()
commandSequence += self.generateVideoCopyAllTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
commandSequence += self.generateOutputTokens(targetPath,
targetFormat)
self.__logger.debug("FfxController.runJob(): Running command sequence")
if not self.__context['dry_run']:
self.executeCommandSequence(commandSequence)
return
if videoEncoder == VideoEncoder.AV1: if videoEncoder == VideoEncoder.AV1:
commandSequence = (commandTokens commandSequence = (commandTokens
@@ -396,7 +451,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG: if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateAV1Tokens(int(quality), int(preset)) commandSequence += self.generateAV1Tokens(int(quality), int(preset))
commandSequence += self.generateAudioEncodingTokens() commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']: if self.__context['perform_cut']:
commandSequence += self.generateCropTokens() commandSequence += self.generateCropTokens()
@@ -426,7 +481,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG: if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateH264Tokens(int(quality)) commandSequence += self.generateH264Tokens(int(quality))
commandSequence += self.generateAudioEncodingTokens() commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']: if self.__context['perform_cut']:
commandSequence += self.generateCropTokens() commandSequence += self.generateCropTokens()
@@ -485,7 +540,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG: if td.getCodec != TrackCodec.PNG:
commandSequence2 += self.generateVP9Pass2Tokens(int(quality)) commandSequence2 += self.generateVP9Pass2Tokens(int(quality))
commandSequence2 += self.generateAudioEncodingTokens() commandSequence2 += self.generateAudioProcessingTokens()
if self.__context['perform_cut']: if self.__context['perform_cut']:
commandSequence2 += self.generateCropTokens() commandSequence2 += self.generateCropTokens()

View File

@@ -6,6 +6,7 @@ from textual.screen import Screen
from textual.widgets import DataTable from textual.widgets import DataTable
from textual.widgets._data_table import CellDoesNotExist from textual.widgets._data_table import CellDoesNotExist
from ffx.attachment_format import AttachmentFormat
from ffx.audio_layout import AudioLayout from ffx.audio_layout import AudioLayout
from ffx.file_properties import FileProperties from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
@@ -127,9 +128,17 @@ class MediaWorkflowScreenBase(Screen):
def _track_codec_cell_value(self, trackDescriptor: TrackDescriptor) -> str: def _track_codec_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT: if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " " attachmentFormat = trackDescriptor.getAttachmentFormat()
if attachmentFormat == AttachmentFormat.UNKNOWN:
return attachmentFormat.identifier()
return attachmentFormat.label()
return trackDescriptor.getFormatDescriptor().label() return trackDescriptor.getFormatDescriptor().label()
def _track_language_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return trackDescriptor.getLanguage().label()
def _track_disposition_cell_value( def _track_disposition_cell_value(
self, self,
trackDescriptor: TrackDescriptor, trackDescriptor: TrackDescriptor,
@@ -244,7 +253,7 @@ class MediaWorkflowScreenBase(Screen):
if trackType == TrackType.AUDIO if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED and audioLayout != AudioLayout.LAYOUT_UNDEFINED
else " ", else " ",
trackDescriptor.getLanguage().label(), self._track_language_cell_value(trackDescriptor),
trackTitle, trackTitle,
self._track_disposition_cell_value( self._track_disposition_cell_value(
trackDescriptor, trackDescriptor,

View File

@@ -88,6 +88,9 @@ class PatternDetailsScreen(Screen):
.three { .three {
column-span: 3; column-span: 3;
} }
.two {
column-span: 2;
}
.four { .four {
column-span: 4; column-span: 4;
@@ -114,7 +117,7 @@ class PatternDetailsScreen(Screen):
} }
.yellow { .yellow {
tint: yellow 40%; color: yellow;
} }
""" """
@@ -331,6 +334,7 @@ class PatternDetailsScreen(Screen):
if not self.__showDescriptor is None: if not self.__showDescriptor is None:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})") self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")
self.updateShowQualityHint()
if self.__pattern is not None: if self.__pattern is not None:
@@ -350,6 +354,7 @@ class PatternDetailsScreen(Screen):
if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"): if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"):
return return
self.updateShowQualityHint()
self.updateTags() self.updateTags()
self.updateTracks() self.updateTracks()
@@ -415,7 +420,9 @@ class PatternDetailsScreen(Screen):
# Row 4 # Row 4
yield Static(t("Quality")) yield Static(t("Quality"))
yield Input(type="integer", id="quality_input") yield Input(type="integer", id="quality_input")
yield Static(' ', classes="five") yield Static(" ")
yield Static("", id="show_quality_hint", classes="two yellow")
yield Static(' ', classes="two")
# Row 5 # Row 5
@@ -504,6 +511,23 @@ class PatternDetailsScreen(Screen):
def getPatternFromInput(self): def getPatternFromInput(self):
return str(self.query_one("#pattern_input", Input).value) return str(self.query_one("#pattern_input", Input).value)
def getShowQualityHintText(self):
if self.__showDescriptor is None:
return ""
showQuality = int(self.__showDescriptor.getQuality() or 0)
if showQuality <= 0:
return ""
patternQuality = int(getattr(self.__pattern, "quality", 0) or 0)
if patternQuality > 0:
return ""
return f"{t('Show')}: {showQuality}"
def updateShowQualityHint(self):
self.query_one("#show_quality_hint", Static).update(self.getShowQualityHintText())
def getQualityFromInput(self): def getQualityFromInput(self):
try: try:
return int(self.query_one("#quality_input", Input).value) return int(self.query_one("#quality_input", Input).value)

View File

@@ -1,7 +1,10 @@
import os import os
import shlex import shlex
import signal
import subprocess import subprocess
from typing import Iterable, List import threading
import time
from typing import Callable, Iterable, List
from .logging_utils import get_ffx_logger from .logging_utils import get_ffx_logger
@@ -118,6 +121,8 @@ def executeProcess(
directory: str = None, directory: str = None,
context: dict = None, context: dict = None,
timeoutSeconds: float = None, timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
): ):
logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger() logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger()
@@ -131,6 +136,16 @@ def executeProcess(
formatCommandSequence(wrappedCommandSequence), formatCommandSequence(wrappedCommandSequence),
) )
if stdoutLineHandler is not None or stderrLineHandler is not None:
return executeStreamingProcess(
wrappedCommandSequence,
directory=directory,
logger=logger,
timeoutSeconds=timeoutSeconds,
stdoutLineHandler=stdoutLineHandler,
stderrLineHandler=stderrLineHandler,
)
try: try:
completed = subprocess.run( completed = subprocess.run(
wrappedCommandSequence, wrappedCommandSequence,
@@ -167,3 +182,162 @@ def executeProcess(
) )
return completed.stdout, completed.stderr, completed.returncode return completed.stdout, completed.stderr, completed.returncode
def terminateProcess(process: subprocess.Popen, *, killAfterSeconds: float = 1.0) -> None:
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGTERM)
else:
process.terminate()
except ProcessLookupError:
return
deadline = time.monotonic() + killAfterSeconds
while process.poll() is None and time.monotonic() < deadline:
time.sleep(0.05)
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGKILL)
else:
process.kill()
except ProcessLookupError:
return
def readProcessStream(
stream,
outputParts: list[str],
lineHandler: Callable[[str], bool] | None,
stopRequested: threading.Event,
logger,
) -> None:
try:
for line in iter(stream.readline, ''):
outputParts.append(line)
if lineHandler is None:
continue
try:
if lineHandler(line):
stopRequested.set()
except Exception:
logger.exception("Process line handler raised an exception")
finally:
stream.close()
def executeStreamingProcess(
commandSequence: List[str],
*,
directory: str = None,
logger = None,
timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
):
logger = logger or get_ffx_logger()
try:
process = subprocess.Popen(
commandSequence,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=directory,
bufsize=1,
start_new_session=True,
)
except FileNotFoundError as ex:
error = (
"Command not found while running "
+ f"{formatCommandSequence(commandSequence)}: {ex.filename or ex}"
)
logger.error(error)
return '', error, COMMAND_NOT_FOUND_RETURN_CODE
stdoutParts: list[str] = []
stderrParts: list[str] = []
stopRequested = threading.Event()
timedOut = False
stdoutThread = threading.Thread(
target=readProcessStream,
args=(
process.stdout,
stdoutParts,
stdoutLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stderrThread = threading.Thread(
target=readProcessStream,
args=(
process.stderr,
stderrParts,
stderrLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stdoutThread.start()
stderrThread.start()
deadline = (
time.monotonic() + float(timeoutSeconds)
if timeoutSeconds is not None
else None
)
terminationRequested = False
while process.poll() is None:
if stopRequested.is_set():
terminationRequested = True
terminateProcess(process)
break
if deadline is not None and time.monotonic() >= deadline:
timedOut = True
terminationRequested = True
terminateProcess(process)
break
time.sleep(0.05)
returnCode = process.wait()
stdoutThread.join()
stderrThread.join()
stdout = ''.join(stdoutParts)
stderr = ''.join(stderrParts)
if timedOut:
error = (
f"Command timed out after {timeoutSeconds} seconds while running "
+ formatCommandSequence(commandSequence)
)
if stderr:
error = f"{error}\n{stderr}"
logger.error(error)
return stdout, error, COMMAND_TIMED_OUT_RETURN_CODE
if returnCode != 0 and not terminationRequested:
logger.warning(
"executeProcess() rc=%s command=%s",
returnCode,
formatCommandSequence(commandSequence),
)
return stdout, stderr, returnCode

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import os
from pathlib import Path
import sys
import tempfile
import unittest
from unittest.mock import patch
from click.testing import CliRunner
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.diagnostics import FfmpegSkipFileWarning, recordUnremediedIssue # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
class _FakeMediaDescriptor:
def getVideoTracks(self):
return []
def getAudioTracks(self):
return []
def getSubtitleTracks(self):
return []
def getAttachmentTracks(self):
return []
def applyOverrides(self, overrides):
return None
class _FakeFileProperties:
def __init__(self, context, source_path):
self.source_path = source_path
def getShowId(self):
return -1
def getSeason(self):
return -1
def getEpisode(self):
return -1
def getMediaDescriptor(self):
return _FakeMediaDescriptor()
def getPattern(self):
return None
class _FakeShiftedSeasonController:
def __init__(self, context):
self.context = context
def shiftSeason(self, show_id, season, episode, patternId=None):
return season, episode
class _FakeShowController:
def __init__(self, context):
self.context = context
def getShowDescriptor(self, show_id):
return None
class _FakeFfxController:
calls: list[str] = []
mode = "skip_first"
def __init__(self, context, *args, **kwargs):
self.context = context
def runJob(self, sourcePath, *args, **kwargs):
self.calls.append(sourcePath)
if self.mode == "clean":
return
if self.mode == "warn_unhandled" and sourcePath.endswith("episode1.avi"):
recordUnremediedIssue(
self.context,
sourcePath,
"unhandled-warning",
)
return
if self.mode == "skip_first" and sourcePath.endswith("episode1.avi"):
message = (
f"Skipping file {sourcePath}: ffmpeg still reported unset packet "
+ "timestamps after retry with -fflags +genpts."
)
recordUnremediedIssue(
self.context,
sourcePath,
"retry-with-generated-pts",
)
self.context["logger"].warning(message)
raise FfmpegSkipFileWarning(message)
class ConvertDiagnosticCliTests(unittest.TestCase):
def setUp(self):
logger = get_ffx_logger()
for handler in list(logger.handlers):
logger.removeHandler(handler)
try:
handler.close()
except Exception:
pass
self.tempdir = tempfile.TemporaryDirectory()
self.home_dir = Path(self.tempdir.name) / "home"
self.home_dir.mkdir()
self.database_path = Path(self.tempdir.name) / "test.db"
self.source_dir = Path(self.tempdir.name) / "source"
self.source_dir.mkdir()
self.source_one = self.source_dir / "episode1.avi"
self.source_two = self.source_dir / "episode2.avi"
self.source_one.write_bytes(b"one")
self.source_two.write_bytes(b"two")
_FakeFfxController.calls = []
_FakeFfxController.mode = "skip_first"
def tearDown(self):
self.tempdir.cleanup()
def test_convert_continues_after_skipping_one_file_due_to_ffmpeg_diagnostic(self):
runner = CliRunner()
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertEqual(
[str(self.source_one), str(self.source_two)],
_FakeFfxController.calls,
)
self.assertIn("Skipping file", result.output)
self.assertIn("-fflags +genpts", result.output)
self.assertIn("Files with ffmpeg findings that require review:", result.output)
self.assertIn(
"episode1.avi: retry-with-generated-pts",
result.output,
)
def test_convert_prints_clean_summary_when_no_unremedied_issues_were_seen(self):
runner = CliRunner()
_FakeFfxController.mode = "clean"
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertIn(
"All files converted with no issues.",
result.output,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -263,6 +263,47 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"], result["modules"],
) )
def test_convert_copy_flags_parse_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import click
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.convert.make_context(
"convert",
["--copy-video", "--copy-audio"],
resilient_parsing=True,
)
help_output = ffx.cli.convert.get_help(click.Context(ffx.cli.convert))
print(json.dumps({{
"copy_video": context.params["copy_video"],
"copy_audio": context.params["copy_audio"],
"output": help_output,
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["copy_video"])
self.assertTrue(result["copy_audio"])
self.assertIn("--copy-video", result["output"])
self.assertIn("--copy-audio", result["output"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_edit_command_avoids_database_bootstrap(self): def test_edit_command_avoids_database_bootstrap(self):
result = self.run_python( result = self.run_python(
textwrap.dedent( textwrap.dedent(

View File

@@ -68,11 +68,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs)) subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']: if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="M src/ffx/constants.py\n") return self.make_completed(args, stdout="M src/ffx/constants.py\n")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="main\n")
return self.make_completed(args) return self.make_completed(args)
with ( with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path), patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path), patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.2"),
patch.object(cli.os.path, "isdir", return_value=True), patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True), patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run), patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -81,6 +84,7 @@ class UpgradeCommandTests(unittest.TestCase):
self.assertEqual(0, result.exit_code, result.output) self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Tracked local changes detected in the bundle repository:", result.output) self.assertIn("Tracked local changes detected in the bundle repository:", result.output)
self.assertIn("Updated FFX to version 0.3.2 from branch main.", result.output)
self.assertEqual( self.assertEqual(
[ [
['git', 'status', '--porcelain', '--untracked-files=no'], ['git', 'status', '--porcelain', '--untracked-files=no'],
@@ -89,6 +93,7 @@ class UpgradeCommandTests(unittest.TestCase):
['git', 'checkout', '-B', 'main', 'FETCH_HEAD'], ['git', 'checkout', '-B', 'main', 'FETCH_HEAD'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'], [pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'], [pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
], ],
[call[0] for call in subprocess_calls], [call[0] for call in subprocess_calls],
) )
@@ -106,11 +111,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs)) subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']: if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="") return self.make_completed(args, stdout="")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="develop\n")
return self.make_completed(args) return self.make_completed(args)
with ( with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path), patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path), patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.3"),
patch.object(cli.os.path, "isdir", return_value=True), patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True), patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run), patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -118,12 +126,14 @@ class UpgradeCommandTests(unittest.TestCase):
result = runner.invoke(cli.ffx, ["upgrade"]) result = runner.invoke(cli.ffx, ["upgrade"])
self.assertEqual(0, result.exit_code, result.output) self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Updated FFX to version 0.3.3 from branch develop.", result.output)
self.assertEqual( self.assertEqual(
[ [
['git', 'status', '--porcelain', '--untracked-files=no'], ['git', 'status', '--porcelain', '--untracked-files=no'],
['git', 'pull'], ['git', 'pull'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'], [pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'], [pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
], ],
[call[0] for call in subprocess_calls], [call[0] for call in subprocess_calls],
) )

View File

@@ -0,0 +1,196 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
from unittest.mock import patch
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.diagnostics import ( # noqa: E402
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
class RecordingLogger:
def __init__(self):
self.messages: list[str] = []
def warning(self, message, *args, **kwargs):
if args:
message = message % args
self.messages.append(str(message))
class FfmpegDiagnosticsTests(unittest.TestCase):
def test_command_runner_retries_with_genpts_after_timestamp_warning(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
commands = []
def fake_execute(commandSequence, **kwargs):
commands.append(list(commandSequence))
stderrLineHandler = kwargs["stderrLineHandler"]
if len(commands) == 1:
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
return "done", "", 0
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
out, err, rc = runner.execute(["ffmpeg", "-y", "-i", "input.avi", "output.mkv"])
self.assertEqual("done", out)
self.assertEqual("", err)
self.assertEqual(0, rc)
self.assertEqual(
[
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"],
],
commands,
)
self.assertEqual(
[
"ffmpeg reported unset packet timestamps for tests/assets/avi/conan_S01E754_amalgam.avi. "
+ "Stopping early and retrying with -fflags +genpts."
],
logger.messages,
)
self.assertEqual({}, getUnremediedIssues(context))
def test_command_runner_skips_file_when_timestamp_warning_persists_after_genpts(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
def fake_execute(commandSequence, **kwargs):
stderrLineHandler = kwargs["stderrLineHandler"]
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
with self.assertRaises(FfmpegSkipFileWarning):
runner.execute(
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"]
)
self.assertEqual(
[
"Skipping file tests/assets/avi/conan_S01E754_amalgam.avi: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E754_amalgam.avi": ["retry-with-generated-pts"]
},
getUnremediedIssues(context),
)
def test_monitor_tracks_non_harmless_corrupt_mpeg_audio_remedy_in_summary(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E763_amalgam.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line("[mp3float @ 0x1] invalid new backstep -1")
)
self.assertFalse(monitor.handle_stderr_line("[mp3float @ 0x1] invalid block type"))
self.assertFalse(
monitor.handle_stderr_line(
"[aist#0:1/mp3 @ 0x2] [dec:mp3float @ 0x3] Error submitting packet to decoder: "
+ "Invalid data found when processing input"
)
)
self.assertEqual(
[
"ffmpeg reported damaged MPEG audio frames while converting "
+ "tests/assets/avi/conan_S01E763_amalgam.avi. FFX will continue, but the "
+ "output audio may contain gaps or glitches."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E763_amalgam.avi": ["warn-corrupt-mpeg-audio"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["conan_S01E763_amalgam.avi: warn-corrupt-mpeg-audio"],
iterUnremediedIssueSummaryLines(context),
)
def test_monitor_tracks_unhandled_diagnostic_for_summary(self):
context = {
"logger": RecordingLogger(),
"current_source_path": "tests/assets/avi/example.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line(
"[avi @ 0x1] Strange warning with no automatic remedy is present"
)
)
self.assertEqual(
{
"tests/assets/avi/example.avi": ["unhandled-warning"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["example.avi: unhandled-warning"],
iterUnremediedIssueSummaryLines(context),
)
self.assertEqual(
[
"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ "tests/assets/avi/example.avi. FFX will continue, but review the output "
+ "file. First unhandled line: [avi @ 0x1] Strange warning with no automatic remedy is present"
],
context["logger"].messages,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -15,6 +15,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.ffx_controller import FfxController # noqa: E402 from ffx.ffx_controller import FfxController # noqa: E402
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402 from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402 from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402 from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -43,6 +44,8 @@ class FfxControllerTests(unittest.TestCase):
"video_encoder": video_encoder, "video_encoder": video_encoder,
"dry_run": False, "dry_run": False,
"perform_cut": False, "perform_cut": False,
"copy_video": False,
"copy_audio": False,
"bitrates": { "bitrates": {
"stereo": "112k", "stereo": "112k",
"ac3": "256k", "ac3": "256k",
@@ -75,6 +78,56 @@ class FfxControllerTests(unittest.TestCase):
) )
return descriptor, source_descriptor return descriptor, source_descriptor
def make_media_descriptors_with_audio(
self,
audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO,
) -> tuple[MediaDescriptor, MediaDescriptor]:
descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
source_descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
return descriptor, source_descriptor
def assert_token_pair(self, command: list[str], first: str, second: str):
self.assertTrue(
any(command[index:index + 2] == [first, second] for index in range(len(command) - 1)),
command,
)
def test_vp9_run_job_emits_file_level_encoding_quality_metadata(self): def test_vp9_run_job_emits_file_level_encoding_quality_metadata(self):
context = self.make_context(VideoEncoder.VP9) context = self.make_context(VideoEncoder.VP9)
target_descriptor, source_descriptor = self.make_media_descriptors() target_descriptor, source_descriptor = self.make_media_descriptors()
@@ -196,6 +249,79 @@ class FfxControllerTests(unittest.TestCase):
self.assertIn("ENCODING_QUALITY=19", commands[0]) self.assertIn("ENCODING_QUALITY=19", commands[0])
mocked_info.assert_any_call("Setting quality 19 from pattern") mocked_info.assert_any_call("Setting quality 19 from pattern")
def test_copy_video_uses_single_copy_command_without_video_encoding_options(self):
context = self.make_context(VideoEncoder.VP9)
context["copy_video"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio()
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 27},
},
{
"identifier": "nlmeans",
"parameters": {},
"tokens": ["nlmeans=s=2.0"],
},
],
cropArguments={
"output_width": 1280,
"output_height": 720,
"x_offset": 0,
"y_offset": 0,
},
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:v", "copy")
self.assertIn("libopus", commands[0])
self.assertNotIn("libvpx-vp9", commands[0])
self.assertNotIn("-pass", commands[0])
self.assertNotIn("-vf", commands[0])
self.assertFalse(any(token.startswith("ENCODING_QUALITY=") for token in commands[0]))
def test_copy_audio_uses_audio_copy_without_audio_encoding_options(self):
context = self.make_context(VideoEncoder.H264)
context["copy_audio"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio(
AudioLayout.LAYOUT_5_1
)
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 21},
}
],
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:a", "copy")
self.assertIn("libx264", commands[0])
self.assertNotIn("libopus", commands[0])
self.assertFalse(any(token.startswith("-b:a") for token in commands[0]))
self.assertFalse(any(token.startswith("-filter:a") for token in commands[0]))
def test_generate_h264_tokens_prefers_libx264_when_available(self): def test_generate_h264_tokens_prefers_libx264_when_available(self):
context = self.make_context(VideoEncoder.H264) context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors() target_descriptor, source_descriptor = self.make_media_descriptors()

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
import sys import sys
import time
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
@@ -51,6 +52,33 @@ class ProcessTests(unittest.TestCase):
self.assertIn("Command timed out", err) self.assertIn("Command timed out", err)
self.assertIn(sys.executable, err) self.assertIn(sys.executable, err)
def test_execute_process_can_stop_early_while_streaming_stderr(self):
start = time.monotonic()
observed_lines = []
out, err, rc = executeProcess(
[
sys.executable,
"-c",
(
"import sys, time; "
"sys.stderr.write('fatal warning\\n'); sys.stderr.flush(); "
"time.sleep(2); "
"sys.stderr.write('late line\\n'); sys.stderr.flush()"
),
],
stderrLineHandler=lambda line: observed_lines.append(line) or ("fatal warning" in line),
)
elapsed = time.monotonic() - start
self.assertLess(elapsed, 1.5)
self.assertNotEqual(0, rc)
self.assertEqual("", out)
self.assertIn("fatal warning", err)
self.assertNotIn("late line", err)
self.assertEqual(["fatal warning\n"], observed_lines)
def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self): def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self):
wrapped = getWrappedCommandSequence( wrapped = getWrappedCommandSequence(
["ffmpeg", "-i", "input.mkv"], ["ffmpeg", "-i", "input.mkv"],

View File

@@ -548,6 +548,11 @@ class TagTableScreenStateTests(unittest.TestCase):
screen.tagsTable = FakeTagTable() screen.tagsTable = FakeTagTable()
screen.shiftedSeasonsTable = FakeTagTable() screen.shiftedSeasonsTable = FakeTagTable()
screen._PatternDetailsScreen__pattern = object() screen._PatternDetailsScreen__pattern = object()
screen._PatternDetailsScreen__showDescriptor = None
widgets = {
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
calls = [] calls = []
screen.updateTags = lambda: calls.append("updateTags") screen.updateTags = lambda: calls.append("updateTags")
@@ -561,6 +566,48 @@ class TagTableScreenStateTests(unittest.TestCase):
calls, calls,
) )
def test_pattern_details_screen_on_mount_shows_show_quality_hint_for_new_pattern(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen.context = {}
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = None
widgets = {
"#showlabel": FakeStaticWidget(),
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
screen.on_mount()
self.assertEqual("7 - Demo (1999)", widgets["#showlabel"].value)
self.assertEqual("Show: 23", widgets["#show_quality_hint"].value)
def test_pattern_details_screen_show_quality_hint_is_hidden_when_pattern_quality_exists(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = type(
"_Pattern",
(),
{"quality": 19},
)()
self.assertEqual("", screen.getShowQualityHintText())
def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self): def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self):
screen = object.__new__(InspectDetailsScreen) screen = object.__new__(InspectDetailsScreen)
@@ -722,7 +769,7 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertIn("English Full", screen.tracksTable.rows["row-0"]) self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor()) self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_update_tracks_blanks_irrelevant_attachment_fields(self): def test_inspect_details_screen_update_tracks_shows_attachment_format_and_blanks_language(self):
attachment_track = TrackDescriptor( attachment_track = TrackDescriptor(
index=4, index=4,
source_index=4, source_index=4,
@@ -745,10 +792,36 @@ class TagTableScreenStateTests(unittest.TestCase):
row = screen.tracksTable.rows["row-0"] row = screen.tracksTable.rows["row-0"]
self.assertEqual("4", row[0]) self.assertEqual("4", row[0])
self.assertEqual(" ", row[3]) self.assertEqual("TTF", row[3])
self.assertEqual(" ", row[5])
self.assertEqual(" ", row[7]) self.assertEqual(" ", row[7])
self.assertEqual(" ", row[8]) self.assertEqual(" ", row[8])
def test_inspect_details_screen_update_tracks_shows_unknown_for_unknown_attachment_format(self):
attachment_track = TrackDescriptor(
index=5,
source_index=5,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.UNKNOWN,
tags={"filename": "blob.bin", "mimetype": "application/octet-stream"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("unknown", row[3])
self.assertEqual(" ", row[5])
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self): def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor( source_track = TrackDescriptor(
index=3, index=3,

View File

@@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import sys import sys
import unittest import unittest
@@ -13,15 +12,11 @@ if str(SRC_ROOT) not in sys.path:
from ffx.attachment_format import AttachmentFormat # noqa: E402 from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402 from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402 from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402 from ffx.track_type import TrackType # noqa: E402
ASSETS_ROOT = Path(__file__).resolve().parents[1] / "assets"
class TrackDescriptorProbeTests(unittest.TestCase): class TrackDescriptorProbeTests(unittest.TestCase):
def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self): def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self):
descriptor = TrackDescriptor.fromFfprobe( descriptor = TrackDescriptor.fromFfprobe(
@@ -62,26 +57,5 @@ class TrackDescriptorProbeTests(unittest.TestCase):
self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat()) self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec()) self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_media_descriptor_from_boruto_probe_json_handles_attachment_streams_without_codec_name(self):
probe_payload = json.loads(
(ASSETS_ROOT / "ffprobe.out.json").read_text(encoding="utf-8")
)
descriptor = MediaDescriptor.fromFfprobe(
{"logger": None},
probe_payload["format"],
probe_payload["streams"],
)
track_descriptors = descriptor.getTrackDescriptors()
attachment_tracks = descriptor.getAttachmentTracks()
self.assertEqual(14, len(track_descriptors))
self.assertEqual(10, len(attachment_tracks))
self.assertTrue(
all(track.getAttachmentFormat() == AttachmentFormat.TTF for track in attachment_tracks)
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()