14 Commits
v0.3.1 ... main

Author SHA1 Message Date
Javanaut
2e2c94f539 Release v0.4.2 2026-04-24 13:40:37 +02:00
Javanaut
12be6e985a v0.4.2 2026-04-24 13:39:57 +02:00
Javanaut
12310942ae Fix inspect attachment subtracks 2026-04-24 10:34:43 +02:00
Javanaut
f913cb4fe3 ff 2026-04-24 08:49:48 +02:00
Javanaut
0a153280e3 ff 2026-04-24 08:49:30 +02:00
Javanaut
6ca0cd54b0 addendum 2026-04-23 22:16:03 +02:00
Javanaut
14c956b6fa Release v0.4.1 2026-04-23 22:10:04 +02:00
Javanaut
502a822bb4 prep 0.4.1 2026-04-23 22:09:36 +02:00
Javanaut
6cc21b5f36 Adds diagnostics/remedy system 2026-04-23 20:32:49 +02:00
Javanaut
0034f8ca97 ff 2026-04-23 16:37:47 +02:00
Javanaut
eedcbaed0a Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev 2026-04-23 16:31:19 +02:00
Javanaut
653ce7b417 Copy audio and video flags 2026-04-23 16:30:15 +02:00
Javanaut
b80c055826 fix table 2026-04-17 13:17:15 +02:00
Javanaut
c5fc6ac13d fix styled ASS and refactor att format 2026-04-17 11:41:13 +02:00
33 changed files with 2006 additions and 74 deletions

2
.gitignore vendored
View File

@@ -10,6 +10,8 @@ tools/ansible/inventory/group_vars/all.yml
ffx_test_report.log
bin/conversiontest.py
tests/assets/
build/
dist/
*.egg-info/

View File

@@ -99,6 +99,19 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History
### 0.4.2
- pattern details now show an inline `Show: <quality>` hint next to the quality field when the pattern itself has no stored quality but the selected show does
- inspect stream tables now show attachment format labels like `TTF` in the codec column and keep attachment language cells blank instead of showing an undefined language
- ffmpeg damaged-MP3 diagnostics now recognize additional corruption lines such as `invalid new backstep`, keeping them grouped under the `warn-corrupt-mpeg-audio` review summary
### 0.4.1
- `convert` now supports `--copy-video` and `--copy-audio` to keep the selected stream type in copy mode without applying the corresponding reencode flags, filters, or formatting options
- ffmpeg conversions now monitor diagnostics while the process is running, retry unset AVI packet timestamps once with `-fflags +genpts`, and stop early when a file should be skipped instead of waiting for the full job to finish
- end-of-run convert summaries now list only ffmpeg findings that still require review, including named remedy identifiers such as `warn-corrupt-mpeg-audio`
- `upgrade` now finishes by reporting the installed FFX version together with the active bundle branch
### 0.3.1
- debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows

170
docs/file_formats.md Normal file
View File

@@ -0,0 +1,170 @@
# File Formats
This document captures source-file-format notes that complement the normative
requirements in `requirements/source_file_formats.md`.
The first documented format is a Matroska source that carries styled ASS/SSA
subtitle streams together with embedded font attachments.
## Styled ASS In Matroska With Embedded Fonts
These files are typically `.mkv` releases where subtitle rendering quality
depends on keeping both parts of the subtitle package together:
- one or more subtitle streams with codec `ass`
- one or more attachment streams that embed font files used by those subtitles
This matters because ASS subtitles are not plain text subtitles in the narrow
WebVTT sense. They can carry layout, styling, positioning, karaoke, signs, and
other typesetting effects. If the matching embedded fonts are lost, consumers
can still see subtitle text but the intended styling and sometimes glyph
coverage can be degraded.
For FFX this format is special because the ASS subtitle streams should remain
normally editable and mappable, while the related font attachments should be
transported unchanged.
## Observed Sample
Assessment date: `2026-04-17`
Observed sample file:
- `tests/assets/boruto_s01e283_ssa.mkv`
Commands used for assessment:
```bash
ffprobe tests/assets/boruto_s01e283_ssa.mkv
ffprobe -hide_banner -show_format -show_streams -of json tests/assets/boruto_s01e283_ssa.mkv
```
Observed stream layout:
| Stream index | Kind | Key details |
| --- | --- | --- |
| `0` | video | `codec_name=h264` |
| `1` | audio | `codec_name=aac`, `language=jpn` |
| `2` | subtitle | `codec_name=ass`, `language=ger`, default |
| `3` | subtitle | `codec_name=ass`, `language=eng` |
| `4`-`13` | attachment | `tags.mimetype=font/ttf`, `.ttf` filenames |
Observed attachment filenames:
- `AmazonEmberTanuki-Italic.ttf`
- `AmazonEmberTanuki-Regular.ttf`
- `Arial.ttf`
- `Arial Bold.ttf`
- `Georgia.ttf`
- `Times New Roman.ttf`
- `Times New Roman Bold.ttf`
- `Trebuchet MS.ttf`
- `Verdana.ttf`
- `Verdana Bold.ttf`
Important probe behavior from the real sample:
- Plain `ffprobe` lists the font streams as `Attachment: none`.
- Plain `ffprobe` also prints warnings such as `Could not find codec
parameters for stream 4 (Attachment: none): unknown codec` and later
`Unsupported codec with id 0 for input stream ...`.
- The JSON produced by `FileProperties.FFPROBE_COMMAND_TOKENS`
(`ffprobe -hide_banner -show_format -show_streams -of json`) still exposes
the attachment streams clearly through `codec_type="attachment"` and the
attachment tags.
- In that JSON, the attachment streams do not expose `codec_name`.
This last point is important for FFX: robust detection must not depend on
attachment `codec_name` being present.
## Detection Guidance
Current known indicators for this format are:
- one or more subtitle streams with `codec_type="subtitle"` and
`codec_name="ass"`
- one or more attachment streams with `codec_type="attachment"`
- attachment tags that identify embedded fonts, especially
`tags.mimetype="font/ttf"`
- attachment filenames that end in `.ttf`
The pattern can vary. FFX should therefore treat the above as a cluster of
signals rather than an exact signature tied to one file.
Inference from the observed sample plus FFmpeg documentation:
- MIME matching should not be limited to `font/ttf` alone.
- The Boruto sample uses `font/ttf`.
- FFmpeg's Matroska attachment example uses
`mimetype=application/x-truetype-font` for a `.ttf` attachment.
- Detection should therefore normalize multiple TTF-like MIME values rather
than depend on a single exact string.
## Processing Expectations In FFX
The format-specific requirements live in
`requirements/source_file_formats.md`. In practical terms, FFX should:
- recognize the ASS-plus-font-attachment pattern even when attachment probe
data is incomplete
- tell the operator that the pattern was detected and that special handling is
being used
- reject sidecar subtitle import for such sources, because converting or
replacing these subtitle tracks with ordinary external text subtitles would
break the intended subtitle package
- continue to allow normal manipulation of the ASS subtitle tracks themselves
- preserve the font attachment streams unchanged
## FFmpeg Notes
Relevant FFmpeg documentation confirms several behaviors that line up with
FFX's needs:
- FFmpeg documents `-attach` as adding an attachment stream to the output, and
explicitly names Matroska fonts used in subtitle rendering as an example.
- FFmpeg documents attachment streams as regular streams that are created after
the mapped media streams.
- FFmpeg documents `-dump_attachment` for extracting attachment streams, which
is useful for debugging or validating a source file's embedded fonts.
- FFmpeg's Matroska example requires a `mimetype` metadata tag for attached
fonts, which is consistent with using attachment tags as detection signals.
- FFmpeg also notes that attachments are implemented as codec extradata. That
helps explain why probe output for attachment streams can look different from
ordinary audio, video, and subtitle streams.
Implication for FFX:
- Attachment preservation is not an optional cosmetic feature for this format.
It is part of preserving the subtitle package correctly.
## Jellyfin Notes
Jellyfin's documentation also supports keeping this format intact:
- Jellyfin's subtitle compatibility table lists `ASS/SSA` as supported in
`MKV` and not supported in `MP4`.
- Jellyfin notes that when subtitles must be transcoded, they are either
converted to a supported format or burned into the video, and burning them in
is the most CPU-intensive path.
- Jellyfin's subtitle-extraction example for `SSA/ASS` first dumps attachment
streams and then extracts the ASS subtitle stream, which reflects the real
relationship between ASS subtitles and embedded fonts in MKV releases.
- Jellyfin's font documentation says text-based subtitles require fonts to
render properly.
- Jellyfin's configuration documentation says the web client uses configured
fallback fonts for ASS subtitles when other fonts such as MKV attachments or
client-side fonts are not available.
Inference from the Jellyfin compatibility tables:
- Keeping this subtitle format in Matroska is the safest interoperability
choice for Jellyfin consumers.
- Converting the subtitle payload to WebVTT would lose styled ASS behavior.
- Dropping the attachment streams would force client or fallback font
substitution and can change appearance or glyph coverage.
## References
- FFmpeg documentation: https://ffmpeg.org/ffmpeg.html
- Jellyfin codec support: https://jellyfin.org/docs/general/clients/codec-support/
- Jellyfin configuration and fonts: https://jellyfin.org/docs/general/administration/configuration/

View File

@@ -1,7 +1,7 @@
[project]
name = "ffx"
description = "FFX recoding and metadata managing tool"
version = "0.3.1"
version = "0.4.2"
license = {file = "LICENSE.md"}
dependencies = [
"requests",

View File

@@ -0,0 +1,67 @@
from enum import Enum
import os
class AttachmentFormat(Enum):
TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf', 'label': 'TTF'}
PNG = {'identifier': 'png', 'format': None, 'extension': 'png', 'label': 'PNG'}
UNKNOWN = {'identifier': 'unknown', 'format': None, 'extension': None, 'label': 'UNKNOWN'}
def identifier(self):
return str(self.value['identifier'])
def label(self):
return str(self.value['label'])
def format(self):
return self.value['format']
def extension(self):
return str(self.value['extension'])
@staticmethod
def identify(identifier: str):
formats = [f for f in AttachmentFormat if f.value['identifier'] == str(identifier)]
if formats:
return formats[0]
return AttachmentFormat.UNKNOWN
@staticmethod
def identifyFfprobeStream(streamObj: dict):
identifier = streamObj.get("codec_name")
identifiedFormat = AttachmentFormat.identify(identifier)
if identifiedFormat != AttachmentFormat.UNKNOWN:
return identifiedFormat
if str(streamObj.get("codec_type", "")).strip() != "attachment":
return AttachmentFormat.UNKNOWN
tags = streamObj.get("tags", {}) or {}
mimetype = str(tags.get("mimetype", "")).strip().lower()
filename = str(tags.get("filename", "")).strip().lower()
filenameExtension = os.path.splitext(filename)[1]
if (
mimetype in {
"font/ttf",
"application/x-truetype-font",
"application/x-font-ttf",
}
or "truetype" in mimetype
or filenameExtension == ".ttf"
):
return AttachmentFormat.TTF
if mimetype in {"image/png", "image/x-png"} or filenameExtension == ".png":
return AttachmentFormat.PNG
return AttachmentFormat.UNKNOWN
@staticmethod
def fromTrackCodec(trackCodec):
identifier = getattr(trackCodec, "identifier", None)
if callable(identifier):
return AttachmentFormat.identify(trackCodec.identifier())
return AttachmentFormat.UNKNOWN

View File

@@ -68,6 +68,14 @@ CUT_OPTION_HELP = (
+ "or --cut START,DURATION for an explicit start and duration. "
+ "Omit to disable."
)
COPY_VIDEO_OPTION_HELP = (
"Copy video streams without re-encoding. Skips video encoder options "
+ "and video filters."
)
COPY_AUDIO_OPTION_HELP = (
"Copy audio streams without re-encoding. Skips audio encoder options "
+ "and audio filters."
)
def normalizeNicenessOption(ctx, param, value):
@@ -385,6 +393,41 @@ def getTrackedGitChanges(repoPath):
return [line for line in completed.stdout.splitlines() if line.strip()]
def getCurrentGitBranch(repoPath):
completed = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
cwd=repoPath,
capture_output=True,
text=True,
)
if completed.returncode != 0:
commandLabel = 'git rev-parse --abbrev-ref HEAD'
errorOutput = completed.stderr.strip() or completed.stdout.strip()
raise click.ClickException(
f"Unable to inspect bundle repository branch using '{commandLabel}': {errorOutput}"
)
return completed.stdout.strip() or "unknown"
def getBundleVersion(repoPath):
constantsPath = os.path.join(repoPath, 'src', 'ffx', 'constants.py')
try:
with open(constantsPath, encoding='utf-8') as constantsFile:
for line in constantsFile:
strippedLine = line.strip()
if strippedLine.startswith('VERSION=') or strippedLine.startswith('VERSION ='):
return strippedLine.split('=', 1)[1].strip().strip('"\'')
except OSError as ex:
raise click.ClickException(
f"Unable to inspect bundle version from {constantsPath}: {ex}"
) from ex
raise click.ClickException(f"Unable to inspect bundle version from {constantsPath}")
def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
if not os.path.isfile(scriptPath):
raise click.ClickException(f"{missingDescription} not found at {scriptPath}")
@@ -507,6 +550,10 @@ def upgrade(ctx, branch):
if completed.returncode != 0:
ctx.exit(completed.returncode)
upgradedBranch = getCurrentGitBranch(bundleRepoPath)
upgradedVersion = getBundleVersion(bundleRepoPath)
click.echo(f"Updated FFX to version {upgradedVersion} from branch {upgradedBranch}.")
@ffx.command()
@click.pass_context
@@ -639,6 +686,7 @@ def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix,
trackType = trackDescriptor.getType()
trackCodec = trackDescriptor.getCodec()
trackFormat = trackDescriptor.getFormatDescriptor()
targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix
@@ -651,12 +699,12 @@ def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix,
commandTokens += ['-c', 'copy']
# output format
codecFormat = trackCodec.format()
codecFormat = trackFormat.format()
if codecFormat is not None:
commandTokens += ['-f', codecFormat]
# output filename
commandTokens += [f"{targetPathBase}.{trackCodec.extension()}"]
commandTokens += [f"{targetPathBase}.{trackFormat.extension()}"]
return commandTokens
@@ -771,7 +819,7 @@ def unmux(ctx,
if not ctx.obj['dry_run']:
#TODO #425: Codec Enum
ctx.obj['logger'].info(f"Unmuxing stream {trackDescriptor.getIndex()} into file {targetPrefix}.{trackDescriptor.getCodec().extension()}")
ctx.obj['logger'].info(f"Unmuxing stream {trackDescriptor.getIndex()} into file {targetPrefix}.{trackDescriptor.getFormatDescriptor().extension()}")
ctx.obj['logger'].debug(f"Executing unmuxing sequence")
@@ -914,6 +962,8 @@ def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option('-v', '--video-encoder', type=str, default=DEFAULT_VIDEO_ENCODER_LABEL, help=f"Target video encoder (vp9, av1, h264 or copy)", show_default=True)
@click.option('--copy-video', is_flag=True, default=False, help=COPY_VIDEO_OPTION_HELP)
@click.option('--copy-audio', is_flag=True, default=False, help=COPY_AUDIO_OPTION_HELP)
@click.option('-q', '--quality', type=str, default="", help=f"Quality settings to be used with VP9/H264 encoder")
@click.option('-p', '--preset', type=str, default="", help=f"Quality preset to be used with AV1 encoder")
@@ -1010,6 +1060,8 @@ def convert(ctx,
paths,
label,
video_encoder,
copy_video,
copy_audio,
quality,
preset,
stereo_bitrate,
@@ -1069,6 +1121,11 @@ def convert(ctx,
Suffices will we appended to filename in case of multiple created files
or if the filename has not changed."""
from ffx.ffx_controller import FfxController
from ffx.diagnostics import (
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
from ffx.file_properties import FileProperties
from ffx.filter.crop_filter import CropFilter
from ffx.filter.deinterlace_filter import DeinterlaceFilter
@@ -1089,9 +1146,12 @@ def convert(ctx,
context = ctx.obj
context['video_encoder'] = VideoEncoder.fromLabel(video_encoder)
context['copy_video'] = copy_video
context['copy_audio'] = copy_audio
copyVideoEffective = copy_video or context['video_encoder'] == VideoEncoder.COPY
# HINT: quick and dirty override for h264, todo improve
if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY):
if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY) or copy_video or copy_audio:
targetFormat = ''
targetExtension = 'mkv'
else:
@@ -1224,36 +1284,54 @@ def convert(ctx,
tc = TmdbController() if context['use_tmdb'] else None
qualityKwargs = {QualityFilter.QUALITY_KEY: str(quality)}
if copyVideoEffective and quality:
ctx.obj['logger'].warning("Ignoring quality settings because video is being copied")
qualityKwargs = {
QualityFilter.QUALITY_KEY: "" if copyVideoEffective else str(quality)
}
qf = QualityFilter(**qualityKwargs)
if context['video_encoder'] == VideoEncoder.AV1 and preset:
if context['video_encoder'] == VideoEncoder.AV1 and preset and not copyVideoEffective:
presetKwargs = {PresetFilter.PRESET_KEY: preset}
PresetFilter(**presetKwargs)
cf = None
# if crop != 'none':
if crop == 'auto':
videoFilterOptionsRequested = (
crop != 'none'
or deinterlace != 'none'
or denoise != 'none'
or denoise_strength
or denoise_patch_size
or denoise_chroma_patch_size
or denoise_research_window
or denoise_chroma_research_window
)
if copyVideoEffective and videoFilterOptionsRequested:
ctx.obj['logger'].warning("Ignoring video filter options because video is being copied")
if crop == 'auto' and not copyVideoEffective:
cropKwargs = {}
cf = CropFilter(**cropKwargs)
denoiseKwargs = {}
if denoise_strength:
if denoise_strength and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.STRENGTH_KEY] = denoise_strength
if denoise_patch_size:
if denoise_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.PATCH_SIZE_KEY] = denoise_patch_size
if denoise_chroma_patch_size:
if denoise_chroma_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_PATCH_SIZE_KEY] = denoise_chroma_patch_size
if denoise_research_window:
if denoise_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.RESEARCH_WINDOW_KEY] = denoise_research_window
if denoise_chroma_research_window:
if denoise_chroma_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_RESEARCH_WINDOW_KEY] = denoise_chroma_research_window
if denoise != 'none' or denoiseKwargs:
if not copyVideoEffective and (denoise != 'none' or denoiseKwargs):
NlmeansFilter(**denoiseKwargs)
if deinterlace != 'none':
if deinterlace != 'none' and not copyVideoEffective:
DeinterlaceFilter()
chainYield = list(qf.getChainYield())
@@ -1313,10 +1391,12 @@ def convert(ctx,
sourceMediaDescriptor = mediaFileProperties.getMediaDescriptor()
from ffx.attachment_format import AttachmentFormat
if ([smd for smd in sourceMediaDescriptor.getSubtitleTracks()
if smd.getCodec() == TrackCodec.ASS]
and [amd for amd in sourceMediaDescriptor.getAttachmentTracks()
if amd.getCodec() == TrackCodec.TTF]):
if amd.getAttachmentFormat() == AttachmentFormat.TTF]):
targetFormat = ''
targetExtension = 'mkv'
@@ -1525,18 +1605,30 @@ def convert(ctx,
if rename_only:
shutil.move(sourcePath, targetPath)
else:
fc.runJob(sourcePath,
targetPath,
targetFormat,
chainIteration,
cropArguments,
currentPattern,
currentShowDescriptor)
try:
fc.runJob(sourcePath,
targetPath,
targetFormat,
chainIteration,
cropArguments,
currentPattern,
currentShowDescriptor)
except FfmpegSkipFileWarning:
if os.path.exists(targetPath):
os.remove(targetPath)
continue
endTime = time.perf_counter()
ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}")
unremediedIssues = getUnremediedIssues(context)
if unremediedIssues:
ctx.obj['logger'].warning("\nFiles with ffmpeg findings that require review:")
for summaryLine in iterUnremediedIssueSummaryLines(context):
ctx.obj['logger'].warning(summaryLine)
else:
ctx.obj['logger'].info("All files converted with no issues.")
if __name__ == '__main__':

View File

@@ -1,4 +1,4 @@
VERSION='0.3.1'
VERSION='0.4.2'
DATABASE_VERSION = 3
DEFAULT_QUALITY = 32

View File

@@ -0,0 +1,24 @@
from .base import FfmpegRemedy, FfmpegRemedyDecision, FfmpegSkipFileWarning
from .monitor import FfmpegCommandRunner, FfmpegDiagnosticMonitor
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import (
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from dataclasses import dataclass
class FfmpegSkipFileWarning(Exception):
pass
@dataclass(frozen=True)
class FfmpegRemedyDecision:
stop_process: bool = False
retry_input_tokens: tuple[str, ...] = ()
skip_file: bool = False
console_warning: str = ""
summary_identifier: str = ""
unremedied_issue_identifier: str = ""
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
class FfmpegRemedy:
identifier = "ffmpeg-remedy"
harmless = False
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
raise NotImplementedError

View File

@@ -0,0 +1,222 @@
from __future__ import annotations
import re
from ffx.logging_utils import get_ffx_logger
from ffx.process import executeProcess
from .base import FfmpegSkipFileWarning, FfmpegRemedy
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import recordUnremediedIssue
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
UNHANDLED_DIAGNOSTIC_PATTERNS = (
re.compile(r"\bwarning\b", re.IGNORECASE),
re.compile(r"\berror\b", re.IGNORECASE),
re.compile(r"\bfailed\b", re.IGNORECASE),
re.compile(r"\binvalid\b", re.IGNORECASE),
re.compile(r"\bmissing\b", re.IGNORECASE),
re.compile(r"\bcorrupt\b", re.IGNORECASE),
re.compile(r"\boverflow\b", re.IGNORECASE),
re.compile(r"\bdeprecated\b", re.IGNORECASE),
)
class FfmpegDiagnosticMonitor:
def __init__(
self,
context: dict | None,
command_sequence: list[str],
*,
remedies: list[FfmpegRemedy] | None = None,
emittedWarnings: set[str] | None = None,
):
self.context = context or {}
self.command_sequence = list(command_sequence)
self.logger = self.context.get("logger", get_ffx_logger())
self.source_path = str(self.context.get("current_source_path", "")).strip()
self.remedies = remedies or [
RetryWithGeneratedPtsRemedy(),
WarnCorruptMpegAudioRemedy(),
]
self._emittedWarnings = emittedWarnings if emittedWarnings is not None else set()
self.retry_input_tokens: tuple[str, ...] = ()
self.skip_file = False
self.skip_file_message = ""
def describe_source(self) -> str:
return self.source_path if self.source_path else "current file"
def command_contains_tokens(self, tokens: tuple[str, ...]) -> bool:
tokenCount = len(tokens)
if tokenCount == 0:
return True
return any(
tuple(self.command_sequence[index:index + tokenCount]) == tuple(tokens)
for index in range(len(self.command_sequence) - tokenCount + 1)
)
def emitConsoleWarning(self, warningMessage: str) -> None:
if warningMessage and warningMessage not in self._emittedWarnings:
self.logger.warning(warningMessage)
self._emittedWarnings.add(warningMessage)
def recordUnremediedIssue(self, issueIdentifier: str, issueLine: str) -> None:
isFirstIssueForFile = recordUnremediedIssue(
self.context,
self.describe_source(),
issueIdentifier,
)
if not isFirstIssueForFile:
return
self.emitConsoleWarning(
f"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ f"{self.describe_source()}. FFX will continue, but review the output "
+ f"file. First unhandled line: {issueLine}"
)
def lineLooksLikeUnhandledDiagnostic(self, line: str) -> bool:
return any(pattern.search(line) for pattern in UNHANDLED_DIAGNOSTIC_PATTERNS)
def getUnhandledDiagnosticIdentifier(self, line: str) -> str:
loweredLine = str(line).lower()
if any(token in loweredLine for token in ("error", "failed", "invalid", "missing", "corrupt", "overflow")):
return "unhandled-error"
if any(token in loweredLine for token in ("warning", "deprecated")):
return "unhandled-warning"
return "unhandled-diagnostic"
def getSummaryIdentifier(
self,
remedy: FfmpegRemedy,
decision,
) -> str:
explicitIdentifier = str(decision.summary_identifier).strip()
if explicitIdentifier:
return explicitIdentifier
remedyIdentifier = str(getattr(remedy, "identifier", "")).strip()
if remedyIdentifier and remedyIdentifier != FfmpegRemedy.identifier:
return remedyIdentifier
return str(decision.unremedied_issue_identifier).strip()
def shouldRecordSummary(
self,
remedy: FfmpegRemedy,
decision,
) -> bool:
if getattr(remedy, "harmless", False):
return False
if decision.retry_requested and not decision.skip_file:
return False
return bool(self.getSummaryIdentifier(remedy, decision))
def handle_stderr_line(self, line: str) -> bool:
strippedLine = str(line).strip()
if not strippedLine:
return False
for remedy in self.remedies:
decision = remedy.inspect_line(strippedLine, self)
if decision is None:
continue
self.emitConsoleWarning(decision.console_warning)
if decision.retry_requested:
self.retry_input_tokens = tuple(decision.retry_input_tokens)
if self.shouldRecordSummary(remedy, decision):
recordUnremediedIssue(
self.context,
self.describe_source(),
self.getSummaryIdentifier(remedy, decision),
)
if decision.skip_file:
self.skip_file = True
self.skip_file_message = (
decision.console_warning
or f"Skipping file {self.describe_source()} because ffmpeg reported a fatal diagnostic."
)
return bool(decision.stop_process)
if self.lineLooksLikeUnhandledDiagnostic(strippedLine):
self.recordUnremediedIssue(
self.getUnhandledDiagnosticIdentifier(strippedLine),
strippedLine,
)
return False
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
def insertFfmpegInputOptions(
commandSequence: list[str],
extraTokens: tuple[str, ...],
) -> list[str]:
if not extraTokens:
return list(commandSequence)
if not commandSequence:
return list(extraTokens)
return [commandSequence[0]] + list(extraTokens) + list(commandSequence[1:])
class FfmpegCommandRunner:
def __init__(
self,
context: dict | None,
*,
remedies: list[FfmpegRemedy] | None = None,
):
self.__context = context or {}
self.__remedies = remedies
def execute(
self,
commandSequence: list[str],
*,
directory: str = None,
timeoutSeconds: float = None,
):
emittedWarnings: set[str] = set()
attemptCommandSequence = list(commandSequence)
while True:
monitor = FfmpegDiagnosticMonitor(
self.__context,
attemptCommandSequence,
remedies=self.__remedies,
emittedWarnings=emittedWarnings,
)
out, err, rc = executeProcess(
attemptCommandSequence,
directory=directory,
context=self.__context,
timeoutSeconds=timeoutSeconds,
stderrLineHandler=monitor.handle_stderr_line,
)
if monitor.retry_requested:
attemptCommandSequence = insertFfmpegInputOptions(
attemptCommandSequence,
monitor.retry_input_tokens,
)
continue
if monitor.skip_file:
raise FfmpegSkipFileWarning(monitor.skip_file_message)
return out, err, rc

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class RetryWithGeneratedPtsRemedy(FfmpegRemedy):
identifier = "retry-with-generated-pts"
RETRY_INPUT_TOKENS = ("-fflags", "+genpts")
TIMESTAMP_UNSET_PATTERN = re.compile(
r"Timestamps are unset in a packet for stream \d+"
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if self.TIMESTAMP_UNSET_PATTERN.search(line) is None:
return None
if session.command_contains_tokens(self.RETRY_INPUT_TOKENS):
return FfmpegRemedyDecision(
stop_process=True,
skip_file=True,
console_warning=(
f"Skipping file {session.describe_source()}: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
),
unremedied_issue_identifier="timestamp-unset-after-genpts",
)
return FfmpegRemedyDecision(
stop_process=True,
retry_input_tokens=self.RETRY_INPUT_TOKENS,
console_warning=(
f"ffmpeg reported unset packet timestamps for {session.describe_source()}. "
+ "Stopping early and retrying with -fflags +genpts."
),
)

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import os
DIAGNOSTICS_STATE_KEY = "diagnostics_state"
UNREMEDIED_ISSUES_KEY = "unremedied_issues"
def getDiagnosticsState(context: dict | None) -> dict:
if context is None:
return {UNREMEDIED_ISSUES_KEY: {}}
if DIAGNOSTICS_STATE_KEY not in context:
context[DIAGNOSTICS_STATE_KEY] = {
UNREMEDIED_ISSUES_KEY: {},
}
return context[DIAGNOSTICS_STATE_KEY]
def recordUnremediedIssue(
context: dict | None,
sourcePath: str,
identifier: str,
) -> bool:
if not sourcePath:
return False
diagnosticsState = getDiagnosticsState(context)
unremediedIssues = diagnosticsState[UNREMEDIED_ISSUES_KEY]
issueList = unremediedIssues.setdefault(sourcePath, [])
strippedIdentifier = str(identifier).strip()
if not strippedIdentifier or strippedIdentifier in issueList:
return False
issueList.append(strippedIdentifier)
return True
def getUnremediedIssues(context: dict | None) -> dict[str, list[str]]:
diagnosticsState = getDiagnosticsState(context)
return diagnosticsState.get(UNREMEDIED_ISSUES_KEY, {})
def iterUnremediedIssueSummaryLines(context: dict | None) -> list[str]:
summaryLines = []
unremediedIssues = getUnremediedIssues(context)
for sourcePath in sorted(unremediedIssues.keys()):
identifiers = unremediedIssues[sourcePath]
summaryLines.append(f"{os.path.basename(sourcePath)}: {', '.join(identifiers)}")
return summaryLines

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class WarnCorruptMpegAudioRemedy(FfmpegRemedy):
identifier = "warn-corrupt-mpeg-audio"
PATTERNS = (
re.compile(r"\[mp3float @ .*\] invalid block type", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] invalid new backstep -?\d+", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] Header missing"),
re.compile(r"\[mp3float @ .*\] overread, skip ", re.IGNORECASE),
re.compile(r"Error while decoding MPEG audio frame\."),
re.compile(
r"Error submitting packet to decoder: Invalid data found when processing input"
),
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if not any(pattern.search(line) for pattern in self.PATTERNS):
return None
return FfmpegRemedyDecision(
console_warning=(
f"ffmpeg reported damaged MPEG audio frames while converting "
+ f"{session.describe_source()}. FFX will continue, but the output "
+ "audio may contain gaps or glitches."
),
)

View File

@@ -0,0 +1,27 @@
from .diagnostics import (
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegRemedy,
FfmpegRemedyDecision,
FfmpegSkipFileWarning,
RetryWithGeneratedPtsRemedy,
WarnCorruptMpegAudioRemedy,
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
from logging import Logger
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.diagnostics import FfmpegCommandRunner
from ffx.media_descriptor import MediaDescriptor
from ffx.audio_layout import AudioLayout
@@ -63,6 +64,7 @@ class FfxController():
self.__logger: Logger = context['logger']
self.__warnedH264Fallback = False
self.__ffmpegCommandRunner = FfmpegCommandRunner(context)
@staticmethod
@@ -100,7 +102,13 @@ class FfxController():
def executeCommandSequence(self, commandSequence):
out, err, rc = executeProcess(commandSequence, context=self.__context)
if commandSequence and str(commandSequence[0]).strip() == "ffmpeg":
out, err, rc = self.__ffmpegCommandRunner.execute(
commandSequence,
timeoutSeconds=None,
)
else:
out, err, rc = executeProcess(commandSequence, context=self.__context)
if rc:
raise click.ClickException(f"Command resulted in error: rc={rc} error={err}")
return out, err, rc
@@ -172,6 +180,16 @@ class FfxController():
def generateAudioCopyTokens(self, subIndex):
return [f"-c:a:{int(subIndex)}", 'copy']
def generateVideoCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.VIDEO):
return ["-c:v", "copy"]
return []
def generateAudioCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.AUDIO):
return ["-c:a", "copy"]
return []
def generateSubtitleCopyTokens(self, subIndex):
return [f"-c:s:{int(subIndex)}", 'copy']
@@ -292,6 +310,12 @@ class FfxController():
return audioTokens
def generateAudioProcessingTokens(self):
if self.__context.get('copy_audio', False):
return self.generateAudioCopyAllTokens()
return self.generateAudioEncodingTokens()
def runJob(self,
sourcePath,
targetPath,
@@ -305,6 +329,8 @@ class FfxController():
videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9)
self.__context['current_source_path'] = sourcePath
copyVideo = self.__context.get('copy_video', False) or videoEncoder == VideoEncoder.COPY
qualityFilters = [fy for fy in chainIteration if fy['identifier'] == 'quality']
@@ -315,30 +341,35 @@ class FfxController():
deinterlaceFilters = [fy for fy in chainIteration if fy['identifier'] == 'bwdif']
if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']):
self.__logger.info(f"Setting quality {quality} from command line")
elif currentPattern is not None and (quality := currentPattern.quality):
self.__logger.info(f"Setting quality {quality} from pattern")
elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
self.__logger.info(f"Setting quality {quality} from show")
if copyVideo:
quality = None
self.__context['encoding_metadata_tags'] = {}
else:
quality = (QualityFilter.DEFAULT_H264_QUALITY
if (videoEncoder == VideoEncoder.H264)
else QualityFilter.DEFAULT_VP9_QUALITY)
self.__logger.info(f"Setting quality {quality} from default")
if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']):
self.__logger.info(f"Setting quality {quality} from command line")
elif currentPattern is not None and (quality := currentPattern.quality):
self.__logger.info(f"Setting quality {quality} from pattern")
elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
self.__logger.info(f"Setting quality {quality} from show")
else:
quality = (QualityFilter.DEFAULT_H264_QUALITY
if (videoEncoder == VideoEncoder.H264)
else QualityFilter.DEFAULT_VP9_QUALITY)
self.__logger.info(f"Setting quality {quality} from default")
preset = presetFilters[0]['parameters']['preset'] if presetFilters else PresetFilter.DEFAULT_PRESET
self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags(
videoEncoder,
quality,
preset,
)
if not copyVideo:
self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags(
videoEncoder,
quality,
preset,
)
filterParamTokens = []
if cropArguments:
if cropArguments and not copyVideo:
cropParams = (f"crop="
+ f"{cropArguments[CropFilter.OUTPUT_WIDTH_KEY]}"
@@ -348,8 +379,9 @@ class FfxController():
filterParamTokens.append(cropParams)
filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else [])
filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else [])
if not copyVideo:
filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else [])
filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else [])
deinterlaceFilters
@@ -380,6 +412,29 @@ class FfxController():
self.executeCommandSequence(commandSequence)
return
if copyVideo:
commandSequence = (commandTokens
+ self.__targetMediaDescriptor.getImportFileTokens()
+ self.__targetMediaDescriptor.getInputMappingTokens(sourceMediaDescriptor = self.__sourceMediaDescriptor)
+ self.__mdcs.generateDispositionTokens())
commandSequence += self.__mdcs.generateMetadataTokens()
commandSequence += self.generateVideoCopyAllTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
commandSequence += self.generateOutputTokens(targetPath,
targetFormat)
self.__logger.debug("FfxController.runJob(): Running command sequence")
if not self.__context['dry_run']:
self.executeCommandSequence(commandSequence)
return
if videoEncoder == VideoEncoder.AV1:
commandSequence = (commandTokens
@@ -396,7 +451,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateAV1Tokens(int(quality), int(preset))
commandSequence += self.generateAudioEncodingTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
@@ -426,7 +481,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateH264Tokens(int(quality))
commandSequence += self.generateAudioEncodingTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
@@ -485,7 +540,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG:
commandSequence2 += self.generateVP9Pass2Tokens(int(quality))
commandSequence2 += self.generateAudioEncodingTokens()
commandSequence2 += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence2 += self.generateCropTokens()

View File

@@ -2,6 +2,7 @@ import os, re, click
from typing import List, Self
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
@@ -421,11 +422,11 @@ class MediaDescriptor:
if sourceMediaDescriptor:
fontDescriptors = [ftd for ftd in sourceMediaDescriptor.getAttachmentTracks()
if ftd.getCodec() == TrackCodec.TTF]
if ftd.getAttachmentFormat() == AttachmentFormat.TTF]
else:
fontDescriptors = [ftd for ftd in self.__trackDescriptors
if ftd.getType() == TrackType.ATTACHMENT
and ftd.getCodec() == TrackCodec.TTF]
and ftd.getAttachmentFormat() == AttachmentFormat.TTF]
for ad in sorted(fontDescriptors, key=lambda d: d.getIndex()):
inputMappingTokens += ["-map", f"0:{ad.getIndex()}"]

View File

@@ -6,6 +6,7 @@ from textual.screen import Screen
from textual.widgets import DataTable
from textual.widgets._data_table import CellDoesNotExist
from ffx.attachment_format import AttachmentFormat
from ffx.audio_layout import AudioLayout
from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
@@ -125,6 +126,32 @@ class MediaWorkflowScreenBase(Screen):
add_auto_table_column(self.differencesTable, t(self.DIFFERENCES_COLUMN_LABEL))
self.differencesTable.cursor_type = "row"
def _track_codec_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
attachmentFormat = trackDescriptor.getAttachmentFormat()
if attachmentFormat == AttachmentFormat.UNKNOWN:
return attachmentFormat.identifier()
return attachmentFormat.label()
return trackDescriptor.getFormatDescriptor().label()
def _track_language_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return trackDescriptor.getLanguage().label()
def _track_disposition_cell_value(
self,
trackDescriptor: TrackDescriptor,
disposition: TrackDisposition,
) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return (
t("Yes")
if disposition in trackDescriptor.getDispositionSet()
else t("No")
)
def reloadProperties(self, reset_draft: bool = True):
self._mediaFileProperties = FileProperties(self.context, self._mediaFilename)
probedMediaDescriptor = self._mediaFileProperties.getMediaDescriptor()
@@ -221,15 +248,21 @@ class MediaWorkflowScreenBase(Screen):
trackDescriptor.getIndex(),
t(trackType.label()),
typeCounter[trackType],
trackDescriptor.getCodec().label(),
self._track_codec_cell_value(trackDescriptor),
t(audioLayout.label())
if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED
else " ",
trackDescriptor.getLanguage().label(),
self._track_language_cell_value(trackDescriptor),
trackTitle,
t("Yes") if TrackDisposition.DEFAULT in dispositionSet else t("No"),
t("Yes") if TrackDisposition.FORCED in dispositionSet else t("No"),
self._track_disposition_cell_value(
trackDescriptor,
TrackDisposition.DEFAULT,
),
self._track_disposition_cell_value(
trackDescriptor,
TrackDisposition.FORCED,
),
)
row_key = self.tracksTable.add_row(*map(str, row))

View File

@@ -4,6 +4,7 @@ from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
@@ -132,9 +133,16 @@ class Track(Base):
if trackType in [t.label() for t in TrackType]:
if trackType == TrackType.ATTACHMENT.label():
storedFormatIdentifier = AttachmentFormat.identifyFfprobeStream(streamObj).identifier()
else:
storedFormatIdentifier = TrackCodec.identify(
streamObj.get(TrackDescriptor.FFPROBE_CODEC_KEY)
).identifier()
return cls(pattern_id = patternId,
track_type = trackType,
codec_name = streamObj[TrackDescriptor.FFPROBE_CODEC_NAME_KEY],
codec_name = storedFormatIdentifier,
disposition_flags = sum([2**t.index() for (k,v) in streamObj[TrackDescriptor.FFPROBE_DISPOSITION_KEY].items()
if v and (t := TrackDisposition.find(k)) is not None]),
audio_layout = AudioLayout.identify(streamObj))
@@ -153,8 +161,20 @@ class Track(Base):
return TrackType.fromIndex(self.track_type)
def getCodec(self) -> TrackCodec:
if self.getType() == TrackType.ATTACHMENT:
return TrackCodec.UNKNOWN
return TrackCodec.identify(self.codec_name)
def getAttachmentFormat(self) -> AttachmentFormat:
if self.getType() != TrackType.ATTACHMENT:
return AttachmentFormat.UNKNOWN
return AttachmentFormat.identify(self.codec_name)
def getFormatDescriptor(self):
if self.getType() == TrackType.ATTACHMENT:
return self.getAttachmentFormat()
return self.getCodec()
def getIndex(self):
return int(self.index) if self.index is not None else -1
@@ -206,7 +226,10 @@ class Track(Base):
kwargs[TrackDescriptor.SUB_INDEX_KEY] = subIndex
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = self.getType()
kwargs[TrackDescriptor.CODEC_KEY] = self.getCodec()
if self.getType() == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.getAttachmentFormat()
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.getCodec()
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = self.getDispositionSet()
kwargs[TrackDescriptor.TAGS_KEY] = self.getTags()

View File

@@ -134,7 +134,7 @@ class PatternController:
def _build_track_row(self, trackDescriptor: TrackDescriptor) -> Track:
track = Track(
track_type=int(trackDescriptor.getType().index()),
codec_name=str(trackDescriptor.getCodec().identifier()),
codec_name=str(trackDescriptor.getFormatDescriptor().identifier()),
index=int(trackDescriptor.getIndex()),
source_index=int(trackDescriptor.getSourceIndex()),
disposition_flags=int(

View File

@@ -88,6 +88,9 @@ class PatternDetailsScreen(Screen):
.three {
column-span: 3;
}
.two {
column-span: 2;
}
.four {
column-span: 4;
@@ -114,7 +117,7 @@ class PatternDetailsScreen(Screen):
}
.yellow {
tint: yellow 40%;
color: yellow;
}
"""
@@ -175,7 +178,7 @@ class PatternDetailsScreen(Screen):
row = (td.getIndex(),
t(trackType.label()),
typeCounter[trackType],
td.getCodec().label(),
td.getFormatDescriptor().label(),
t(audioLayout.label()) if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED else ' ',
trackLanguage.label() if trackLanguage != IsoLanguage.UNDEFINED else ' ',
@@ -331,6 +334,7 @@ class PatternDetailsScreen(Screen):
if not self.__showDescriptor is None:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")
self.updateShowQualityHint()
if self.__pattern is not None:
@@ -350,6 +354,7 @@ class PatternDetailsScreen(Screen):
if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"):
return
self.updateShowQualityHint()
self.updateTags()
self.updateTracks()
@@ -415,7 +420,9 @@ class PatternDetailsScreen(Screen):
# Row 4
yield Static(t("Quality"))
yield Input(type="integer", id="quality_input")
yield Static(' ', classes="five")
yield Static(" ")
yield Static("", id="show_quality_hint", classes="two yellow")
yield Static(' ', classes="two")
# Row 5
@@ -504,6 +511,23 @@ class PatternDetailsScreen(Screen):
def getPatternFromInput(self):
return str(self.query_one("#pattern_input", Input).value)
def getShowQualityHintText(self):
if self.__showDescriptor is None:
return ""
showQuality = int(self.__showDescriptor.getQuality() or 0)
if showQuality <= 0:
return ""
patternQuality = int(getattr(self.__pattern, "quality", 0) or 0)
if patternQuality > 0:
return ""
return f"{t('Show')}: {showQuality}"
def updateShowQualityHint(self):
self.query_one("#show_quality_hint", Static).update(self.getShowQualityHintText())
def getQualityFromInput(self):
try:
return int(self.query_one("#quality_input", Input).value)

View File

@@ -1,7 +1,10 @@
import os
import shlex
import signal
import subprocess
from typing import Iterable, List
import threading
import time
from typing import Callable, Iterable, List
from .logging_utils import get_ffx_logger
@@ -118,6 +121,8 @@ def executeProcess(
directory: str = None,
context: dict = None,
timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
):
logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger()
@@ -131,6 +136,16 @@ def executeProcess(
formatCommandSequence(wrappedCommandSequence),
)
if stdoutLineHandler is not None or stderrLineHandler is not None:
return executeStreamingProcess(
wrappedCommandSequence,
directory=directory,
logger=logger,
timeoutSeconds=timeoutSeconds,
stdoutLineHandler=stdoutLineHandler,
stderrLineHandler=stderrLineHandler,
)
try:
completed = subprocess.run(
wrappedCommandSequence,
@@ -167,3 +182,162 @@ def executeProcess(
)
return completed.stdout, completed.stderr, completed.returncode
def terminateProcess(process: subprocess.Popen, *, killAfterSeconds: float = 1.0) -> None:
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGTERM)
else:
process.terminate()
except ProcessLookupError:
return
deadline = time.monotonic() + killAfterSeconds
while process.poll() is None and time.monotonic() < deadline:
time.sleep(0.05)
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGKILL)
else:
process.kill()
except ProcessLookupError:
return
def readProcessStream(
stream,
outputParts: list[str],
lineHandler: Callable[[str], bool] | None,
stopRequested: threading.Event,
logger,
) -> None:
try:
for line in iter(stream.readline, ''):
outputParts.append(line)
if lineHandler is None:
continue
try:
if lineHandler(line):
stopRequested.set()
except Exception:
logger.exception("Process line handler raised an exception")
finally:
stream.close()
def executeStreamingProcess(
commandSequence: List[str],
*,
directory: str = None,
logger = None,
timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
):
logger = logger or get_ffx_logger()
try:
process = subprocess.Popen(
commandSequence,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=directory,
bufsize=1,
start_new_session=True,
)
except FileNotFoundError as ex:
error = (
"Command not found while running "
+ f"{formatCommandSequence(commandSequence)}: {ex.filename or ex}"
)
logger.error(error)
return '', error, COMMAND_NOT_FOUND_RETURN_CODE
stdoutParts: list[str] = []
stderrParts: list[str] = []
stopRequested = threading.Event()
timedOut = False
stdoutThread = threading.Thread(
target=readProcessStream,
args=(
process.stdout,
stdoutParts,
stdoutLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stderrThread = threading.Thread(
target=readProcessStream,
args=(
process.stderr,
stderrParts,
stderrLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stdoutThread.start()
stderrThread.start()
deadline = (
time.monotonic() + float(timeoutSeconds)
if timeoutSeconds is not None
else None
)
terminationRequested = False
while process.poll() is None:
if stopRequested.is_set():
terminationRequested = True
terminateProcess(process)
break
if deadline is not None and time.monotonic() >= deadline:
timedOut = True
terminationRequested = True
terminateProcess(process)
break
time.sleep(0.05)
returnCode = process.wait()
stdoutThread.join()
stderrThread.join()
stdout = ''.join(stdoutParts)
stderr = ''.join(stderrParts)
if timedOut:
error = (
f"Command timed out after {timeoutSeconds} seconds while running "
+ formatCommandSequence(commandSequence)
)
if stderr:
error = f"{error}\n{stderr}"
logger.error(error)
return stdout, error, COMMAND_TIMED_OUT_RETURN_CODE
if returnCode != 0 and not terminationRequested:
logger.warning(
"executeProcess() rc=%s command=%s",
returnCode,
formatCommandSequence(commandSequence),
)
return stdout, stderr, returnCode

View File

@@ -19,7 +19,6 @@ class TrackCodec(Enum):
WEBVTT = {'identifier': 'webvtt', 'format': 'webvtt', 'extension': 'vtt' , 'label': 'WebVTT'}
SRT = {'identifier': 'subrip', 'format': 'srt', 'extension': 'srt' , 'label': 'SRT'}
ASS = {'identifier': 'ass', 'format': 'ass', 'extension': 'ass' , 'label': 'ASS'}
TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf' , 'label': 'TTF'}
PGS = {'identifier': 'hdmv_pgs_subtitle', 'format': 'sup', 'extension': 'sup' , 'label': 'PGS'}
VOBSUB = {'identifier': 'dvd_subtitle', 'format': None, 'extension': 'mkv' , 'label': 'VobSub'}

View File

@@ -43,7 +43,7 @@ class TrackController():
s = self.Session()
track = Track(pattern_id = patId,
track_type = int(trackDescriptor.getType().index()),
codec_name = str(trackDescriptor.getCodec().identifier()),
codec_name = str(trackDescriptor.getFormatDescriptor().identifier()),
index = int(trackDescriptor.getIndex()),
source_index = int(trackDescriptor.getSourceIndex()),
disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet())),
@@ -82,7 +82,7 @@ class TrackController():
track.index = int(trackDescriptor.getIndex())
track.track_type = int(trackDescriptor.getType().index())
track.codec_name = str(trackDescriptor.getCodec().identifier())
track.codec_name = str(trackDescriptor.getFormatDescriptor().identifier())
track.audio_layout = int(trackDescriptor.getAudioLayout().index())
track.disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet()))

View File

@@ -1,5 +1,6 @@
from typing import Self
from .attachment_format import AttachmentFormat
from .iso_language import IsoLanguage
from .track_type import TrackType
from .audio_layout import AudioLayout
@@ -26,6 +27,7 @@ class TrackDescriptor:
TRACK_TYPE_KEY = "track_type"
CODEC_KEY = "codec_name"
ATTACHMENT_FORMAT_KEY = "attachment_format"
AUDIO_LAYOUT_KEY = "audio_layout"
FFPROBE_INDEX_KEY = "index"
@@ -110,15 +112,6 @@ class TrackDescriptor:
else:
self.__trackType = TrackType.UNKNOWN
if TrackDescriptor.CODEC_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.CODEC_KEY]) is not TrackCodec:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec"
)
self.__trackCodec = kwargs[TrackDescriptor.CODEC_KEY]
else:
self.__trackCodec = TrackCodec.UNKNOWN
if TrackDescriptor.TAGS_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.TAGS_KEY]) is not dict:
raise TypeError(
@@ -151,6 +144,34 @@ class TrackDescriptor:
else:
self.__audioLayout = AudioLayout.LAYOUT_UNDEFINED
self.__trackCodec = TrackCodec.UNKNOWN
self.__attachmentFormat = AttachmentFormat.UNKNOWN
if self.__trackType == TrackType.ATTACHMENT:
if TrackDescriptor.ATTACHMENT_FORMAT_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY]) is not AttachmentFormat:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.ATTACHMENT_FORMAT_KEY} is required to be of type AttachmentFormat"
)
self.__attachmentFormat = kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY]
elif TrackDescriptor.CODEC_KEY in kwargs.keys():
legacyCodec = kwargs[TrackDescriptor.CODEC_KEY]
if type(legacyCodec) is AttachmentFormat:
self.__attachmentFormat = legacyCodec
elif type(legacyCodec) is TrackCodec:
self.__attachmentFormat = AttachmentFormat.fromTrackCodec(legacyCodec)
else:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec for legacy attachment compatibility"
)
else:
if TrackDescriptor.CODEC_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.CODEC_KEY]) is not TrackCodec:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec"
)
self.__trackCodec = kwargs[TrackDescriptor.CODEC_KEY]
@classmethod
def fromFfprobe(cls, streamObj, subIndex: int = -1):
"""Processes ffprobe stream data as array with elements according to the following example
@@ -215,7 +236,12 @@ class TrackDescriptor:
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = trackType
kwargs[TrackDescriptor.CODEC_KEY] = TrackCodec.identify(streamObj[TrackDescriptor.FFPROBE_CODEC_KEY])
if trackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = AttachmentFormat.identifyFfprobeStream(streamObj)
else:
kwargs[TrackDescriptor.CODEC_KEY] = TrackCodec.identify(
streamObj.get(TrackDescriptor.FFPROBE_CODEC_KEY)
)
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = (
{
@@ -277,6 +303,14 @@ class TrackDescriptor:
def getCodec(self) -> TrackCodec:
return self.__trackCodec
def getAttachmentFormat(self) -> AttachmentFormat:
return self.__attachmentFormat
def getFormatDescriptor(self):
if self.__trackType == TrackType.ATTACHMENT:
return self.__attachmentFormat
return self.__trackCodec
def getLanguage(self):
if "language" in self.__trackTags.keys():
return IsoLanguage.findThreeLetter(self.__trackTags["language"])
@@ -353,12 +387,16 @@ class TrackDescriptor:
TrackDescriptor.SOURCE_INDEX_KEY: int(self.__sourceIndex),
TrackDescriptor.SUB_INDEX_KEY: int(self.__subIndex),
TrackDescriptor.TRACK_TYPE_KEY: self.__trackType,
TrackDescriptor.CODEC_KEY: self.__trackCodec,
TrackDescriptor.TAGS_KEY: dict(self.__trackTags),
TrackDescriptor.DISPOSITION_SET_KEY: set(self.__dispositionSet),
TrackDescriptor.AUDIO_LAYOUT_KEY: self.__audioLayout,
}
if self.__trackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.__attachmentFormat
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.__trackCodec
if context is not None:
kwargs[TrackDescriptor.CONTEXT_KEY] = context
elif self.__context:

View File

@@ -5,6 +5,7 @@ from textual.widgets import Header, Footer, Static, Button, SelectionList, Selec
from textual.containers import Grid
from textual.widgets._data_table import CellDoesNotExist
from .attachment_format import AttachmentFormat
from .audio_layout import AudioLayout
from .iso_language import IsoLanguage
from .tag_delete_screen import TagDeleteScreen
@@ -141,6 +142,7 @@ class TrackDetailsScreen(Screen):
if self.__isNew:
self.__trackType = trackType
self.__trackCodec = TrackCodec.UNKNOWN
self.__attachmentFormat = AttachmentFormat.UNKNOWN
self.__audioLayout = AudioLayout.LAYOUT_UNDEFINED
self.__index = index
self.__subIndex = subIndex
@@ -150,6 +152,7 @@ class TrackDetailsScreen(Screen):
else:
self.__trackType = trackDescriptor.getType()
self.__trackCodec = trackDescriptor.getCodec()
self.__attachmentFormat = trackDescriptor.getAttachmentFormat()
self.__audioLayout = trackDescriptor.getAudioLayout()
self.__index = trackDescriptor.getIndex()
self.__subIndex = trackDescriptor.getSubIndex()
@@ -433,7 +436,10 @@ class TrackDetailsScreen(Screen):
if not isinstance(selectedTrackType, TrackType):
selectedTrackType = TrackType.UNKNOWN
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = selectedTrackType
kwargs[TrackDescriptor.CODEC_KEY] = self.__trackCodec
if selectedTrackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.__attachmentFormat
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.__trackCodec
if selectedTrackType == TrackType.AUDIO:
selectedAudioLayout = self.query_one("#audio_layout_select", Select).value

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import os
from pathlib import Path
import sys
import tempfile
import unittest
from unittest.mock import patch
from click.testing import CliRunner
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.diagnostics import FfmpegSkipFileWarning, recordUnremediedIssue # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
class _FakeMediaDescriptor:
def getVideoTracks(self):
return []
def getAudioTracks(self):
return []
def getSubtitleTracks(self):
return []
def getAttachmentTracks(self):
return []
def applyOverrides(self, overrides):
return None
class _FakeFileProperties:
def __init__(self, context, source_path):
self.source_path = source_path
def getShowId(self):
return -1
def getSeason(self):
return -1
def getEpisode(self):
return -1
def getMediaDescriptor(self):
return _FakeMediaDescriptor()
def getPattern(self):
return None
class _FakeShiftedSeasonController:
def __init__(self, context):
self.context = context
def shiftSeason(self, show_id, season, episode, patternId=None):
return season, episode
class _FakeShowController:
def __init__(self, context):
self.context = context
def getShowDescriptor(self, show_id):
return None
class _FakeFfxController:
calls: list[str] = []
mode = "skip_first"
def __init__(self, context, *args, **kwargs):
self.context = context
def runJob(self, sourcePath, *args, **kwargs):
self.calls.append(sourcePath)
if self.mode == "clean":
return
if self.mode == "warn_unhandled" and sourcePath.endswith("episode1.avi"):
recordUnremediedIssue(
self.context,
sourcePath,
"unhandled-warning",
)
return
if self.mode == "skip_first" and sourcePath.endswith("episode1.avi"):
message = (
f"Skipping file {sourcePath}: ffmpeg still reported unset packet "
+ "timestamps after retry with -fflags +genpts."
)
recordUnremediedIssue(
self.context,
sourcePath,
"retry-with-generated-pts",
)
self.context["logger"].warning(message)
raise FfmpegSkipFileWarning(message)
class ConvertDiagnosticCliTests(unittest.TestCase):
def setUp(self):
logger = get_ffx_logger()
for handler in list(logger.handlers):
logger.removeHandler(handler)
try:
handler.close()
except Exception:
pass
self.tempdir = tempfile.TemporaryDirectory()
self.home_dir = Path(self.tempdir.name) / "home"
self.home_dir.mkdir()
self.database_path = Path(self.tempdir.name) / "test.db"
self.source_dir = Path(self.tempdir.name) / "source"
self.source_dir.mkdir()
self.source_one = self.source_dir / "episode1.avi"
self.source_two = self.source_dir / "episode2.avi"
self.source_one.write_bytes(b"one")
self.source_two.write_bytes(b"two")
_FakeFfxController.calls = []
_FakeFfxController.mode = "skip_first"
def tearDown(self):
self.tempdir.cleanup()
def test_convert_continues_after_skipping_one_file_due_to_ffmpeg_diagnostic(self):
runner = CliRunner()
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertEqual(
[str(self.source_one), str(self.source_two)],
_FakeFfxController.calls,
)
self.assertIn("Skipping file", result.output)
self.assertIn("-fflags +genpts", result.output)
self.assertIn("Files with ffmpeg findings that require review:", result.output)
self.assertIn(
"episode1.avi: retry-with-generated-pts",
result.output,
)
def test_convert_prints_clean_summary_when_no_unremedied_issues_were_seen(self):
runner = CliRunner()
_FakeFfxController.mode = "clean"
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertIn(
"All files converted with no issues.",
result.output,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -263,6 +263,47 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"],
)
def test_convert_copy_flags_parse_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import click
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.convert.make_context(
"convert",
["--copy-video", "--copy-audio"],
resilient_parsing=True,
)
help_output = ffx.cli.convert.get_help(click.Context(ffx.cli.convert))
print(json.dumps({{
"copy_video": context.params["copy_video"],
"copy_audio": context.params["copy_audio"],
"output": help_output,
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["copy_video"])
self.assertTrue(result["copy_audio"])
self.assertIn("--copy-video", result["output"])
self.assertIn("--copy-audio", result["output"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_edit_command_avoids_database_bootstrap(self):
result = self.run_python(
textwrap.dedent(

View File

@@ -68,11 +68,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="M src/ffx/constants.py\n")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="main\n")
return self.make_completed(args)
with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.2"),
patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -81,6 +84,7 @@ class UpgradeCommandTests(unittest.TestCase):
self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Tracked local changes detected in the bundle repository:", result.output)
self.assertIn("Updated FFX to version 0.3.2 from branch main.", result.output)
self.assertEqual(
[
['git', 'status', '--porcelain', '--untracked-files=no'],
@@ -89,6 +93,7 @@ class UpgradeCommandTests(unittest.TestCase):
['git', 'checkout', '-B', 'main', 'FETCH_HEAD'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
],
[call[0] for call in subprocess_calls],
)
@@ -106,11 +111,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="develop\n")
return self.make_completed(args)
with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.3"),
patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -118,12 +126,14 @@ class UpgradeCommandTests(unittest.TestCase):
result = runner.invoke(cli.ffx, ["upgrade"])
self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Updated FFX to version 0.3.3 from branch develop.", result.output)
self.assertEqual(
[
['git', 'status', '--porcelain', '--untracked-files=no'],
['git', 'pull'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
],
[call[0] for call in subprocess_calls],
)

View File

@@ -0,0 +1,196 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
from unittest.mock import patch
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.diagnostics import ( # noqa: E402
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
class RecordingLogger:
def __init__(self):
self.messages: list[str] = []
def warning(self, message, *args, **kwargs):
if args:
message = message % args
self.messages.append(str(message))
class FfmpegDiagnosticsTests(unittest.TestCase):
def test_command_runner_retries_with_genpts_after_timestamp_warning(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
commands = []
def fake_execute(commandSequence, **kwargs):
commands.append(list(commandSequence))
stderrLineHandler = kwargs["stderrLineHandler"]
if len(commands) == 1:
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
return "done", "", 0
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
out, err, rc = runner.execute(["ffmpeg", "-y", "-i", "input.avi", "output.mkv"])
self.assertEqual("done", out)
self.assertEqual("", err)
self.assertEqual(0, rc)
self.assertEqual(
[
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"],
],
commands,
)
self.assertEqual(
[
"ffmpeg reported unset packet timestamps for tests/assets/avi/conan_S01E754_amalgam.avi. "
+ "Stopping early and retrying with -fflags +genpts."
],
logger.messages,
)
self.assertEqual({}, getUnremediedIssues(context))
def test_command_runner_skips_file_when_timestamp_warning_persists_after_genpts(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
def fake_execute(commandSequence, **kwargs):
stderrLineHandler = kwargs["stderrLineHandler"]
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
with self.assertRaises(FfmpegSkipFileWarning):
runner.execute(
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"]
)
self.assertEqual(
[
"Skipping file tests/assets/avi/conan_S01E754_amalgam.avi: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E754_amalgam.avi": ["retry-with-generated-pts"]
},
getUnremediedIssues(context),
)
def test_monitor_tracks_non_harmless_corrupt_mpeg_audio_remedy_in_summary(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E763_amalgam.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line("[mp3float @ 0x1] invalid new backstep -1")
)
self.assertFalse(monitor.handle_stderr_line("[mp3float @ 0x1] invalid block type"))
self.assertFalse(
monitor.handle_stderr_line(
"[aist#0:1/mp3 @ 0x2] [dec:mp3float @ 0x3] Error submitting packet to decoder: "
+ "Invalid data found when processing input"
)
)
self.assertEqual(
[
"ffmpeg reported damaged MPEG audio frames while converting "
+ "tests/assets/avi/conan_S01E763_amalgam.avi. FFX will continue, but the "
+ "output audio may contain gaps or glitches."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E763_amalgam.avi": ["warn-corrupt-mpeg-audio"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["conan_S01E763_amalgam.avi: warn-corrupt-mpeg-audio"],
iterUnremediedIssueSummaryLines(context),
)
def test_monitor_tracks_unhandled_diagnostic_for_summary(self):
context = {
"logger": RecordingLogger(),
"current_source_path": "tests/assets/avi/example.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line(
"[avi @ 0x1] Strange warning with no automatic remedy is present"
)
)
self.assertEqual(
{
"tests/assets/avi/example.avi": ["unhandled-warning"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["example.avi: unhandled-warning"],
iterUnremediedIssueSummaryLines(context),
)
self.assertEqual(
[
"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ "tests/assets/avi/example.avi. FFX will continue, but review the output "
+ "file. First unhandled line: [avi @ 0x1] Strange warning with no automatic remedy is present"
],
context["logger"].messages,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -15,6 +15,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.ffx_controller import FfxController # noqa: E402
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -43,6 +44,8 @@ class FfxControllerTests(unittest.TestCase):
"video_encoder": video_encoder,
"dry_run": False,
"perform_cut": False,
"copy_video": False,
"copy_audio": False,
"bitrates": {
"stereo": "112k",
"ac3": "256k",
@@ -75,6 +78,56 @@ class FfxControllerTests(unittest.TestCase):
)
return descriptor, source_descriptor
def make_media_descriptors_with_audio(
self,
audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO,
) -> tuple[MediaDescriptor, MediaDescriptor]:
descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
source_descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
return descriptor, source_descriptor
def assert_token_pair(self, command: list[str], first: str, second: str):
self.assertTrue(
any(command[index:index + 2] == [first, second] for index in range(len(command) - 1)),
command,
)
def test_vp9_run_job_emits_file_level_encoding_quality_metadata(self):
context = self.make_context(VideoEncoder.VP9)
target_descriptor, source_descriptor = self.make_media_descriptors()
@@ -196,6 +249,79 @@ class FfxControllerTests(unittest.TestCase):
self.assertIn("ENCODING_QUALITY=19", commands[0])
mocked_info.assert_any_call("Setting quality 19 from pattern")
def test_copy_video_uses_single_copy_command_without_video_encoding_options(self):
context = self.make_context(VideoEncoder.VP9)
context["copy_video"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio()
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 27},
},
{
"identifier": "nlmeans",
"parameters": {},
"tokens": ["nlmeans=s=2.0"],
},
],
cropArguments={
"output_width": 1280,
"output_height": 720,
"x_offset": 0,
"y_offset": 0,
},
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:v", "copy")
self.assertIn("libopus", commands[0])
self.assertNotIn("libvpx-vp9", commands[0])
self.assertNotIn("-pass", commands[0])
self.assertNotIn("-vf", commands[0])
self.assertFalse(any(token.startswith("ENCODING_QUALITY=") for token in commands[0]))
def test_copy_audio_uses_audio_copy_without_audio_encoding_options(self):
context = self.make_context(VideoEncoder.H264)
context["copy_audio"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio(
AudioLayout.LAYOUT_5_1
)
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 21},
}
],
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:a", "copy")
self.assertIn("libx264", commands[0])
self.assertNotIn("libopus", commands[0])
self.assertFalse(any(token.startswith("-b:a") for token in commands[0]))
self.assertFalse(any(token.startswith("-filter:a") for token in commands[0]))
def test_generate_h264_tokens_prefers_libx264_when_available(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from pathlib import Path
import sys
import time
import unittest
from unittest.mock import patch
@@ -51,6 +52,33 @@ class ProcessTests(unittest.TestCase):
self.assertIn("Command timed out", err)
self.assertIn(sys.executable, err)
def test_execute_process_can_stop_early_while_streaming_stderr(self):
start = time.monotonic()
observed_lines = []
out, err, rc = executeProcess(
[
sys.executable,
"-c",
(
"import sys, time; "
"sys.stderr.write('fatal warning\\n'); sys.stderr.flush(); "
"time.sleep(2); "
"sys.stderr.write('late line\\n'); sys.stderr.flush()"
),
],
stderrLineHandler=lambda line: observed_lines.append(line) or ("fatal warning" in line),
)
elapsed = time.monotonic() - start
self.assertLess(elapsed, 1.5)
self.assertNotEqual(0, rc)
self.assertEqual("", out)
self.assertIn("fatal warning", err)
self.assertNotIn("late line", err)
self.assertEqual(["fatal warning\n"], observed_lines)
def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self):
wrapped = getWrappedCommandSequence(
["ffmpeg", "-i", "input.mkv"],

View File

@@ -14,6 +14,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.helper import DIFF_ADDED_KEY # noqa: E402
from ffx.iso_language import IsoLanguage # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
@@ -200,6 +201,32 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertEqual("German Audio", descriptor.getTitle())
self.assertEqual("value", descriptor.getTags()["KEEP"])
def test_track_details_screen_preserves_attachment_format_for_attachment_tracks(self):
screen = object.__new__(TrackDetailsScreen)
screen.context = {"logger": get_ffx_logger()}
screen._TrackDetailsScreen__trackDescriptor = None
screen._TrackDetailsScreen__patternId = 5
screen._TrackDetailsScreen__index = 4
screen._TrackDetailsScreen__subIndex = 0
screen._TrackDetailsScreen__trackCodec = TrackCodec.UNKNOWN
screen._TrackDetailsScreen__attachmentFormat = AttachmentFormat.TTF
screen._TrackDetailsScreen__draftTrackTags = {"filename": "font.ttf", "mimetype": "font/ttf"}
widgets = {
"#type_select": FakeValueWidget(TrackType.ATTACHMENT),
"#audio_layout_select": FakeValueWidget(AudioLayout.LAYOUT_UNDEFINED),
"#language_select": FakeValueWidget(Select.NULL),
"#title_input": FakeInputWidget(""),
"#dispositions_selection_list": FakeSelectionListWidget(set()),
}
screen.query_one = lambda selector, _widget_type=None: widgets[selector]
descriptor = screen.getTrackDescriptorFromInput()
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_track_details_screen_auto_sets_localized_title_from_selected_language(self):
set_current_language("de")
screen = object.__new__(TrackDetailsScreen)
@@ -521,6 +548,11 @@ class TagTableScreenStateTests(unittest.TestCase):
screen.tagsTable = FakeTagTable()
screen.shiftedSeasonsTable = FakeTagTable()
screen._PatternDetailsScreen__pattern = object()
screen._PatternDetailsScreen__showDescriptor = None
widgets = {
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
calls = []
screen.updateTags = lambda: calls.append("updateTags")
@@ -534,6 +566,48 @@ class TagTableScreenStateTests(unittest.TestCase):
calls,
)
def test_pattern_details_screen_on_mount_shows_show_quality_hint_for_new_pattern(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen.context = {}
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = None
widgets = {
"#showlabel": FakeStaticWidget(),
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
screen.on_mount()
self.assertEqual("7 - Demo (1999)", widgets["#showlabel"].value)
self.assertEqual("Show: 23", widgets["#show_quality_hint"].value)
def test_pattern_details_screen_show_quality_hint_is_hidden_when_pattern_quality_exists(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = type(
"_Pattern",
(),
{"quality": 19},
)()
self.assertEqual("", screen.getShowQualityHintText())
def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self):
screen = object.__new__(InspectDetailsScreen)
@@ -695,6 +769,59 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_update_tracks_shows_attachment_format_and_blanks_language(self):
attachment_track = TrackDescriptor(
index=4,
source_index=4,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "font.ttf", "mimetype": "font/ttf"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("4", row[0])
self.assertEqual("TTF", row[3])
self.assertEqual(" ", row[5])
self.assertEqual(" ", row[7])
self.assertEqual(" ", row[8])
def test_inspect_details_screen_update_tracks_shows_unknown_for_unknown_attachment_format(self):
attachment_track = TrackDescriptor(
index=5,
source_index=5,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.UNKNOWN,
tags={"filename": "blob.bin", "mimetype": "application/octet-stream"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("unknown", row[3])
self.assertEqual(" ", row[5])
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor(
index=3,

View File

@@ -0,0 +1,61 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class TrackDescriptorProbeTests(unittest.TestCase):
def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 4,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "AmazonEmberTanuki-Italic.ttf",
"mimetype": "font/ttf",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(AttachmentFormat.TTF, descriptor.getFormatDescriptor())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_attachment_without_codec_name_still_probes_as_unknown_when_not_font(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 9,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "cover.bin",
"mimetype": "application/octet-stream",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
if __name__ == "__main__":
unittest.main()