diff --git a/SCRATCHPAD.md b/SCRATCHPAD.md index 667e3e4..89eaddd 100644 --- a/SCRATCHPAD.md +++ b/SCRATCHPAD.md @@ -69,15 +69,3 @@ ## Delete When - Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance. - - -## Missing Timestamps - -Detect ffmpeg warning "Timestamps are unset in a packet for stream 0. This is deprecated and will stop working in the future. Fix your code to set the timestamps properly" and try autofix by -fflags +genpts -> Warning if fails -> Error. Check if flags collide with anything. - - - diff --git a/src/ffx/cli.py b/src/ffx/cli.py index 30a119b..f1268a3 100755 --- a/src/ffx/cli.py +++ b/src/ffx/cli.py @@ -1121,6 +1121,11 @@ def convert(ctx, Suffices will we appended to filename in case of multiple created files or if the filename has not changed.""" from ffx.ffx_controller import FfxController + from ffx.diagnostics import ( + FfmpegSkipFileWarning, + getUnremediedIssues, + iterUnremediedIssueSummaryLines, + ) from ffx.file_properties import FileProperties from ffx.filter.crop_filter import CropFilter from ffx.filter.deinterlace_filter import DeinterlaceFilter @@ -1600,18 +1605,30 @@ def convert(ctx, if rename_only: shutil.move(sourcePath, targetPath) else: - fc.runJob(sourcePath, - targetPath, - targetFormat, - chainIteration, - cropArguments, - currentPattern, - currentShowDescriptor) + try: + fc.runJob(sourcePath, + targetPath, + targetFormat, + chainIteration, + cropArguments, + currentPattern, + currentShowDescriptor) + except FfmpegSkipFileWarning: + if os.path.exists(targetPath): + os.remove(targetPath) + continue endTime = time.perf_counter() ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}") + unremediedIssues = getUnremediedIssues(context) + if unremediedIssues: + ctx.obj['logger'].warning("\nFiles with ffmpeg findings that require review:") + for summaryLine in iterUnremediedIssueSummaryLines(context): + ctx.obj['logger'].warning(summaryLine) + else: + ctx.obj['logger'].info("All files converted with no ffmpeg findings requiring review.") if __name__ == '__main__': diff --git a/src/ffx/diagnostics/__init__.py b/src/ffx/diagnostics/__init__.py new file mode 100644 index 0000000..c020585 --- /dev/null +++ b/src/ffx/diagnostics/__init__.py @@ -0,0 +1,24 @@ +from .base import FfmpegRemedy, FfmpegRemedyDecision, FfmpegSkipFileWarning +from .monitor import FfmpegCommandRunner, FfmpegDiagnosticMonitor +from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy +from .state import ( + getDiagnosticsState, + getUnremediedIssues, + iterUnremediedIssueSummaryLines, + recordUnremediedIssue, +) +from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy + +__all__ = [ + "FfmpegCommandRunner", + "FfmpegDiagnosticMonitor", + "FfmpegRemedy", + "FfmpegRemedyDecision", + "FfmpegSkipFileWarning", + "RetryWithGeneratedPtsRemedy", + "WarnCorruptMpegAudioRemedy", + "getDiagnosticsState", + "getUnremediedIssues", + "iterUnremediedIssueSummaryLines", + "recordUnremediedIssue", +] diff --git a/src/ffx/diagnostics/base.py b/src/ffx/diagnostics/base.py new file mode 100644 index 0000000..458c306 --- /dev/null +++ b/src/ffx/diagnostics/base.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +class FfmpegSkipFileWarning(Exception): + pass + + +@dataclass(frozen=True) +class FfmpegRemedyDecision: + stop_process: bool = False + retry_input_tokens: tuple[str, ...] = () + skip_file: bool = False + console_warning: str = "" + summary_identifier: str = "" + unremedied_issue_identifier: str = "" + + @property + def retry_requested(self) -> bool: + return bool(self.retry_input_tokens) + + +class FfmpegRemedy: + identifier = "ffmpeg-remedy" + harmless = False + + def inspect_line( + self, + line: str, + session: "FfmpegDiagnosticMonitor", + ) -> FfmpegRemedyDecision | None: + raise NotImplementedError diff --git a/src/ffx/diagnostics/monitor.py b/src/ffx/diagnostics/monitor.py new file mode 100644 index 0000000..361025d --- /dev/null +++ b/src/ffx/diagnostics/monitor.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import re + +from ffx.logging_utils import get_ffx_logger +from ffx.process import executeProcess + +from .base import FfmpegSkipFileWarning, FfmpegRemedy +from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy +from .state import recordUnremediedIssue +from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy + +UNHANDLED_DIAGNOSTIC_PATTERNS = ( + re.compile(r"\bwarning\b", re.IGNORECASE), + re.compile(r"\berror\b", re.IGNORECASE), + re.compile(r"\bfailed\b", re.IGNORECASE), + re.compile(r"\binvalid\b", re.IGNORECASE), + re.compile(r"\bmissing\b", re.IGNORECASE), + re.compile(r"\bcorrupt\b", re.IGNORECASE), + re.compile(r"\boverflow\b", re.IGNORECASE), + re.compile(r"\bdeprecated\b", re.IGNORECASE), +) + + +class FfmpegDiagnosticMonitor: + def __init__( + self, + context: dict | None, + command_sequence: list[str], + *, + remedies: list[FfmpegRemedy] | None = None, + emittedWarnings: set[str] | None = None, + ): + self.context = context or {} + self.command_sequence = list(command_sequence) + self.logger = self.context.get("logger", get_ffx_logger()) + self.source_path = str(self.context.get("current_source_path", "")).strip() + self.remedies = remedies or [ + RetryWithGeneratedPtsRemedy(), + WarnCorruptMpegAudioRemedy(), + ] + self._emittedWarnings = emittedWarnings if emittedWarnings is not None else set() + self.retry_input_tokens: tuple[str, ...] = () + self.skip_file = False + self.skip_file_message = "" + + def describe_source(self) -> str: + return self.source_path if self.source_path else "current file" + + def command_contains_tokens(self, tokens: tuple[str, ...]) -> bool: + tokenCount = len(tokens) + if tokenCount == 0: + return True + + return any( + tuple(self.command_sequence[index:index + tokenCount]) == tuple(tokens) + for index in range(len(self.command_sequence) - tokenCount + 1) + ) + + def emitConsoleWarning(self, warningMessage: str) -> None: + if warningMessage and warningMessage not in self._emittedWarnings: + self.logger.warning(warningMessage) + self._emittedWarnings.add(warningMessage) + + def recordUnremediedIssue(self, issueIdentifier: str, issueLine: str) -> None: + isFirstIssueForFile = recordUnremediedIssue( + self.context, + self.describe_source(), + issueIdentifier, + ) + if not isFirstIssueForFile: + return + + self.emitConsoleWarning( + f"ffmpeg reported a diagnostic with no automatic remedy while converting " + + f"{self.describe_source()}. FFX will continue, but review the output " + + f"file. First unhandled line: {issueLine}" + ) + + def lineLooksLikeUnhandledDiagnostic(self, line: str) -> bool: + return any(pattern.search(line) for pattern in UNHANDLED_DIAGNOSTIC_PATTERNS) + + def getUnhandledDiagnosticIdentifier(self, line: str) -> str: + loweredLine = str(line).lower() + + if any(token in loweredLine for token in ("error", "failed", "invalid", "missing", "corrupt", "overflow")): + return "unhandled-error" + if any(token in loweredLine for token in ("warning", "deprecated")): + return "unhandled-warning" + return "unhandled-diagnostic" + + def getSummaryIdentifier( + self, + remedy: FfmpegRemedy, + decision, + ) -> str: + explicitIdentifier = str(decision.summary_identifier).strip() + if explicitIdentifier: + return explicitIdentifier + + remedyIdentifier = str(getattr(remedy, "identifier", "")).strip() + if remedyIdentifier and remedyIdentifier != FfmpegRemedy.identifier: + return remedyIdentifier + + return str(decision.unremedied_issue_identifier).strip() + + def shouldRecordSummary( + self, + remedy: FfmpegRemedy, + decision, + ) -> bool: + if getattr(remedy, "harmless", False): + return False + + if decision.retry_requested and not decision.skip_file: + return False + + return bool(self.getSummaryIdentifier(remedy, decision)) + + def handle_stderr_line(self, line: str) -> bool: + strippedLine = str(line).strip() + if not strippedLine: + return False + + for remedy in self.remedies: + decision = remedy.inspect_line(strippedLine, self) + if decision is None: + continue + + self.emitConsoleWarning(decision.console_warning) + + if decision.retry_requested: + self.retry_input_tokens = tuple(decision.retry_input_tokens) + + if self.shouldRecordSummary(remedy, decision): + recordUnremediedIssue( + self.context, + self.describe_source(), + self.getSummaryIdentifier(remedy, decision), + ) + + if decision.skip_file: + self.skip_file = True + self.skip_file_message = ( + decision.console_warning + or f"Skipping file {self.describe_source()} because ffmpeg reported a fatal diagnostic." + ) + + return bool(decision.stop_process) + + if self.lineLooksLikeUnhandledDiagnostic(strippedLine): + self.recordUnremediedIssue( + self.getUnhandledDiagnosticIdentifier(strippedLine), + strippedLine, + ) + + return False + + @property + def retry_requested(self) -> bool: + return bool(self.retry_input_tokens) + + +def insertFfmpegInputOptions( + commandSequence: list[str], + extraTokens: tuple[str, ...], +) -> list[str]: + if not extraTokens: + return list(commandSequence) + + if not commandSequence: + return list(extraTokens) + + return [commandSequence[0]] + list(extraTokens) + list(commandSequence[1:]) + + +class FfmpegCommandRunner: + def __init__( + self, + context: dict | None, + *, + remedies: list[FfmpegRemedy] | None = None, + ): + self.__context = context or {} + self.__remedies = remedies + + def execute( + self, + commandSequence: list[str], + *, + directory: str = None, + timeoutSeconds: float = None, + ): + emittedWarnings: set[str] = set() + attemptCommandSequence = list(commandSequence) + + while True: + monitor = FfmpegDiagnosticMonitor( + self.__context, + attemptCommandSequence, + remedies=self.__remedies, + emittedWarnings=emittedWarnings, + ) + out, err, rc = executeProcess( + attemptCommandSequence, + directory=directory, + context=self.__context, + timeoutSeconds=timeoutSeconds, + stderrLineHandler=monitor.handle_stderr_line, + ) + + if monitor.retry_requested: + attemptCommandSequence = insertFfmpegInputOptions( + attemptCommandSequence, + monitor.retry_input_tokens, + ) + continue + + if monitor.skip_file: + raise FfmpegSkipFileWarning(monitor.skip_file_message) + + return out, err, rc diff --git a/src/ffx/diagnostics/retry_with_generated_pts.py b/src/ffx/diagnostics/retry_with_generated_pts.py new file mode 100644 index 0000000..4f52080 --- /dev/null +++ b/src/ffx/diagnostics/retry_with_generated_pts.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import re + +from .base import FfmpegRemedy, FfmpegRemedyDecision + + +class RetryWithGeneratedPtsRemedy(FfmpegRemedy): + identifier = "retry-with-generated-pts" + RETRY_INPUT_TOKENS = ("-fflags", "+genpts") + TIMESTAMP_UNSET_PATTERN = re.compile( + r"Timestamps are unset in a packet for stream \d+" + ) + + def inspect_line( + self, + line: str, + session: "FfmpegDiagnosticMonitor", + ) -> FfmpegRemedyDecision | None: + if self.TIMESTAMP_UNSET_PATTERN.search(line) is None: + return None + + if session.command_contains_tokens(self.RETRY_INPUT_TOKENS): + return FfmpegRemedyDecision( + stop_process=True, + skip_file=True, + console_warning=( + f"Skipping file {session.describe_source()}: ffmpeg still reported " + + "unset packet timestamps after retry with -fflags +genpts." + ), + unremedied_issue_identifier="timestamp-unset-after-genpts", + ) + + return FfmpegRemedyDecision( + stop_process=True, + retry_input_tokens=self.RETRY_INPUT_TOKENS, + console_warning=( + f"ffmpeg reported unset packet timestamps for {session.describe_source()}. " + + "Stopping early and retrying with -fflags +genpts." + ), + ) diff --git a/src/ffx/diagnostics/state.py b/src/ffx/diagnostics/state.py new file mode 100644 index 0000000..7fcffb4 --- /dev/null +++ b/src/ffx/diagnostics/state.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import os + + +DIAGNOSTICS_STATE_KEY = "diagnostics_state" +UNREMEDIED_ISSUES_KEY = "unremedied_issues" + + +def getDiagnosticsState(context: dict | None) -> dict: + if context is None: + return {UNREMEDIED_ISSUES_KEY: {}} + + if DIAGNOSTICS_STATE_KEY not in context: + context[DIAGNOSTICS_STATE_KEY] = { + UNREMEDIED_ISSUES_KEY: {}, + } + + return context[DIAGNOSTICS_STATE_KEY] + + +def recordUnremediedIssue( + context: dict | None, + sourcePath: str, + identifier: str, +) -> bool: + if not sourcePath: + return False + + diagnosticsState = getDiagnosticsState(context) + unremediedIssues = diagnosticsState[UNREMEDIED_ISSUES_KEY] + issueList = unremediedIssues.setdefault(sourcePath, []) + strippedIdentifier = str(identifier).strip() + + if not strippedIdentifier or strippedIdentifier in issueList: + return False + + issueList.append(strippedIdentifier) + return True + + +def getUnremediedIssues(context: dict | None) -> dict[str, list[str]]: + diagnosticsState = getDiagnosticsState(context) + return diagnosticsState.get(UNREMEDIED_ISSUES_KEY, {}) + + +def iterUnremediedIssueSummaryLines(context: dict | None) -> list[str]: + summaryLines = [] + unremediedIssues = getUnremediedIssues(context) + for sourcePath in sorted(unremediedIssues.keys()): + identifiers = unremediedIssues[sourcePath] + summaryLines.append(f"{os.path.basename(sourcePath)}: {', '.join(identifiers)}") + return summaryLines diff --git a/src/ffx/diagnostics/warn_corrupt_mpeg_audio.py b/src/ffx/diagnostics/warn_corrupt_mpeg_audio.py new file mode 100644 index 0000000..1184f5d --- /dev/null +++ b/src/ffx/diagnostics/warn_corrupt_mpeg_audio.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import re + +from .base import FfmpegRemedy, FfmpegRemedyDecision + + +class WarnCorruptMpegAudioRemedy(FfmpegRemedy): + identifier = "warn-corrupt-mpeg-audio" + PATTERNS = ( + re.compile(r"\[mp3float @ .*\] invalid block type", re.IGNORECASE), + re.compile(r"\[mp3float @ .*\] Header missing"), + re.compile(r"\[mp3float @ .*\] overread, skip ", re.IGNORECASE), + re.compile(r"Error while decoding MPEG audio frame\."), + re.compile( + r"Error submitting packet to decoder: Invalid data found when processing input" + ), + ) + + def inspect_line( + self, + line: str, + session: "FfmpegDiagnosticMonitor", + ) -> FfmpegRemedyDecision | None: + if not any(pattern.search(line) for pattern in self.PATTERNS): + return None + + return FfmpegRemedyDecision( + console_warning=( + f"ffmpeg reported damaged MPEG audio frames while converting " + + f"{session.describe_source()}. FFX will continue, but the output " + + "audio may contain gaps or glitches." + ), + ) diff --git a/src/ffx/ffmpeg_diagnostics.py b/src/ffx/ffmpeg_diagnostics.py new file mode 100644 index 0000000..4b9298c --- /dev/null +++ b/src/ffx/ffmpeg_diagnostics.py @@ -0,0 +1,27 @@ +from .diagnostics import ( + FfmpegCommandRunner, + FfmpegDiagnosticMonitor, + FfmpegRemedy, + FfmpegRemedyDecision, + FfmpegSkipFileWarning, + RetryWithGeneratedPtsRemedy, + WarnCorruptMpegAudioRemedy, + getDiagnosticsState, + getUnremediedIssues, + iterUnremediedIssueSummaryLines, + recordUnremediedIssue, +) + +__all__ = [ + "FfmpegCommandRunner", + "FfmpegDiagnosticMonitor", + "FfmpegRemedy", + "FfmpegRemedyDecision", + "FfmpegSkipFileWarning", + "RetryWithGeneratedPtsRemedy", + "WarnCorruptMpegAudioRemedy", + "getDiagnosticsState", + "getUnremediedIssues", + "iterUnremediedIssueSummaryLines", + "recordUnremediedIssue", +] diff --git a/src/ffx/ffx_controller.py b/src/ffx/ffx_controller.py index ea5142a..ea70090 100644 --- a/src/ffx/ffx_controller.py +++ b/src/ffx/ffx_controller.py @@ -3,6 +3,7 @@ from functools import lru_cache from logging import Logger from ffx.media_descriptor_change_set import MediaDescriptorChangeSet +from ffx.diagnostics import FfmpegCommandRunner from ffx.media_descriptor import MediaDescriptor from ffx.audio_layout import AudioLayout @@ -63,6 +64,7 @@ class FfxController(): self.__logger: Logger = context['logger'] self.__warnedH264Fallback = False + self.__ffmpegCommandRunner = FfmpegCommandRunner(context) @staticmethod @@ -100,7 +102,13 @@ class FfxController(): def executeCommandSequence(self, commandSequence): - out, err, rc = executeProcess(commandSequence, context=self.__context) + if commandSequence and str(commandSequence[0]).strip() == "ffmpeg": + out, err, rc = self.__ffmpegCommandRunner.execute( + commandSequence, + timeoutSeconds=None, + ) + else: + out, err, rc = executeProcess(commandSequence, context=self.__context) if rc: raise click.ClickException(f"Command resulted in error: rc={rc} error={err}") return out, err, rc @@ -321,6 +329,7 @@ class FfxController(): videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9) + self.__context['current_source_path'] = sourcePath copyVideo = self.__context.get('copy_video', False) or videoEncoder == VideoEncoder.COPY diff --git a/src/ffx/process.py b/src/ffx/process.py index 429961c..de42561 100644 --- a/src/ffx/process.py +++ b/src/ffx/process.py @@ -1,7 +1,10 @@ import os import shlex +import signal import subprocess -from typing import Iterable, List +import threading +import time +from typing import Callable, Iterable, List from .logging_utils import get_ffx_logger @@ -118,6 +121,8 @@ def executeProcess( directory: str = None, context: dict = None, timeoutSeconds: float = None, + stdoutLineHandler: Callable[[str], bool] | None = None, + stderrLineHandler: Callable[[str], bool] | None = None, ): logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger() @@ -131,6 +136,16 @@ def executeProcess( formatCommandSequence(wrappedCommandSequence), ) + if stdoutLineHandler is not None or stderrLineHandler is not None: + return executeStreamingProcess( + wrappedCommandSequence, + directory=directory, + logger=logger, + timeoutSeconds=timeoutSeconds, + stdoutLineHandler=stdoutLineHandler, + stderrLineHandler=stderrLineHandler, + ) + try: completed = subprocess.run( wrappedCommandSequence, @@ -167,3 +182,162 @@ def executeProcess( ) return completed.stdout, completed.stderr, completed.returncode + + +def terminateProcess(process: subprocess.Popen, *, killAfterSeconds: float = 1.0) -> None: + if process.poll() is not None: + return + + try: + if hasattr(os, "killpg"): + os.killpg(process.pid, signal.SIGTERM) + else: + process.terminate() + except ProcessLookupError: + return + + deadline = time.monotonic() + killAfterSeconds + while process.poll() is None and time.monotonic() < deadline: + time.sleep(0.05) + + if process.poll() is not None: + return + + try: + if hasattr(os, "killpg"): + os.killpg(process.pid, signal.SIGKILL) + else: + process.kill() + except ProcessLookupError: + return + + +def readProcessStream( + stream, + outputParts: list[str], + lineHandler: Callable[[str], bool] | None, + stopRequested: threading.Event, + logger, +) -> None: + try: + for line in iter(stream.readline, ''): + outputParts.append(line) + + if lineHandler is None: + continue + + try: + if lineHandler(line): + stopRequested.set() + except Exception: + logger.exception("Process line handler raised an exception") + finally: + stream.close() + + +def executeStreamingProcess( + commandSequence: List[str], + *, + directory: str = None, + logger = None, + timeoutSeconds: float = None, + stdoutLineHandler: Callable[[str], bool] | None = None, + stderrLineHandler: Callable[[str], bool] | None = None, +): + logger = logger or get_ffx_logger() + + try: + process = subprocess.Popen( + commandSequence, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + cwd=directory, + bufsize=1, + start_new_session=True, + ) + except FileNotFoundError as ex: + error = ( + "Command not found while running " + + f"{formatCommandSequence(commandSequence)}: {ex.filename or ex}" + ) + logger.error(error) + return '', error, COMMAND_NOT_FOUND_RETURN_CODE + + stdoutParts: list[str] = [] + stderrParts: list[str] = [] + stopRequested = threading.Event() + timedOut = False + + stdoutThread = threading.Thread( + target=readProcessStream, + args=( + process.stdout, + stdoutParts, + stdoutLineHandler, + stopRequested, + logger, + ), + daemon=True, + ) + stderrThread = threading.Thread( + target=readProcessStream, + args=( + process.stderr, + stderrParts, + stderrLineHandler, + stopRequested, + logger, + ), + daemon=True, + ) + + stdoutThread.start() + stderrThread.start() + + deadline = ( + time.monotonic() + float(timeoutSeconds) + if timeoutSeconds is not None + else None + ) + terminationRequested = False + + while process.poll() is None: + if stopRequested.is_set(): + terminationRequested = True + terminateProcess(process) + break + + if deadline is not None and time.monotonic() >= deadline: + timedOut = True + terminationRequested = True + terminateProcess(process) + break + + time.sleep(0.05) + + returnCode = process.wait() + stdoutThread.join() + stderrThread.join() + + stdout = ''.join(stdoutParts) + stderr = ''.join(stderrParts) + + if timedOut: + error = ( + f"Command timed out after {timeoutSeconds} seconds while running " + + formatCommandSequence(commandSequence) + ) + if stderr: + error = f"{error}\n{stderr}" + logger.error(error) + return stdout, error, COMMAND_TIMED_OUT_RETURN_CODE + + if returnCode != 0 and not terminationRequested: + logger.warning( + "executeProcess() rc=%s command=%s", + returnCode, + formatCommandSequence(commandSequence), + ) + + return stdout, stderr, returnCode diff --git a/tests/unit/test_cli_convert_diagnostics.py b/tests/unit/test_cli_convert_diagnostics.py new file mode 100644 index 0000000..3267262 --- /dev/null +++ b/tests/unit/test_cli_convert_diagnostics.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +import os +from pathlib import Path +import sys +import tempfile +import unittest +from unittest.mock import patch + +from click.testing import CliRunner + + +SRC_ROOT = Path(__file__).resolve().parents[2] / "src" + +if str(SRC_ROOT) not in sys.path: + sys.path.insert(0, str(SRC_ROOT)) + + +from ffx import cli # noqa: E402 +from ffx.diagnostics import FfmpegSkipFileWarning, recordUnremediedIssue # noqa: E402 +from ffx.logging_utils import get_ffx_logger # noqa: E402 + + +class _FakeMediaDescriptor: + def getVideoTracks(self): + return [] + + def getAudioTracks(self): + return [] + + def getSubtitleTracks(self): + return [] + + def getAttachmentTracks(self): + return [] + + def applyOverrides(self, overrides): + return None + + +class _FakeFileProperties: + def __init__(self, context, source_path): + self.source_path = source_path + + def getShowId(self): + return -1 + + def getSeason(self): + return -1 + + def getEpisode(self): + return -1 + + def getMediaDescriptor(self): + return _FakeMediaDescriptor() + + def getPattern(self): + return None + + +class _FakeShiftedSeasonController: + def __init__(self, context): + self.context = context + + def shiftSeason(self, show_id, season, episode, patternId=None): + return season, episode + + +class _FakeShowController: + def __init__(self, context): + self.context = context + + def getShowDescriptor(self, show_id): + return None + + +class _FakeFfxController: + calls: list[str] = [] + mode = "skip_first" + + def __init__(self, context, *args, **kwargs): + self.context = context + + def runJob(self, sourcePath, *args, **kwargs): + self.calls.append(sourcePath) + if self.mode == "clean": + return + + if self.mode == "warn_unhandled" and sourcePath.endswith("episode1.avi"): + recordUnremediedIssue( + self.context, + sourcePath, + "unhandled-warning", + ) + return + + if self.mode == "skip_first" and sourcePath.endswith("episode1.avi"): + message = ( + f"Skipping file {sourcePath}: ffmpeg still reported unset packet " + + "timestamps after retry with -fflags +genpts." + ) + recordUnremediedIssue( + self.context, + sourcePath, + "retry-with-generated-pts", + ) + self.context["logger"].warning(message) + raise FfmpegSkipFileWarning(message) + + +class ConvertDiagnosticCliTests(unittest.TestCase): + def setUp(self): + logger = get_ffx_logger() + for handler in list(logger.handlers): + logger.removeHandler(handler) + try: + handler.close() + except Exception: + pass + + self.tempdir = tempfile.TemporaryDirectory() + self.home_dir = Path(self.tempdir.name) / "home" + self.home_dir.mkdir() + self.database_path = Path(self.tempdir.name) / "test.db" + self.source_dir = Path(self.tempdir.name) / "source" + self.source_dir.mkdir() + self.source_one = self.source_dir / "episode1.avi" + self.source_two = self.source_dir / "episode2.avi" + self.source_one.write_bytes(b"one") + self.source_two.write_bytes(b"two") + _FakeFfxController.calls = [] + _FakeFfxController.mode = "skip_first" + + def tearDown(self): + self.tempdir.cleanup() + + def test_convert_continues_after_skipping_one_file_due_to_ffmpeg_diagnostic(self): + runner = CliRunner() + + with ( + patch("ffx.file_properties.FileProperties", _FakeFileProperties), + patch("ffx.ffx_controller.FfxController", _FakeFfxController), + patch( + "ffx.shifted_season_controller.ShiftedSeasonController", + _FakeShiftedSeasonController, + ), + patch("ffx.show_controller.ShowController", _FakeShowController), + ): + result = runner.invoke( + cli.ffx, + [ + "--database-file", + str(self.database_path), + "convert", + "--no-tmdb", + "--no-pattern", + str(self.source_one), + str(self.source_two), + ], + env={**os.environ, "HOME": str(self.home_dir)}, + ) + + self.assertEqual(0, result.exit_code, result.output) + self.assertEqual( + [str(self.source_one), str(self.source_two)], + _FakeFfxController.calls, + ) + self.assertIn("Skipping file", result.output) + self.assertIn("-fflags +genpts", result.output) + self.assertIn("Files with ffmpeg findings that require review:", result.output) + self.assertIn( + "episode1.avi: retry-with-generated-pts", + result.output, + ) + + def test_convert_prints_clean_summary_when_no_unremedied_issues_were_seen(self): + runner = CliRunner() + _FakeFfxController.mode = "clean" + + with ( + patch("ffx.file_properties.FileProperties", _FakeFileProperties), + patch("ffx.ffx_controller.FfxController", _FakeFfxController), + patch( + "ffx.shifted_season_controller.ShiftedSeasonController", + _FakeShiftedSeasonController, + ), + patch("ffx.show_controller.ShowController", _FakeShowController), + ): + result = runner.invoke( + cli.ffx, + [ + "--database-file", + str(self.database_path), + "convert", + "--no-tmdb", + "--no-pattern", + str(self.source_one), + str(self.source_two), + ], + env={**os.environ, "HOME": str(self.home_dir)}, + ) + + self.assertEqual(0, result.exit_code, result.output) + self.assertIn( + "All files converted with no ffmpeg findings requiring review.", + result.output, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_ffmpeg_diagnostics.py b/tests/unit/test_ffmpeg_diagnostics.py new file mode 100644 index 0000000..a2e9a89 --- /dev/null +++ b/tests/unit/test_ffmpeg_diagnostics.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +from pathlib import Path +import sys +import unittest +from unittest.mock import patch + + +SRC_ROOT = Path(__file__).resolve().parents[2] / "src" + +if str(SRC_ROOT) not in sys.path: + sys.path.insert(0, str(SRC_ROOT)) + + +from ffx.diagnostics import ( # noqa: E402 + FfmpegCommandRunner, + FfmpegDiagnosticMonitor, + FfmpegSkipFileWarning, + getUnremediedIssues, + iterUnremediedIssueSummaryLines, +) + + +class RecordingLogger: + def __init__(self): + self.messages: list[str] = [] + + def warning(self, message, *args, **kwargs): + if args: + message = message % args + self.messages.append(str(message)) + + +class FfmpegDiagnosticsTests(unittest.TestCase): + def test_command_runner_retries_with_genpts_after_timestamp_warning(self): + logger = RecordingLogger() + context = { + "logger": logger, + "current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi", + } + runner = FfmpegCommandRunner(context) + commands = [] + + def fake_execute(commandSequence, **kwargs): + commands.append(list(commandSequence)) + stderrLineHandler = kwargs["stderrLineHandler"] + if len(commands) == 1: + self.assertTrue( + stderrLineHandler( + "[matroska @ 0x1] Timestamps are unset in a packet for stream 0. " + + "This is deprecated and will stop working in the future." + ) + ) + return "", "timestamp warning\n", -15 + + return "done", "", 0 + + with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute): + out, err, rc = runner.execute(["ffmpeg", "-y", "-i", "input.avi", "output.mkv"]) + + self.assertEqual("done", out) + self.assertEqual("", err) + self.assertEqual(0, rc) + self.assertEqual( + [ + ["ffmpeg", "-y", "-i", "input.avi", "output.mkv"], + ["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"], + ], + commands, + ) + self.assertEqual( + [ + "ffmpeg reported unset packet timestamps for tests/assets/avi/conan_S01E754_amalgam.avi. " + + "Stopping early and retrying with -fflags +genpts." + ], + logger.messages, + ) + self.assertEqual({}, getUnremediedIssues(context)) + + def test_command_runner_skips_file_when_timestamp_warning_persists_after_genpts(self): + logger = RecordingLogger() + context = { + "logger": logger, + "current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi", + } + runner = FfmpegCommandRunner(context) + + def fake_execute(commandSequence, **kwargs): + stderrLineHandler = kwargs["stderrLineHandler"] + self.assertTrue( + stderrLineHandler( + "[matroska @ 0x1] Timestamps are unset in a packet for stream 0. " + + "This is deprecated and will stop working in the future." + ) + ) + return "", "timestamp warning\n", -15 + + with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute): + with self.assertRaises(FfmpegSkipFileWarning): + runner.execute( + ["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"] + ) + + self.assertEqual( + [ + "Skipping file tests/assets/avi/conan_S01E754_amalgam.avi: ffmpeg still reported " + + "unset packet timestamps after retry with -fflags +genpts." + ], + logger.messages, + ) + self.assertEqual( + { + "tests/assets/avi/conan_S01E754_amalgam.avi": ["retry-with-generated-pts"] + }, + getUnremediedIssues(context), + ) + + def test_monitor_tracks_non_harmless_corrupt_mpeg_audio_remedy_in_summary(self): + logger = RecordingLogger() + context = { + "logger": logger, + "current_source_path": "tests/assets/avi/conan_S01E763_amalgam.avi", + } + monitor = FfmpegDiagnosticMonitor( + context, + ["ffmpeg", "-y", "-i", "input.avi", "output.mkv"], + ) + + self.assertFalse(monitor.handle_stderr_line("[mp3float @ 0x1] invalid block type")) + self.assertFalse( + monitor.handle_stderr_line( + "[aist#0:1/mp3 @ 0x2] [dec:mp3float @ 0x3] Error submitting packet to decoder: " + + "Invalid data found when processing input" + ) + ) + + self.assertEqual( + [ + "ffmpeg reported damaged MPEG audio frames while converting " + + "tests/assets/avi/conan_S01E763_amalgam.avi. FFX will continue, but the " + + "output audio may contain gaps or glitches." + ], + logger.messages, + ) + self.assertEqual( + { + "tests/assets/avi/conan_S01E763_amalgam.avi": ["warn-corrupt-mpeg-audio"] + }, + getUnremediedIssues(context), + ) + self.assertEqual( + ["conan_S01E763_amalgam.avi: warn-corrupt-mpeg-audio"], + iterUnremediedIssueSummaryLines(context), + ) + + def test_monitor_tracks_unhandled_diagnostic_for_summary(self): + context = { + "logger": RecordingLogger(), + "current_source_path": "tests/assets/avi/example.avi", + } + monitor = FfmpegDiagnosticMonitor( + context, + ["ffmpeg", "-y", "-i", "input.avi", "output.mkv"], + ) + + self.assertFalse( + monitor.handle_stderr_line( + "[avi @ 0x1] Strange warning with no automatic remedy is present" + ) + ) + + self.assertEqual( + { + "tests/assets/avi/example.avi": ["unhandled-warning"] + }, + getUnremediedIssues(context), + ) + self.assertEqual( + ["example.avi: unhandled-warning"], + iterUnremediedIssueSummaryLines(context), + ) + self.assertEqual( + [ + "ffmpeg reported a diagnostic with no automatic remedy while converting " + + "tests/assets/avi/example.avi. FFX will continue, but review the output " + + "file. First unhandled line: [avi @ 0x1] Strange warning with no automatic remedy is present" + ], + context["logger"].messages, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_process.py b/tests/unit/test_process.py index 05ef254..ace62aa 100644 --- a/tests/unit/test_process.py +++ b/tests/unit/test_process.py @@ -2,6 +2,7 @@ from __future__ import annotations from pathlib import Path import sys +import time import unittest from unittest.mock import patch @@ -51,6 +52,33 @@ class ProcessTests(unittest.TestCase): self.assertIn("Command timed out", err) self.assertIn(sys.executable, err) + def test_execute_process_can_stop_early_while_streaming_stderr(self): + start = time.monotonic() + observed_lines = [] + + out, err, rc = executeProcess( + [ + sys.executable, + "-c", + ( + "import sys, time; " + "sys.stderr.write('fatal warning\\n'); sys.stderr.flush(); " + "time.sleep(2); " + "sys.stderr.write('late line\\n'); sys.stderr.flush()" + ), + ], + stderrLineHandler=lambda line: observed_lines.append(line) or ("fatal warning" in line), + ) + + elapsed = time.monotonic() - start + + self.assertLess(elapsed, 1.5) + self.assertNotEqual(0, rc) + self.assertEqual("", out) + self.assertIn("fatal warning", err) + self.assertNotIn("late line", err) + self.assertEqual(["fatal warning\n"], observed_lines) + def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self): wrapped = getWrappedCommandSequence( ["ffmpeg", "-i", "input.mkv"],