8 Commits

Author SHA1 Message Date
Javanaut
12be6e985a v0.4.2 2026-04-24 13:39:57 +02:00
Javanaut
12310942ae Fix inspect attachment subtracks 2026-04-24 10:34:43 +02:00
Javanaut
f913cb4fe3 ff 2026-04-24 08:49:48 +02:00
Javanaut
0a153280e3 ff 2026-04-24 08:49:30 +02:00
Javanaut
6ca0cd54b0 addendum 2026-04-23 22:16:03 +02:00
Javanaut
502a822bb4 prep 0.4.1 2026-04-23 22:09:36 +02:00
Javanaut
6cc21b5f36 Adds diagnostics/remedy system 2026-04-23 20:32:49 +02:00
Javanaut
0034f8ca97 ff 2026-04-23 16:37:47 +02:00
22 changed files with 1255 additions and 55 deletions

View File

@@ -99,6 +99,19 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History ## Version History
### 0.4.2
- pattern details now show an inline `Show: <quality>` hint next to the quality field when the pattern itself has no stored quality but the selected show does
- inspect stream tables now show attachment format labels like `TTF` in the codec column and keep attachment language cells blank instead of showing an undefined language
- ffmpeg damaged-MP3 diagnostics now recognize additional corruption lines such as `invalid new backstep`, keeping them grouped under the `warn-corrupt-mpeg-audio` review summary
### 0.4.1
- `convert` now supports `--copy-video` and `--copy-audio` to keep the selected stream type in copy mode without applying the corresponding reencode flags, filters, or formatting options
- ffmpeg conversions now monitor diagnostics while the process is running, retry unset AVI packet timestamps once with `-fflags +genpts`, and stop early when a file should be skipped instead of waiting for the full job to finish
- end-of-run convert summaries now list only ffmpeg findings that still require review, including named remedy identifiers such as `warn-corrupt-mpeg-audio`
- `upgrade` now finishes by reporting the installed FFX version together with the active bundle branch
### 0.3.1 ### 0.3.1
- debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows - debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows

View File

@@ -69,15 +69,3 @@
## Delete When ## Delete When
- Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance. - Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance.
## Missing Timestamps
Detect ffmpeg warning "Timestamps are unset in a packet for stream 0. This is deprecated and will stop working in the future. Fix your code to set the timestamps properly" and try autofix by -fflags +genpts -> Warning if fails -> Error. Check if flags collide with anything.
<!--
## Source Formats
-->

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "ffx" name = "ffx"
description = "FFX recoding and metadata managing tool" description = "FFX recoding and metadata managing tool"
version = "0.3.1" version = "0.4.2"
license = {file = "LICENSE.md"} license = {file = "LICENSE.md"}
dependencies = [ dependencies = [
"requests", "requests",

View File

@@ -393,6 +393,41 @@ def getTrackedGitChanges(repoPath):
return [line for line in completed.stdout.splitlines() if line.strip()] return [line for line in completed.stdout.splitlines() if line.strip()]
def getCurrentGitBranch(repoPath):
completed = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
cwd=repoPath,
capture_output=True,
text=True,
)
if completed.returncode != 0:
commandLabel = 'git rev-parse --abbrev-ref HEAD'
errorOutput = completed.stderr.strip() or completed.stdout.strip()
raise click.ClickException(
f"Unable to inspect bundle repository branch using '{commandLabel}': {errorOutput}"
)
return completed.stdout.strip() or "unknown"
def getBundleVersion(repoPath):
constantsPath = os.path.join(repoPath, 'src', 'ffx', 'constants.py')
try:
with open(constantsPath, encoding='utf-8') as constantsFile:
for line in constantsFile:
strippedLine = line.strip()
if strippedLine.startswith('VERSION=') or strippedLine.startswith('VERSION ='):
return strippedLine.split('=', 1)[1].strip().strip('"\'')
except OSError as ex:
raise click.ClickException(
f"Unable to inspect bundle version from {constantsPath}: {ex}"
) from ex
raise click.ClickException(f"Unable to inspect bundle version from {constantsPath}")
def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs): def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
if not os.path.isfile(scriptPath): if not os.path.isfile(scriptPath):
raise click.ClickException(f"{missingDescription} not found at {scriptPath}") raise click.ClickException(f"{missingDescription} not found at {scriptPath}")
@@ -515,6 +550,10 @@ def upgrade(ctx, branch):
if completed.returncode != 0: if completed.returncode != 0:
ctx.exit(completed.returncode) ctx.exit(completed.returncode)
upgradedBranch = getCurrentGitBranch(bundleRepoPath)
upgradedVersion = getBundleVersion(bundleRepoPath)
click.echo(f"Updated FFX to version {upgradedVersion} from branch {upgradedBranch}.")
@ffx.command() @ffx.command()
@click.pass_context @click.pass_context
@@ -1082,6 +1121,11 @@ def convert(ctx,
Suffices will we appended to filename in case of multiple created files Suffices will we appended to filename in case of multiple created files
or if the filename has not changed.""" or if the filename has not changed."""
from ffx.ffx_controller import FfxController from ffx.ffx_controller import FfxController
from ffx.diagnostics import (
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
from ffx.file_properties import FileProperties from ffx.file_properties import FileProperties
from ffx.filter.crop_filter import CropFilter from ffx.filter.crop_filter import CropFilter
from ffx.filter.deinterlace_filter import DeinterlaceFilter from ffx.filter.deinterlace_filter import DeinterlaceFilter
@@ -1561,6 +1605,7 @@ def convert(ctx,
if rename_only: if rename_only:
shutil.move(sourcePath, targetPath) shutil.move(sourcePath, targetPath)
else: else:
try:
fc.runJob(sourcePath, fc.runJob(sourcePath,
targetPath, targetPath,
targetFormat, targetFormat,
@@ -1568,11 +1613,22 @@ def convert(ctx,
cropArguments, cropArguments,
currentPattern, currentPattern,
currentShowDescriptor) currentShowDescriptor)
except FfmpegSkipFileWarning:
if os.path.exists(targetPath):
os.remove(targetPath)
continue
endTime = time.perf_counter() endTime = time.perf_counter()
ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}") ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}")
unremediedIssues = getUnremediedIssues(context)
if unremediedIssues:
ctx.obj['logger'].warning("\nFiles with ffmpeg findings that require review:")
for summaryLine in iterUnremediedIssueSummaryLines(context):
ctx.obj['logger'].warning(summaryLine)
else:
ctx.obj['logger'].info("All files converted with no issues.")
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,4 +1,4 @@
VERSION='0.3.1' VERSION='0.4.2'
DATABASE_VERSION = 3 DATABASE_VERSION = 3
DEFAULT_QUALITY = 32 DEFAULT_QUALITY = 32

View File

@@ -0,0 +1,24 @@
from .base import FfmpegRemedy, FfmpegRemedyDecision, FfmpegSkipFileWarning
from .monitor import FfmpegCommandRunner, FfmpegDiagnosticMonitor
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import (
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from dataclasses import dataclass
class FfmpegSkipFileWarning(Exception):
pass
@dataclass(frozen=True)
class FfmpegRemedyDecision:
stop_process: bool = False
retry_input_tokens: tuple[str, ...] = ()
skip_file: bool = False
console_warning: str = ""
summary_identifier: str = ""
unremedied_issue_identifier: str = ""
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
class FfmpegRemedy:
identifier = "ffmpeg-remedy"
harmless = False
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
raise NotImplementedError

View File

@@ -0,0 +1,222 @@
from __future__ import annotations
import re
from ffx.logging_utils import get_ffx_logger
from ffx.process import executeProcess
from .base import FfmpegSkipFileWarning, FfmpegRemedy
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import recordUnremediedIssue
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
UNHANDLED_DIAGNOSTIC_PATTERNS = (
re.compile(r"\bwarning\b", re.IGNORECASE),
re.compile(r"\berror\b", re.IGNORECASE),
re.compile(r"\bfailed\b", re.IGNORECASE),
re.compile(r"\binvalid\b", re.IGNORECASE),
re.compile(r"\bmissing\b", re.IGNORECASE),
re.compile(r"\bcorrupt\b", re.IGNORECASE),
re.compile(r"\boverflow\b", re.IGNORECASE),
re.compile(r"\bdeprecated\b", re.IGNORECASE),
)
class FfmpegDiagnosticMonitor:
def __init__(
self,
context: dict | None,
command_sequence: list[str],
*,
remedies: list[FfmpegRemedy] | None = None,
emittedWarnings: set[str] | None = None,
):
self.context = context or {}
self.command_sequence = list(command_sequence)
self.logger = self.context.get("logger", get_ffx_logger())
self.source_path = str(self.context.get("current_source_path", "")).strip()
self.remedies = remedies or [
RetryWithGeneratedPtsRemedy(),
WarnCorruptMpegAudioRemedy(),
]
self._emittedWarnings = emittedWarnings if emittedWarnings is not None else set()
self.retry_input_tokens: tuple[str, ...] = ()
self.skip_file = False
self.skip_file_message = ""
def describe_source(self) -> str:
return self.source_path if self.source_path else "current file"
def command_contains_tokens(self, tokens: tuple[str, ...]) -> bool:
tokenCount = len(tokens)
if tokenCount == 0:
return True
return any(
tuple(self.command_sequence[index:index + tokenCount]) == tuple(tokens)
for index in range(len(self.command_sequence) - tokenCount + 1)
)
def emitConsoleWarning(self, warningMessage: str) -> None:
if warningMessage and warningMessage not in self._emittedWarnings:
self.logger.warning(warningMessage)
self._emittedWarnings.add(warningMessage)
def recordUnremediedIssue(self, issueIdentifier: str, issueLine: str) -> None:
isFirstIssueForFile = recordUnremediedIssue(
self.context,
self.describe_source(),
issueIdentifier,
)
if not isFirstIssueForFile:
return
self.emitConsoleWarning(
f"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ f"{self.describe_source()}. FFX will continue, but review the output "
+ f"file. First unhandled line: {issueLine}"
)
def lineLooksLikeUnhandledDiagnostic(self, line: str) -> bool:
return any(pattern.search(line) for pattern in UNHANDLED_DIAGNOSTIC_PATTERNS)
def getUnhandledDiagnosticIdentifier(self, line: str) -> str:
loweredLine = str(line).lower()
if any(token in loweredLine for token in ("error", "failed", "invalid", "missing", "corrupt", "overflow")):
return "unhandled-error"
if any(token in loweredLine for token in ("warning", "deprecated")):
return "unhandled-warning"
return "unhandled-diagnostic"
def getSummaryIdentifier(
self,
remedy: FfmpegRemedy,
decision,
) -> str:
explicitIdentifier = str(decision.summary_identifier).strip()
if explicitIdentifier:
return explicitIdentifier
remedyIdentifier = str(getattr(remedy, "identifier", "")).strip()
if remedyIdentifier and remedyIdentifier != FfmpegRemedy.identifier:
return remedyIdentifier
return str(decision.unremedied_issue_identifier).strip()
def shouldRecordSummary(
self,
remedy: FfmpegRemedy,
decision,
) -> bool:
if getattr(remedy, "harmless", False):
return False
if decision.retry_requested and not decision.skip_file:
return False
return bool(self.getSummaryIdentifier(remedy, decision))
def handle_stderr_line(self, line: str) -> bool:
strippedLine = str(line).strip()
if not strippedLine:
return False
for remedy in self.remedies:
decision = remedy.inspect_line(strippedLine, self)
if decision is None:
continue
self.emitConsoleWarning(decision.console_warning)
if decision.retry_requested:
self.retry_input_tokens = tuple(decision.retry_input_tokens)
if self.shouldRecordSummary(remedy, decision):
recordUnremediedIssue(
self.context,
self.describe_source(),
self.getSummaryIdentifier(remedy, decision),
)
if decision.skip_file:
self.skip_file = True
self.skip_file_message = (
decision.console_warning
or f"Skipping file {self.describe_source()} because ffmpeg reported a fatal diagnostic."
)
return bool(decision.stop_process)
if self.lineLooksLikeUnhandledDiagnostic(strippedLine):
self.recordUnremediedIssue(
self.getUnhandledDiagnosticIdentifier(strippedLine),
strippedLine,
)
return False
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
def insertFfmpegInputOptions(
commandSequence: list[str],
extraTokens: tuple[str, ...],
) -> list[str]:
if not extraTokens:
return list(commandSequence)
if not commandSequence:
return list(extraTokens)
return [commandSequence[0]] + list(extraTokens) + list(commandSequence[1:])
class FfmpegCommandRunner:
def __init__(
self,
context: dict | None,
*,
remedies: list[FfmpegRemedy] | None = None,
):
self.__context = context or {}
self.__remedies = remedies
def execute(
self,
commandSequence: list[str],
*,
directory: str = None,
timeoutSeconds: float = None,
):
emittedWarnings: set[str] = set()
attemptCommandSequence = list(commandSequence)
while True:
monitor = FfmpegDiagnosticMonitor(
self.__context,
attemptCommandSequence,
remedies=self.__remedies,
emittedWarnings=emittedWarnings,
)
out, err, rc = executeProcess(
attemptCommandSequence,
directory=directory,
context=self.__context,
timeoutSeconds=timeoutSeconds,
stderrLineHandler=monitor.handle_stderr_line,
)
if monitor.retry_requested:
attemptCommandSequence = insertFfmpegInputOptions(
attemptCommandSequence,
monitor.retry_input_tokens,
)
continue
if monitor.skip_file:
raise FfmpegSkipFileWarning(monitor.skip_file_message)
return out, err, rc

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class RetryWithGeneratedPtsRemedy(FfmpegRemedy):
identifier = "retry-with-generated-pts"
RETRY_INPUT_TOKENS = ("-fflags", "+genpts")
TIMESTAMP_UNSET_PATTERN = re.compile(
r"Timestamps are unset in a packet for stream \d+"
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if self.TIMESTAMP_UNSET_PATTERN.search(line) is None:
return None
if session.command_contains_tokens(self.RETRY_INPUT_TOKENS):
return FfmpegRemedyDecision(
stop_process=True,
skip_file=True,
console_warning=(
f"Skipping file {session.describe_source()}: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
),
unremedied_issue_identifier="timestamp-unset-after-genpts",
)
return FfmpegRemedyDecision(
stop_process=True,
retry_input_tokens=self.RETRY_INPUT_TOKENS,
console_warning=(
f"ffmpeg reported unset packet timestamps for {session.describe_source()}. "
+ "Stopping early and retrying with -fflags +genpts."
),
)

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import os
DIAGNOSTICS_STATE_KEY = "diagnostics_state"
UNREMEDIED_ISSUES_KEY = "unremedied_issues"
def getDiagnosticsState(context: dict | None) -> dict:
if context is None:
return {UNREMEDIED_ISSUES_KEY: {}}
if DIAGNOSTICS_STATE_KEY not in context:
context[DIAGNOSTICS_STATE_KEY] = {
UNREMEDIED_ISSUES_KEY: {},
}
return context[DIAGNOSTICS_STATE_KEY]
def recordUnremediedIssue(
context: dict | None,
sourcePath: str,
identifier: str,
) -> bool:
if not sourcePath:
return False
diagnosticsState = getDiagnosticsState(context)
unremediedIssues = diagnosticsState[UNREMEDIED_ISSUES_KEY]
issueList = unremediedIssues.setdefault(sourcePath, [])
strippedIdentifier = str(identifier).strip()
if not strippedIdentifier or strippedIdentifier in issueList:
return False
issueList.append(strippedIdentifier)
return True
def getUnremediedIssues(context: dict | None) -> dict[str, list[str]]:
diagnosticsState = getDiagnosticsState(context)
return diagnosticsState.get(UNREMEDIED_ISSUES_KEY, {})
def iterUnremediedIssueSummaryLines(context: dict | None) -> list[str]:
summaryLines = []
unremediedIssues = getUnremediedIssues(context)
for sourcePath in sorted(unremediedIssues.keys()):
identifiers = unremediedIssues[sourcePath]
summaryLines.append(f"{os.path.basename(sourcePath)}: {', '.join(identifiers)}")
return summaryLines

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class WarnCorruptMpegAudioRemedy(FfmpegRemedy):
identifier = "warn-corrupt-mpeg-audio"
PATTERNS = (
re.compile(r"\[mp3float @ .*\] invalid block type", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] invalid new backstep -?\d+", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] Header missing"),
re.compile(r"\[mp3float @ .*\] overread, skip ", re.IGNORECASE),
re.compile(r"Error while decoding MPEG audio frame\."),
re.compile(
r"Error submitting packet to decoder: Invalid data found when processing input"
),
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if not any(pattern.search(line) for pattern in self.PATTERNS):
return None
return FfmpegRemedyDecision(
console_warning=(
f"ffmpeg reported damaged MPEG audio frames while converting "
+ f"{session.describe_source()}. FFX will continue, but the output "
+ "audio may contain gaps or glitches."
),
)

View File

@@ -0,0 +1,27 @@
from .diagnostics import (
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegRemedy,
FfmpegRemedyDecision,
FfmpegSkipFileWarning,
RetryWithGeneratedPtsRemedy,
WarnCorruptMpegAudioRemedy,
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
from logging import Logger from logging import Logger
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.diagnostics import FfmpegCommandRunner
from ffx.media_descriptor import MediaDescriptor from ffx.media_descriptor import MediaDescriptor
from ffx.audio_layout import AudioLayout from ffx.audio_layout import AudioLayout
@@ -63,6 +64,7 @@ class FfxController():
self.__logger: Logger = context['logger'] self.__logger: Logger = context['logger']
self.__warnedH264Fallback = False self.__warnedH264Fallback = False
self.__ffmpegCommandRunner = FfmpegCommandRunner(context)
@staticmethod @staticmethod
@@ -100,6 +102,12 @@ class FfxController():
def executeCommandSequence(self, commandSequence): def executeCommandSequence(self, commandSequence):
if commandSequence and str(commandSequence[0]).strip() == "ffmpeg":
out, err, rc = self.__ffmpegCommandRunner.execute(
commandSequence,
timeoutSeconds=None,
)
else:
out, err, rc = executeProcess(commandSequence, context=self.__context) out, err, rc = executeProcess(commandSequence, context=self.__context)
if rc: if rc:
raise click.ClickException(f"Command resulted in error: rc={rc} error={err}") raise click.ClickException(f"Command resulted in error: rc={rc} error={err}")
@@ -321,6 +329,7 @@ class FfxController():
videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9) videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9)
self.__context['current_source_path'] = sourcePath
copyVideo = self.__context.get('copy_video', False) or videoEncoder == VideoEncoder.COPY copyVideo = self.__context.get('copy_video', False) or videoEncoder == VideoEncoder.COPY

View File

@@ -6,6 +6,7 @@ from textual.screen import Screen
from textual.widgets import DataTable from textual.widgets import DataTable
from textual.widgets._data_table import CellDoesNotExist from textual.widgets._data_table import CellDoesNotExist
from ffx.attachment_format import AttachmentFormat
from ffx.audio_layout import AudioLayout from ffx.audio_layout import AudioLayout
from ffx.file_properties import FileProperties from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
@@ -127,9 +128,17 @@ class MediaWorkflowScreenBase(Screen):
def _track_codec_cell_value(self, trackDescriptor: TrackDescriptor) -> str: def _track_codec_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT: if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " " attachmentFormat = trackDescriptor.getAttachmentFormat()
if attachmentFormat == AttachmentFormat.UNKNOWN:
return attachmentFormat.identifier()
return attachmentFormat.label()
return trackDescriptor.getFormatDescriptor().label() return trackDescriptor.getFormatDescriptor().label()
def _track_language_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return trackDescriptor.getLanguage().label()
def _track_disposition_cell_value( def _track_disposition_cell_value(
self, self,
trackDescriptor: TrackDescriptor, trackDescriptor: TrackDescriptor,
@@ -244,7 +253,7 @@ class MediaWorkflowScreenBase(Screen):
if trackType == TrackType.AUDIO if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED and audioLayout != AudioLayout.LAYOUT_UNDEFINED
else " ", else " ",
trackDescriptor.getLanguage().label(), self._track_language_cell_value(trackDescriptor),
trackTitle, trackTitle,
self._track_disposition_cell_value( self._track_disposition_cell_value(
trackDescriptor, trackDescriptor,

View File

@@ -88,6 +88,9 @@ class PatternDetailsScreen(Screen):
.three { .three {
column-span: 3; column-span: 3;
} }
.two {
column-span: 2;
}
.four { .four {
column-span: 4; column-span: 4;
@@ -114,7 +117,7 @@ class PatternDetailsScreen(Screen):
} }
.yellow { .yellow {
tint: yellow 40%; color: yellow;
} }
""" """
@@ -331,6 +334,7 @@ class PatternDetailsScreen(Screen):
if not self.__showDescriptor is None: if not self.__showDescriptor is None:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})") self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")
self.updateShowQualityHint()
if self.__pattern is not None: if self.__pattern is not None:
@@ -350,6 +354,7 @@ class PatternDetailsScreen(Screen):
if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"): if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"):
return return
self.updateShowQualityHint()
self.updateTags() self.updateTags()
self.updateTracks() self.updateTracks()
@@ -415,7 +420,9 @@ class PatternDetailsScreen(Screen):
# Row 4 # Row 4
yield Static(t("Quality")) yield Static(t("Quality"))
yield Input(type="integer", id="quality_input") yield Input(type="integer", id="quality_input")
yield Static(' ', classes="five") yield Static(" ")
yield Static("", id="show_quality_hint", classes="two yellow")
yield Static(' ', classes="two")
# Row 5 # Row 5
@@ -504,6 +511,23 @@ class PatternDetailsScreen(Screen):
def getPatternFromInput(self): def getPatternFromInput(self):
return str(self.query_one("#pattern_input", Input).value) return str(self.query_one("#pattern_input", Input).value)
def getShowQualityHintText(self):
if self.__showDescriptor is None:
return ""
showQuality = int(self.__showDescriptor.getQuality() or 0)
if showQuality <= 0:
return ""
patternQuality = int(getattr(self.__pattern, "quality", 0) or 0)
if patternQuality > 0:
return ""
return f"{t('Show')}: {showQuality}"
def updateShowQualityHint(self):
self.query_one("#show_quality_hint", Static).update(self.getShowQualityHintText())
def getQualityFromInput(self): def getQualityFromInput(self):
try: try:
return int(self.query_one("#quality_input", Input).value) return int(self.query_one("#quality_input", Input).value)

View File

@@ -1,7 +1,10 @@
import os import os
import shlex import shlex
import signal
import subprocess import subprocess
from typing import Iterable, List import threading
import time
from typing import Callable, Iterable, List
from .logging_utils import get_ffx_logger from .logging_utils import get_ffx_logger
@@ -118,6 +121,8 @@ def executeProcess(
directory: str = None, directory: str = None,
context: dict = None, context: dict = None,
timeoutSeconds: float = None, timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
): ):
logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger() logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger()
@@ -131,6 +136,16 @@ def executeProcess(
formatCommandSequence(wrappedCommandSequence), formatCommandSequence(wrappedCommandSequence),
) )
if stdoutLineHandler is not None or stderrLineHandler is not None:
return executeStreamingProcess(
wrappedCommandSequence,
directory=directory,
logger=logger,
timeoutSeconds=timeoutSeconds,
stdoutLineHandler=stdoutLineHandler,
stderrLineHandler=stderrLineHandler,
)
try: try:
completed = subprocess.run( completed = subprocess.run(
wrappedCommandSequence, wrappedCommandSequence,
@@ -167,3 +182,162 @@ def executeProcess(
) )
return completed.stdout, completed.stderr, completed.returncode return completed.stdout, completed.stderr, completed.returncode
def terminateProcess(process: subprocess.Popen, *, killAfterSeconds: float = 1.0) -> None:
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGTERM)
else:
process.terminate()
except ProcessLookupError:
return
deadline = time.monotonic() + killAfterSeconds
while process.poll() is None and time.monotonic() < deadline:
time.sleep(0.05)
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGKILL)
else:
process.kill()
except ProcessLookupError:
return
def readProcessStream(
stream,
outputParts: list[str],
lineHandler: Callable[[str], bool] | None,
stopRequested: threading.Event,
logger,
) -> None:
try:
for line in iter(stream.readline, ''):
outputParts.append(line)
if lineHandler is None:
continue
try:
if lineHandler(line):
stopRequested.set()
except Exception:
logger.exception("Process line handler raised an exception")
finally:
stream.close()
def executeStreamingProcess(
commandSequence: List[str],
*,
directory: str = None,
logger = None,
timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
):
logger = logger or get_ffx_logger()
try:
process = subprocess.Popen(
commandSequence,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=directory,
bufsize=1,
start_new_session=True,
)
except FileNotFoundError as ex:
error = (
"Command not found while running "
+ f"{formatCommandSequence(commandSequence)}: {ex.filename or ex}"
)
logger.error(error)
return '', error, COMMAND_NOT_FOUND_RETURN_CODE
stdoutParts: list[str] = []
stderrParts: list[str] = []
stopRequested = threading.Event()
timedOut = False
stdoutThread = threading.Thread(
target=readProcessStream,
args=(
process.stdout,
stdoutParts,
stdoutLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stderrThread = threading.Thread(
target=readProcessStream,
args=(
process.stderr,
stderrParts,
stderrLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stdoutThread.start()
stderrThread.start()
deadline = (
time.monotonic() + float(timeoutSeconds)
if timeoutSeconds is not None
else None
)
terminationRequested = False
while process.poll() is None:
if stopRequested.is_set():
terminationRequested = True
terminateProcess(process)
break
if deadline is not None and time.monotonic() >= deadline:
timedOut = True
terminationRequested = True
terminateProcess(process)
break
time.sleep(0.05)
returnCode = process.wait()
stdoutThread.join()
stderrThread.join()
stdout = ''.join(stdoutParts)
stderr = ''.join(stderrParts)
if timedOut:
error = (
f"Command timed out after {timeoutSeconds} seconds while running "
+ formatCommandSequence(commandSequence)
)
if stderr:
error = f"{error}\n{stderr}"
logger.error(error)
return stdout, error, COMMAND_TIMED_OUT_RETURN_CODE
if returnCode != 0 and not terminationRequested:
logger.warning(
"executeProcess() rc=%s command=%s",
returnCode,
formatCommandSequence(commandSequence),
)
return stdout, stderr, returnCode

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import os
from pathlib import Path
import sys
import tempfile
import unittest
from unittest.mock import patch
from click.testing import CliRunner
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.diagnostics import FfmpegSkipFileWarning, recordUnremediedIssue # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
class _FakeMediaDescriptor:
def getVideoTracks(self):
return []
def getAudioTracks(self):
return []
def getSubtitleTracks(self):
return []
def getAttachmentTracks(self):
return []
def applyOverrides(self, overrides):
return None
class _FakeFileProperties:
def __init__(self, context, source_path):
self.source_path = source_path
def getShowId(self):
return -1
def getSeason(self):
return -1
def getEpisode(self):
return -1
def getMediaDescriptor(self):
return _FakeMediaDescriptor()
def getPattern(self):
return None
class _FakeShiftedSeasonController:
def __init__(self, context):
self.context = context
def shiftSeason(self, show_id, season, episode, patternId=None):
return season, episode
class _FakeShowController:
def __init__(self, context):
self.context = context
def getShowDescriptor(self, show_id):
return None
class _FakeFfxController:
calls: list[str] = []
mode = "skip_first"
def __init__(self, context, *args, **kwargs):
self.context = context
def runJob(self, sourcePath, *args, **kwargs):
self.calls.append(sourcePath)
if self.mode == "clean":
return
if self.mode == "warn_unhandled" and sourcePath.endswith("episode1.avi"):
recordUnremediedIssue(
self.context,
sourcePath,
"unhandled-warning",
)
return
if self.mode == "skip_first" and sourcePath.endswith("episode1.avi"):
message = (
f"Skipping file {sourcePath}: ffmpeg still reported unset packet "
+ "timestamps after retry with -fflags +genpts."
)
recordUnremediedIssue(
self.context,
sourcePath,
"retry-with-generated-pts",
)
self.context["logger"].warning(message)
raise FfmpegSkipFileWarning(message)
class ConvertDiagnosticCliTests(unittest.TestCase):
def setUp(self):
logger = get_ffx_logger()
for handler in list(logger.handlers):
logger.removeHandler(handler)
try:
handler.close()
except Exception:
pass
self.tempdir = tempfile.TemporaryDirectory()
self.home_dir = Path(self.tempdir.name) / "home"
self.home_dir.mkdir()
self.database_path = Path(self.tempdir.name) / "test.db"
self.source_dir = Path(self.tempdir.name) / "source"
self.source_dir.mkdir()
self.source_one = self.source_dir / "episode1.avi"
self.source_two = self.source_dir / "episode2.avi"
self.source_one.write_bytes(b"one")
self.source_two.write_bytes(b"two")
_FakeFfxController.calls = []
_FakeFfxController.mode = "skip_first"
def tearDown(self):
self.tempdir.cleanup()
def test_convert_continues_after_skipping_one_file_due_to_ffmpeg_diagnostic(self):
runner = CliRunner()
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertEqual(
[str(self.source_one), str(self.source_two)],
_FakeFfxController.calls,
)
self.assertIn("Skipping file", result.output)
self.assertIn("-fflags +genpts", result.output)
self.assertIn("Files with ffmpeg findings that require review:", result.output)
self.assertIn(
"episode1.avi: retry-with-generated-pts",
result.output,
)
def test_convert_prints_clean_summary_when_no_unremedied_issues_were_seen(self):
runner = CliRunner()
_FakeFfxController.mode = "clean"
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertIn(
"All files converted with no issues.",
result.output,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -68,11 +68,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs)) subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']: if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="M src/ffx/constants.py\n") return self.make_completed(args, stdout="M src/ffx/constants.py\n")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="main\n")
return self.make_completed(args) return self.make_completed(args)
with ( with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path), patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path), patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.2"),
patch.object(cli.os.path, "isdir", return_value=True), patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True), patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run), patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -81,6 +84,7 @@ class UpgradeCommandTests(unittest.TestCase):
self.assertEqual(0, result.exit_code, result.output) self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Tracked local changes detected in the bundle repository:", result.output) self.assertIn("Tracked local changes detected in the bundle repository:", result.output)
self.assertIn("Updated FFX to version 0.3.2 from branch main.", result.output)
self.assertEqual( self.assertEqual(
[ [
['git', 'status', '--porcelain', '--untracked-files=no'], ['git', 'status', '--porcelain', '--untracked-files=no'],
@@ -89,6 +93,7 @@ class UpgradeCommandTests(unittest.TestCase):
['git', 'checkout', '-B', 'main', 'FETCH_HEAD'], ['git', 'checkout', '-B', 'main', 'FETCH_HEAD'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'], [pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'], [pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
], ],
[call[0] for call in subprocess_calls], [call[0] for call in subprocess_calls],
) )
@@ -106,11 +111,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs)) subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']: if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="") return self.make_completed(args, stdout="")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="develop\n")
return self.make_completed(args) return self.make_completed(args)
with ( with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path), patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path), patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.3"),
patch.object(cli.os.path, "isdir", return_value=True), patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True), patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run), patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -118,12 +126,14 @@ class UpgradeCommandTests(unittest.TestCase):
result = runner.invoke(cli.ffx, ["upgrade"]) result = runner.invoke(cli.ffx, ["upgrade"])
self.assertEqual(0, result.exit_code, result.output) self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Updated FFX to version 0.3.3 from branch develop.", result.output)
self.assertEqual( self.assertEqual(
[ [
['git', 'status', '--porcelain', '--untracked-files=no'], ['git', 'status', '--porcelain', '--untracked-files=no'],
['git', 'pull'], ['git', 'pull'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'], [pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'], [pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
], ],
[call[0] for call in subprocess_calls], [call[0] for call in subprocess_calls],
) )

View File

@@ -0,0 +1,196 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
from unittest.mock import patch
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.diagnostics import ( # noqa: E402
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
class RecordingLogger:
def __init__(self):
self.messages: list[str] = []
def warning(self, message, *args, **kwargs):
if args:
message = message % args
self.messages.append(str(message))
class FfmpegDiagnosticsTests(unittest.TestCase):
def test_command_runner_retries_with_genpts_after_timestamp_warning(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
commands = []
def fake_execute(commandSequence, **kwargs):
commands.append(list(commandSequence))
stderrLineHandler = kwargs["stderrLineHandler"]
if len(commands) == 1:
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
return "done", "", 0
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
out, err, rc = runner.execute(["ffmpeg", "-y", "-i", "input.avi", "output.mkv"])
self.assertEqual("done", out)
self.assertEqual("", err)
self.assertEqual(0, rc)
self.assertEqual(
[
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"],
],
commands,
)
self.assertEqual(
[
"ffmpeg reported unset packet timestamps for tests/assets/avi/conan_S01E754_amalgam.avi. "
+ "Stopping early and retrying with -fflags +genpts."
],
logger.messages,
)
self.assertEqual({}, getUnremediedIssues(context))
def test_command_runner_skips_file_when_timestamp_warning_persists_after_genpts(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
def fake_execute(commandSequence, **kwargs):
stderrLineHandler = kwargs["stderrLineHandler"]
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
with self.assertRaises(FfmpegSkipFileWarning):
runner.execute(
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"]
)
self.assertEqual(
[
"Skipping file tests/assets/avi/conan_S01E754_amalgam.avi: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E754_amalgam.avi": ["retry-with-generated-pts"]
},
getUnremediedIssues(context),
)
def test_monitor_tracks_non_harmless_corrupt_mpeg_audio_remedy_in_summary(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E763_amalgam.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line("[mp3float @ 0x1] invalid new backstep -1")
)
self.assertFalse(monitor.handle_stderr_line("[mp3float @ 0x1] invalid block type"))
self.assertFalse(
monitor.handle_stderr_line(
"[aist#0:1/mp3 @ 0x2] [dec:mp3float @ 0x3] Error submitting packet to decoder: "
+ "Invalid data found when processing input"
)
)
self.assertEqual(
[
"ffmpeg reported damaged MPEG audio frames while converting "
+ "tests/assets/avi/conan_S01E763_amalgam.avi. FFX will continue, but the "
+ "output audio may contain gaps or glitches."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E763_amalgam.avi": ["warn-corrupt-mpeg-audio"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["conan_S01E763_amalgam.avi: warn-corrupt-mpeg-audio"],
iterUnremediedIssueSummaryLines(context),
)
def test_monitor_tracks_unhandled_diagnostic_for_summary(self):
context = {
"logger": RecordingLogger(),
"current_source_path": "tests/assets/avi/example.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line(
"[avi @ 0x1] Strange warning with no automatic remedy is present"
)
)
self.assertEqual(
{
"tests/assets/avi/example.avi": ["unhandled-warning"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["example.avi: unhandled-warning"],
iterUnremediedIssueSummaryLines(context),
)
self.assertEqual(
[
"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ "tests/assets/avi/example.avi. FFX will continue, but review the output "
+ "file. First unhandled line: [avi @ 0x1] Strange warning with no automatic remedy is present"
],
context["logger"].messages,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
import sys import sys
import time
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
@@ -51,6 +52,33 @@ class ProcessTests(unittest.TestCase):
self.assertIn("Command timed out", err) self.assertIn("Command timed out", err)
self.assertIn(sys.executable, err) self.assertIn(sys.executable, err)
def test_execute_process_can_stop_early_while_streaming_stderr(self):
start = time.monotonic()
observed_lines = []
out, err, rc = executeProcess(
[
sys.executable,
"-c",
(
"import sys, time; "
"sys.stderr.write('fatal warning\\n'); sys.stderr.flush(); "
"time.sleep(2); "
"sys.stderr.write('late line\\n'); sys.stderr.flush()"
),
],
stderrLineHandler=lambda line: observed_lines.append(line) or ("fatal warning" in line),
)
elapsed = time.monotonic() - start
self.assertLess(elapsed, 1.5)
self.assertNotEqual(0, rc)
self.assertEqual("", out)
self.assertIn("fatal warning", err)
self.assertNotIn("late line", err)
self.assertEqual(["fatal warning\n"], observed_lines)
def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self): def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self):
wrapped = getWrappedCommandSequence( wrapped = getWrappedCommandSequence(
["ffmpeg", "-i", "input.mkv"], ["ffmpeg", "-i", "input.mkv"],

View File

@@ -548,6 +548,11 @@ class TagTableScreenStateTests(unittest.TestCase):
screen.tagsTable = FakeTagTable() screen.tagsTable = FakeTagTable()
screen.shiftedSeasonsTable = FakeTagTable() screen.shiftedSeasonsTable = FakeTagTable()
screen._PatternDetailsScreen__pattern = object() screen._PatternDetailsScreen__pattern = object()
screen._PatternDetailsScreen__showDescriptor = None
widgets = {
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
calls = [] calls = []
screen.updateTags = lambda: calls.append("updateTags") screen.updateTags = lambda: calls.append("updateTags")
@@ -561,6 +566,48 @@ class TagTableScreenStateTests(unittest.TestCase):
calls, calls,
) )
def test_pattern_details_screen_on_mount_shows_show_quality_hint_for_new_pattern(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen.context = {}
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = None
widgets = {
"#showlabel": FakeStaticWidget(),
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
screen.on_mount()
self.assertEqual("7 - Demo (1999)", widgets["#showlabel"].value)
self.assertEqual("Show: 23", widgets["#show_quality_hint"].value)
def test_pattern_details_screen_show_quality_hint_is_hidden_when_pattern_quality_exists(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = type(
"_Pattern",
(),
{"quality": 19},
)()
self.assertEqual("", screen.getShowQualityHintText())
def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self): def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self):
screen = object.__new__(InspectDetailsScreen) screen = object.__new__(InspectDetailsScreen)
@@ -722,7 +769,7 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertIn("English Full", screen.tracksTable.rows["row-0"]) self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor()) self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_update_tracks_blanks_irrelevant_attachment_fields(self): def test_inspect_details_screen_update_tracks_shows_attachment_format_and_blanks_language(self):
attachment_track = TrackDescriptor( attachment_track = TrackDescriptor(
index=4, index=4,
source_index=4, source_index=4,
@@ -745,10 +792,36 @@ class TagTableScreenStateTests(unittest.TestCase):
row = screen.tracksTable.rows["row-0"] row = screen.tracksTable.rows["row-0"]
self.assertEqual("4", row[0]) self.assertEqual("4", row[0])
self.assertEqual(" ", row[3]) self.assertEqual("TTF", row[3])
self.assertEqual(" ", row[5])
self.assertEqual(" ", row[7]) self.assertEqual(" ", row[7])
self.assertEqual(" ", row[8]) self.assertEqual(" ", row[8])
def test_inspect_details_screen_update_tracks_shows_unknown_for_unknown_attachment_format(self):
attachment_track = TrackDescriptor(
index=5,
source_index=5,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.UNKNOWN,
tags={"filename": "blob.bin", "mimetype": "application/octet-stream"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("unknown", row[3])
self.assertEqual(" ", row[5])
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self): def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor( source_track = TrackDescriptor(
index=3, index=3,

View File

@@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import sys import sys
import unittest import unittest
@@ -13,15 +12,11 @@ if str(SRC_ROOT) not in sys.path:
from ffx.attachment_format import AttachmentFormat # noqa: E402 from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402 from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402 from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402 from ffx.track_type import TrackType # noqa: E402
ASSETS_ROOT = Path(__file__).resolve().parents[1] / "assets"
class TrackDescriptorProbeTests(unittest.TestCase): class TrackDescriptorProbeTests(unittest.TestCase):
def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self): def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self):
descriptor = TrackDescriptor.fromFfprobe( descriptor = TrackDescriptor.fromFfprobe(
@@ -62,26 +57,5 @@ class TrackDescriptorProbeTests(unittest.TestCase):
self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat()) self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec()) self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_media_descriptor_from_boruto_probe_json_handles_attachment_streams_without_codec_name(self):
probe_payload = json.loads(
(ASSETS_ROOT / "ffprobe.out.json").read_text(encoding="utf-8")
)
descriptor = MediaDescriptor.fromFfprobe(
{"logger": None},
probe_payload["format"],
probe_payload["streams"],
)
track_descriptors = descriptor.getTrackDescriptors()
attachment_tracks = descriptor.getAttachmentTracks()
self.assertEqual(14, len(track_descriptors))
self.assertEqual(10, len(attachment_tracks))
self.assertTrue(
all(track.getAttachmentFormat() == AttachmentFormat.TTF for track in attachment_tracks)
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()