18 Commits

Author SHA1 Message Date
Javanaut
f794f822f2 Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev 2026-06-15 11:17:21 +02:00
Javanaut
1a11710df7 Convert docs to sphinx 2026-06-15 11:14:21 +02:00
Javanaut
93d19629dc v0.4.3 2026-05-22 21:24:48 +02:00
Javanaut
db43501ce2 v0.4.3 2026-05-22 21:13:48 +02:00
Javanaut
87568989fe fix styled ASS 2026-05-22 21:04:50 +02:00
Javanaut
20ab08626b TF Fix styled ASS font tracks 2026-05-22 20:11:05 +02:00
Javanaut
12be6e985a v0.4.2 2026-04-24 13:39:57 +02:00
Javanaut
12310942ae Fix inspect attachment subtracks 2026-04-24 10:34:43 +02:00
Javanaut
f913cb4fe3 ff 2026-04-24 08:49:48 +02:00
Javanaut
0a153280e3 ff 2026-04-24 08:49:30 +02:00
Javanaut
6ca0cd54b0 addendum 2026-04-23 22:16:03 +02:00
Javanaut
502a822bb4 prep 0.4.1 2026-04-23 22:09:36 +02:00
Javanaut
6cc21b5f36 Adds diagnostics/remedy system 2026-04-23 20:32:49 +02:00
Javanaut
0034f8ca97 ff 2026-04-23 16:37:47 +02:00
Javanaut
eedcbaed0a Merge branch 'dev' of gitea.maveno.de:Javanaut/ffx into dev 2026-04-23 16:31:19 +02:00
Javanaut
653ce7b417 Copy audio and video flags 2026-04-23 16:30:15 +02:00
Javanaut
b80c055826 fix table 2026-04-17 13:17:15 +02:00
Javanaut
c5fc6ac13d fix styled ASS and refactor att format 2026-04-17 11:41:13 +02:00
53 changed files with 3378 additions and 96 deletions

4
.gitignore vendored
View File

@@ -1,7 +1,6 @@
__pycache__/
*.py[cod]
junk/
.vscode
.ipynb_checkpoints/
tools/ansible/inventory/hawaii.yml
tools/ansible/inventory/peppermint.yml
@@ -10,11 +9,14 @@ tools/ansible/inventory/group_vars/all.yml
ffx_test_report.log
bin/conversiontest.py
tests/assets/
build/
dist/
*.egg-info/
.venv/
venv/
docs/_build/
.codex

11
.vscode/extensions.json vendored Normal file
View File

@@ -0,0 +1,11 @@
{
"recommendations": [
"swyddfa.esbonio",
"ms-python.python",
"ms-python.vscode-pylance",
"ms-python.debugpy",
"tamasfe.even-better-toml",
"redhat.vscode-yaml",
"DavidAnson.vscode-markdownlint"
]
}

18
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,18 @@
{
"esbonio.sphinx.pythonCommand": "${venv:.venv}/bin/python",
"esbonio.sphinx.buildCommand": [
"sphinx-build",
"-b",
"html",
"docs",
"docs/_build/html"
],
"python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python",
"python.testing.pytestEnabled": true,
"python.testing.pytestArgs": [
"--ignore=tests/legacy",
"--ignore=tests/support",
"tests"
],
"restructuredtext.confPath": "${workspaceFolder}/docs"
}

View File

@@ -99,6 +99,25 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History
### 0.4.3
- styled ASS subtitle sources with embedded font attachments are now detected explicitly, keep MKV output, preserve current source font attachments, and reject incompatible sidecar subtitle import
- attachment descriptors are now treated as source-runtime data instead of pattern schema data, so pattern persistence skips them and source-vs-pattern validation ignores them
- inspect differences no longer report planned changes for attachment filename/count drift while still showing attachment streams in the stream table
### 0.4.2
- pattern details now show an inline `Show: <quality>` hint next to the quality field when the pattern itself has no stored quality but the selected show does
- inspect stream tables now show attachment format labels like `TTF` in the codec column and keep attachment language cells blank instead of showing an undefined language
- ffmpeg damaged-MP3 diagnostics now recognize additional corruption lines such as `invalid new backstep`, keeping them grouped under the `warn-corrupt-mpeg-audio` review summary
### 0.4.1
- `convert` now supports `--copy-video` and `--copy-audio` to keep the selected stream type in copy mode without applying the corresponding reencode flags, filters, or formatting options
- ffmpeg conversions now monitor diagnostics while the process is running, retry unset AVI packet timestamps once with `-fflags +genpts`, and stop early when a file should be skipped instead of waiting for the full job to finish
- end-of-run convert summaries now list only ffmpeg findings that still require review, including named remedy identifiers such as `warn-corrupt-mpeg-audio`
- `upgrade` now finishes by reporting the installed FFX version together with the active bundle branch
### 0.3.1
- debug mode screen titles now append the active Textual screen class name, making screen-specific troubleshooting easier during inspect and edit flows

View File

@@ -71,11 +71,5 @@
- Delete this scratchpad once the optimization backlog is either converted into issues/work items or distilled into durable project guidance.
## Missing Timestamps
Detect ffmpeg warning "Timestamps are unset in a packet for stream 0. This is deprecated and will stop working in the future. Fix your code to set the timestamps properly" and try autofix by -fflags +genpts -> Warning if fails -> Error. Check if flags collide with anything.
## .265 export
-map 0:v -c:v copy -bsf:v hevc_mp4toannexb out.h265
## TODO: Review styled ASS separate handling

21
docs/Makefile Normal file
View File

@@ -0,0 +1,21 @@
SPHINXOPTS ?=
VENV_SPHINXBUILD = ../.venv/bin/sphinx-build
SPHINXBUILD ?= $(if $(wildcard $(VENV_SPHINXBUILD)),$(VENV_SPHINXBUILD),sphinx-build)
SOURCEDIR = .
BUILDDIR = _build
.PHONY: help clean html linkcheck
help:
@echo "Please use 'make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " linkcheck to check all external links for integrity"
clean:
rm -rf "$(BUILDDIR)"
html:
@$(SPHINXBUILD) -b html "$(SOURCEDIR)" "$(BUILDDIR)/html" $(SPHINXOPTS)
linkcheck:
@$(SPHINXBUILD) -b linkcheck "$(SOURCEDIR)" "$(BUILDDIR)/linkcheck" $(SPHINXOPTS)

31
docs/api.rst Normal file
View File

@@ -0,0 +1,31 @@
API Reference
=============
This section exposes selected modules that are useful when working on tests,
diagnostics, process execution, metadata editing, and file probing.
CLI Helpers
-----------
.. automodule:: ffx.cli
:members:
:undoc-members:
Process Helpers
---------------
.. automodule:: ffx.process
:members:
:undoc-members:
File Probing
------------
.. automodule:: ffx.file_properties
Metadata Editing
----------------
.. automodule:: ffx.metadata_editor
:members:
:undoc-members:

44
docs/conf.py Normal file
View File

@@ -0,0 +1,44 @@
from __future__ import annotations
from importlib.metadata import PackageNotFoundError, version as package_version
from pathlib import Path
import sys
ROOT_DIR = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT_DIR / "src"
sys.path.insert(0, str(SRC_DIR))
project = "FFX"
author = "javanaut@maveno.de"
copyright = "2026, Maveno"
try:
release = package_version("ffx")
except PackageNotFoundError:
release = "0.0.0"
version = release
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.napoleon",
"sphinx.ext.viewcode",
"sphinx_copybutton",
]
source_suffix = {
".rst": "restructuredtext",
}
templates_path = ["_templates"]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
html_theme = "sphinx_rtd_theme"
html_title = "FFX"
html_static_path = []
autodoc_typehints = "description"
autodoc_member_order = "bysource"
napoleon_google_docstring = True
napoleon_numpy_docstring = True

50
docs/development.rst Normal file
View File

@@ -0,0 +1,50 @@
Development
===========
The repo-local ``.venv`` is the preferred environment for contributors working
on tests or documentation:
.. code-block:: sh
tests/prepare.sh
The preparation script installs the package in editable mode with both test and
documentation extras:
.. code-block:: text
.[test,docs]
Run Tests
---------
Run the modern pytest suite:
.. code-block:: sh
.venv/bin/python -m pytest --ignore=tests/legacy --ignore=tests/support tests
The legacy harness remains available separately and is intentionally not part of
the default pytest run.
Build Docs
----------
Build HTML documentation:
.. code-block:: sh
.venv/bin/sphinx-build -b html docs docs/_build/html
The same command is wrapped by the Sphinx ``Makefile``:
.. code-block:: sh
make -C docs html
VS Code
-------
The repository includes ``.vscode/extensions.json`` with recommended
extensions, including Esbonio for Sphinx language-server support. The workspace
settings point Python tooling and Esbonio at the repo-local ``.venv``.

BIN
docs/esbonio.db Normal file

Binary file not shown.

192
docs/file_formats.rst Normal file
View File

@@ -0,0 +1,192 @@
File Formats
============
This document captures source-file-format notes that complement the normative
requirements in ``requirements/source_file_formats.md``.
The first documented format is a Matroska source that carries styled ASS/SSA
subtitle streams together with embedded font attachments.
Styled ASS In Matroska With Embedded Fonts
------------------------------------------
These files are typically ``.mkv`` releases where subtitle rendering quality
depends on keeping both parts of the subtitle package together:
* one or more subtitle streams with codec ``ass``
* one or more attachment streams that embed font files used by those subtitles
This matters because ASS subtitles are not plain text subtitles in the narrow
WebVTT sense. They can carry layout, styling, positioning, karaoke, signs, and
other typesetting effects. If the matching embedded fonts are lost, consumers
can still see subtitle text but the intended styling and sometimes glyph
coverage can be degraded.
For FFX this format is special because the ASS subtitle streams should remain
normally editable and mappable, while the related font attachments should be
transported unchanged.
Observed Sample
---------------
Assessment date: ``2026-04-17``
Observed sample file:
* ``tests/assets/boruto_s01e283_ssa.mkv``
Commands used for assessment:
.. code-block:: bash
ffprobe tests/assets/boruto_s01e283_ssa.mkv
ffprobe -hide_banner -show_format -show_streams -of json tests/assets/boruto_s01e283_ssa.mkv
Observed stream layout:
.. list-table::
:header-rows: 1
* - Stream index
- Kind
- Key details
* - ``0``
- video
- ``codec_name=h264``
* - ``1``
- audio
- ``codec_name=aac``, ``language=jpn``
* - ``2``
- subtitle
- ``codec_name=ass``, ``language=ger``, default
* - ``3``
- subtitle
- ``codec_name=ass``, ``language=eng``
* - ``4``-``13``
- attachment
- ``tags.mimetype=font/ttf``, ``.ttf`` filenames
Observed attachment filenames:
* ``AmazonEmberTanuki-Italic.ttf``
* ``AmazonEmberTanuki-Regular.ttf``
* ``Arial.ttf``
* ``Arial Bold.ttf``
* ``Georgia.ttf``
* ``Times New Roman.ttf``
* ``Times New Roman Bold.ttf``
* ``Trebuchet MS.ttf``
* ``Verdana.ttf``
* ``Verdana Bold.ttf``
Important probe behavior from the real sample:
* Plain ``ffprobe`` lists the font streams as ``Attachment: none``.
* Plain ``ffprobe`` also prints warnings such as ``Could not find codec
parameters for stream 4 (Attachment: none): unknown codec`` and later
``Unsupported codec with id 0 for input stream ...``.
* The JSON produced by ``FileProperties.FFPROBE_COMMAND_TOKENS``
(``ffprobe -hide_banner -show_format -show_streams -of json``) still exposes
the attachment streams clearly through ``codec_type="attachment"`` and the
attachment tags.
* In that JSON, the attachment streams do not expose ``codec_name``.
This last point is important for FFX: robust detection must not depend on
attachment ``codec_name`` being present.
Detection Guidance
------------------
Current known indicators for this format are:
* one or more subtitle streams with ``codec_type="subtitle"`` and
``codec_name="ass"``
* one or more attachment streams with ``codec_type="attachment"``
* attachment tags that identify embedded fonts, especially
``tags.mimetype="font/ttf"``
* attachment filenames that end in ``.ttf``
The pattern can vary. FFX should therefore treat the above as a cluster of
signals rather than an exact signature tied to one file.
Inference from the observed sample plus FFmpeg documentation:
* MIME matching should not be limited to ``font/ttf`` alone.
* The Boruto sample uses ``font/ttf``.
* FFmpeg's Matroska attachment example uses
``mimetype=application/x-truetype-font`` for a ``.ttf`` attachment.
* Detection should therefore normalize multiple TTF-like MIME values rather
than depend on a single exact string.
Processing Expectations In FFX
------------------------------
The format-specific requirements live in
``requirements/source_file_formats.md``. In practical terms, FFX should:
* recognize the ASS-plus-font-attachment pattern even when attachment probe data
is incomplete
* tell the operator that the pattern was detected and that special handling is
being used
* reject sidecar subtitle import for such sources, because converting or
replacing these subtitle tracks with ordinary external text subtitles would
break the intended subtitle package
* continue to allow normal manipulation of the ASS subtitle tracks themselves
* preserve the font attachment streams unchanged
FFmpeg Notes
------------
Relevant FFmpeg documentation confirms several behaviors that line up with
FFX's needs:
* FFmpeg documents ``-attach`` as adding an attachment stream to the output, and
explicitly names Matroska fonts used in subtitle rendering as an example.
* FFmpeg documents attachment streams as regular streams that are created after
the mapped media streams.
* FFmpeg documents ``-dump_attachment`` for extracting attachment streams, which
is useful for debugging or validating a source file's embedded fonts.
* FFmpeg's Matroska example requires a ``mimetype`` metadata tag for attached
fonts, which is consistent with using attachment tags as detection signals.
* FFmpeg also notes that attachments are implemented as codec extradata. That
helps explain why probe output for attachment streams can look different from
ordinary audio, video, and subtitle streams.
Implication for FFX:
* Attachment preservation is not an optional cosmetic feature for this format.
It is part of preserving the subtitle package correctly.
Jellyfin Notes
--------------
Jellyfin's documentation also supports keeping this format intact:
* Jellyfin's subtitle compatibility table lists ``ASS/SSA`` as supported in
``MKV`` and not supported in ``MP4``.
* Jellyfin notes that when subtitles must be transcoded, they are either
converted to a supported format or burned into the video, and burning them in
is the most CPU-intensive path.
* Jellyfin's subtitle-extraction example for ``SSA/ASS`` first dumps attachment
streams and then extracts the ASS subtitle stream, which reflects the real
relationship between ASS subtitles and embedded fonts in MKV releases.
* Jellyfin's font documentation says text-based subtitles require fonts to
render properly.
* Jellyfin's configuration documentation says the web client uses configured
fallback fonts for ASS subtitles when other fonts such as MKV attachments or
client-side fonts are not available.
Inference from the Jellyfin compatibility tables:
* Keeping this subtitle format in Matroska is the safest interoperability choice
for Jellyfin consumers.
* Converting the subtitle payload to WebVTT would lose styled ASS behavior.
* Dropping the attachment streams would force client or fallback font
substitution and can change appearance or glyph coverage.
References
----------
* FFmpeg documentation: https://ffmpeg.org/ffmpeg.html
* Jellyfin codec support: https://jellyfin.org/docs/general/clients/codec-support/
* Jellyfin configuration and fonts: https://jellyfin.org/docs/general/administration/configuration/

25
docs/index.rst Normal file
View File

@@ -0,0 +1,25 @@
FFX Documentation
=================
FFX is a local command-line and Textual terminal UI for inspecting TV episode
files, storing normalization rules, and converting media into predictable
archive-ready outputs.
This documentation covers operator setup, day-to-day command usage, contributor
workflow, format-specific notes, and generated API references for the smaller
utility modules.
.. toctree::
:maxdepth: 2
:caption: User Guide
installation
usage
file_formats
.. toctree::
:maxdepth: 2
:caption: Contributor Guide
development
api

52
docs/installation.rst Normal file
View File

@@ -0,0 +1,52 @@
Installation
============
FFX is designed for a Linux-like workstation with local command execution. The
runtime media tools must be available on ``PATH``:
* ``ffmpeg``
* ``ffprobe``
* ``cpulimit``
User Bundle
-----------
The persistent user installation is prepared with the two-step flow described in
the project README:
.. code-block:: sh
bash tools/setup.sh
bash tools/configure_workstation.sh
``tools/setup.sh`` creates the long-lived bundle virtualenv at
``~/.local/share/ffx.venv`` and exposes the ``ffx`` command. The workstation
script checks system tools and seeds local config directories.
Local Test And Docs Environment
-------------------------------
Contributor test and documentation work uses the repo-local virtualenv:
.. code-block:: sh
tests/prepare.sh
The script creates ``.venv``, installs FFX in editable mode with test and docs
extras, and verifies the Sphinx toolchain. Use check-only mode when you only
want to inspect readiness:
.. code-block:: sh
tests/prepare.sh --check
Documentation Build
-------------------
After preparation, build the documentation with:
.. code-block:: sh
.venv/bin/sphinx-build -b html docs docs/_build/html
The generated site starts at ``docs/_build/html/index.html``.

42
docs/make.bat Normal file
View File

@@ -0,0 +1,42 @@
@ECHO OFF
pushd %~dp0
if "%SPHINXBUILD%" == "" if exist ..\.venv\Scripts\sphinx-build.exe (
set SPHINXBUILD=..\.venv\Scripts\sphinx-build.exe
)
if "%SPHINXBUILD%" == "" set SPHINXBUILD=sphinx-build
set SOURCEDIR=.
set BUILDDIR=_build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo The 'sphinx-build' command was not found. Make sure Sphinx is installed,
echo then set SPHINXBUILD to the full path if needed.
exit /b 1
)
if "%1" == "" goto help
if "%1" == "html" goto html
if "%1" == "linkcheck" goto linkcheck
echo.
echo Unknown target "%1".
goto help
:html
%SPHINXBUILD% -b html %SOURCEDIR% %BUILDDIR%\html %SPHINXOPTS%
goto end
:linkcheck
%SPHINXBUILD% -b linkcheck %SOURCEDIR% %BUILDDIR%\linkcheck %SPHINXOPTS%
goto end
:help
echo.
echo Please use 'make.bat ^<target^>' where ^<target^> is one of
echo html to make standalone HTML files
echo linkcheck to check all external links for integrity
:end
popd

75
docs/usage.rst Normal file
View File

@@ -0,0 +1,75 @@
Usage
=====
FFX exposes a single ``ffx`` command with subcommands for inspection,
conversion, metadata editing, setup, and maintenance.
Inspect Files
-------------
Open the inspection workflow for one or more files:
.. code-block:: sh
ffx inspect /path/to/episode.mkv
Print resolved season-shift mappings without opening the TUI:
.. code-block:: sh
ffx inspect --shift /path/to/episode.mkv
Convert Files
-------------
Convert one or more source files using stored rules where available:
.. code-block:: sh
ffx convert /path/to/episode.mkv
Useful overrides include:
* ``--no-pattern`` to skip database pattern matching
* ``--show``, ``--season``, and ``--episode`` for explicit episode identity
* ``--output-directory`` for generated output placement
* ``--subtitle-directory`` and ``--subtitle-prefix`` for sidecar subtitle
imports
* ``--copy-video`` or ``--copy-audio`` to preserve selected stream types
* ``--rename-only`` for filename normalization without media rewriting
Manage Shows And Patterns
-------------------------
Open the Textual interface for show and pattern management:
.. code-block:: sh
ffx shows
Extract Streams
---------------
Extract streams from a file:
.. code-block:: sh
ffx unmux /path/to/episode.mkv
For subtitle-only extraction:
.. code-block:: sh
ffx unmux --subtitles-only --label show-name /path/to/episode.mkv
Detect Crop
-----------
Ask FFmpeg to suggest crop parameters:
.. code-block:: sh
ffx cropdetect /path/to/episode.mkv
The default sampling window is controlled by the application defaults and can be
overridden with command options.

View File

@@ -1,7 +1,7 @@
[project]
name = "ffx"
description = "FFX recoding and metadata managing tool"
version = "0.3.1"
version = "0.4.3"
license = {file = "LICENSE.md"}
dependencies = [
"requests",
@@ -31,6 +31,12 @@ Issues = "https://gitea.maveno.de/Javanaut/ffx/issues"
test = [
"pytest",
]
docs = [
"esbonio",
"sphinx",
"sphinx-copybutton",
"sphinx-rtd-theme",
]
[build-system]
requires = [

View File

@@ -0,0 +1,90 @@
# Source File Formats
This file defines source-file-format-specific processing requirements for FFX.
It is intended to grow as additional relevant source file types are identified.
The first covered format is Matroska media that contains styled ASS/SSA
subtitle streams together with embedded font attachments.
## Scope
- Detecting source files that use ASS subtitle streams together with embedded
font attachments needed for correct rendering.
- Defining the required `ffx convert` behavior when this format is present.
- Preserving the required attachment streams during conversion.
- Keeping normal subtitle-track manipulation behavior for the ASS subtitle
tracks themselves.
## Out Of Scope
- General subtitle behavior for sources that do not carry this pattern.
- A complete catalog of all source file formats FFX may support later.
## Terms
- `styled ASS source`: a source media file that contains one or more subtitle
streams with `codec_type="subtitle"` and `codec_name="ass"` together with
one or more font-bearing attachment streams.
- `font attachment`: an attachment stream whose metadata identifies a font
payload, commonly through `tags.mimetype` and attachment filename metadata.
- `external subtitle feed`: subtitle tracks supplied from separate subtitle
files through the existing subtitle-import path.
- `special attachment subtracks`: the embedded font attachment streams that
belong to the styled ASS source pattern.
## Rules
- `SOURCE_FILE_FORMATS-0001`: The system shall recognize the styled ASS source
pattern.
- `SOURCE_FILE_FORMATS-0002`: Recognition shall not depend on fixed stream
counts, fixed stream indices, or one exact attachment count.
- `SOURCE_FILE_FORMATS-0003`: Recognition shall use the best available ffprobe
signals. For known subtitle streams this includes
`codec_type="subtitle"` together with `codec_name="ass"`.
- `SOURCE_FILE_FORMATS-0004`: Recognition of the special attachment subtracks
shall use attachment-oriented signals such as `codec_type="attachment"` and
font-identifying metadata such as `tags.mimetype="font/ttf"` when present.
- `SOURCE_FILE_FORMATS-0005`: Recognition shall tolerate known ffprobe
variation in attachment reporting, including files where attachment streams
do not expose a `codec_name` but do expose `codec_type="attachment"` and
font-identifying tags.
- `SOURCE_FILE_FORMATS-0006`: When attachment metadata varies across files,
detection shall not depend on one exact MIME string alone. Detection shall
be written so the known pattern can vary while still recognizing font
attachments.
- `SOURCE_FILE_FORMATS-0007`: When the styled ASS source pattern is detected,
`ffx convert` shall emit an operator-facing message that reports the
detection and hints that special subtitle preservation handling is being
applied.
- `SOURCE_FILE_FORMATS-0008`: When the styled ASS source pattern is present on
the source file, `ffx convert` shall not process an external subtitle feed.
The command shall stop before conversion and report an error that explains
that separate subtitle-file import is incompatible with this source format.
- `SOURCE_FILE_FORMATS-0009`: Normal manipulation of the ASS subtitle streams
themselves shall continue to work through the usual selection, ordering,
metadata, language, title, and disposition handling paths.
- `SOURCE_FILE_FORMATS-0010`: The special attachment subtracks shall be
preserved in the target media file as-is rather than transcoded,
regenerated, or replaced from external sources.
- `SOURCE_FILE_FORMATS-0011`: Preserving the special attachment subtracks
as-is includes retaining the attachment payload and the attachment metadata
required by consumers, especially attachment filename and mimetype
information.
- `SOURCE_FILE_FORMATS-0012`: This file shall remain the extension point for
additional source-file-format contracts as FFX adds support for more special
source formats.
## Acceptance
- A source file matching the observed pattern of embedded ASS subtitles plus
font attachments is recognized even when the attachment streams do not carry
a `codec_name`.
- `ffx convert` output contains a clear detection message before the actual
conversion work proceeds.
- If external subtitle import is requested for such a source file, the command
fails fast with an explicit error instead of mixing sidecar subtitles into
the job.
- Existing manipulation of the ASS subtitle tracks still works for metadata,
titles, languages, ordering, and dispositions.
- The output media preserves the required font attachment streams and their
identifying metadata needed by downstream media players.

View File

@@ -0,0 +1,67 @@
from enum import Enum
import os
class AttachmentFormat(Enum):
TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf', 'label': 'TTF'}
PNG = {'identifier': 'png', 'format': None, 'extension': 'png', 'label': 'PNG'}
UNKNOWN = {'identifier': 'unknown', 'format': None, 'extension': None, 'label': 'UNKNOWN'}
def identifier(self):
return str(self.value['identifier'])
def label(self):
return str(self.value['label'])
def format(self):
return self.value['format']
def extension(self):
return str(self.value['extension'])
@staticmethod
def identify(identifier: str):
formats = [f for f in AttachmentFormat if f.value['identifier'] == str(identifier)]
if formats:
return formats[0]
return AttachmentFormat.UNKNOWN
@staticmethod
def identifyFfprobeStream(streamObj: dict):
identifier = streamObj.get("codec_name")
identifiedFormat = AttachmentFormat.identify(identifier)
if identifiedFormat != AttachmentFormat.UNKNOWN:
return identifiedFormat
if str(streamObj.get("codec_type", "")).strip() != "attachment":
return AttachmentFormat.UNKNOWN
tags = streamObj.get("tags", {}) or {}
mimetype = str(tags.get("mimetype", "")).strip().lower()
filename = str(tags.get("filename", "")).strip().lower()
filenameExtension = os.path.splitext(filename)[1]
if (
mimetype in {
"font/ttf",
"application/x-truetype-font",
"application/x-font-ttf",
}
or "truetype" in mimetype
or filenameExtension == ".ttf"
):
return AttachmentFormat.TTF
if mimetype in {"image/png", "image/x-png"} or filenameExtension == ".png":
return AttachmentFormat.PNG
return AttachmentFormat.UNKNOWN
@staticmethod
def fromTrackCodec(trackCodec):
identifier = getattr(trackCodec, "identifier", None)
if callable(identifier):
return AttachmentFormat.identify(trackCodec.identifier())
return AttachmentFormat.UNKNOWN

View File

@@ -68,6 +68,14 @@ CUT_OPTION_HELP = (
+ "or --cut START,DURATION for an explicit start and duration. "
+ "Omit to disable."
)
COPY_VIDEO_OPTION_HELP = (
"Copy video streams without re-encoding. Skips video encoder options "
+ "and video filters."
)
COPY_AUDIO_OPTION_HELP = (
"Copy audio streams without re-encoding. Skips audio encoder options "
+ "and audio filters."
)
def normalizeNicenessOption(ctx, param, value):
@@ -385,6 +393,41 @@ def getTrackedGitChanges(repoPath):
return [line for line in completed.stdout.splitlines() if line.strip()]
def getCurrentGitBranch(repoPath):
completed = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
cwd=repoPath,
capture_output=True,
text=True,
)
if completed.returncode != 0:
commandLabel = 'git rev-parse --abbrev-ref HEAD'
errorOutput = completed.stderr.strip() or completed.stdout.strip()
raise click.ClickException(
f"Unable to inspect bundle repository branch using '{commandLabel}': {errorOutput}"
)
return completed.stdout.strip() or "unknown"
def getBundleVersion(repoPath):
constantsPath = os.path.join(repoPath, 'src', 'ffx', 'constants.py')
try:
with open(constantsPath, encoding='utf-8') as constantsFile:
for line in constantsFile:
strippedLine = line.strip()
if strippedLine.startswith('VERSION=') or strippedLine.startswith('VERSION ='):
return strippedLine.split('=', 1)[1].strip().strip('"\'')
except OSError as ex:
raise click.ClickException(
f"Unable to inspect bundle version from {constantsPath}: {ex}"
) from ex
raise click.ClickException(f"Unable to inspect bundle version from {constantsPath}")
def runScriptWrapper(ctx, scriptPath, missingDescription, commandArgs):
if not os.path.isfile(scriptPath):
raise click.ClickException(f"{missingDescription} not found at {scriptPath}")
@@ -507,6 +550,10 @@ def upgrade(ctx, branch):
if completed.returncode != 0:
ctx.exit(completed.returncode)
upgradedBranch = getCurrentGitBranch(bundleRepoPath)
upgradedVersion = getBundleVersion(bundleRepoPath)
click.echo(f"Updated FFX to version {upgradedVersion} from branch {upgradedBranch}.")
@ffx.command()
@click.pass_context
@@ -639,6 +686,7 @@ def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix,
trackType = trackDescriptor.getType()
trackCodec = trackDescriptor.getCodec()
trackFormat = trackDescriptor.getFormatDescriptor()
targetPathBase = os.path.join(targetDirectory, targetPrefix) if targetDirectory else targetPrefix
@@ -651,12 +699,12 @@ def getUnmuxSequence(trackDescriptor: TrackDescriptor, sourcePath, targetPrefix,
commandTokens += ['-c', 'copy']
# output format
codecFormat = trackCodec.format()
codecFormat = trackFormat.format()
if codecFormat is not None:
commandTokens += ['-f', codecFormat]
# output filename
commandTokens += [f"{targetPathBase}.{trackCodec.extension()}"]
commandTokens += [f"{targetPathBase}.{trackFormat.extension()}"]
return commandTokens
@@ -771,7 +819,7 @@ def unmux(ctx,
if not ctx.obj['dry_run']:
#TODO #425: Codec Enum
ctx.obj['logger'].info(f"Unmuxing stream {trackDescriptor.getIndex()} into file {targetPrefix}.{trackDescriptor.getCodec().extension()}")
ctx.obj['logger'].info(f"Unmuxing stream {trackDescriptor.getIndex()} into file {targetPrefix}.{trackDescriptor.getFormatDescriptor().extension()}")
ctx.obj['logger'].debug(f"Executing unmuxing sequence")
@@ -914,6 +962,8 @@ def checkUniqueDispositions(context, mediaDescriptor: MediaDescriptor):
@click.option('-l', '--label', type=str, default='', help='Label to be used as filename prefix')
@click.option('-v', '--video-encoder', type=str, default=DEFAULT_VIDEO_ENCODER_LABEL, help=f"Target video encoder (vp9, av1, h264 or copy)", show_default=True)
@click.option('--copy-video', is_flag=True, default=False, help=COPY_VIDEO_OPTION_HELP)
@click.option('--copy-audio', is_flag=True, default=False, help=COPY_AUDIO_OPTION_HELP)
@click.option('-q', '--quality', type=str, default="", help=f"Quality settings to be used with VP9/H264 encoder")
@click.option('-p', '--preset', type=str, default="", help=f"Quality preset to be used with AV1 encoder")
@@ -1010,6 +1060,8 @@ def convert(ctx,
paths,
label,
video_encoder,
copy_video,
copy_audio,
quality,
preset,
stereo_bitrate,
@@ -1069,6 +1121,11 @@ def convert(ctx,
Suffices will we appended to filename in case of multiple created files
or if the filename has not changed."""
from ffx.ffx_controller import FfxController
from ffx.diagnostics import (
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
from ffx.file_properties import FileProperties
from ffx.filter.crop_filter import CropFilter
from ffx.filter.deinterlace_filter import DeinterlaceFilter
@@ -1082,6 +1139,7 @@ def convert(ctx,
from ffx.tmdb_controller import TmdbController
from ffx.track_codec import TrackCodec
from ffx.track_disposition import TrackDisposition
from ffx.track_type import TrackType
from ffx.video_encoder import VideoEncoder
startTime = time.perf_counter()
@@ -1089,9 +1147,12 @@ def convert(ctx,
context = ctx.obj
context['video_encoder'] = VideoEncoder.fromLabel(video_encoder)
context['copy_video'] = copy_video
context['copy_audio'] = copy_audio
copyVideoEffective = copy_video or context['video_encoder'] == VideoEncoder.COPY
# HINT: quick and dirty override for h264, todo improve
if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY):
if context['video_encoder'] in (VideoEncoder.H264, VideoEncoder.COPY) or copy_video or copy_audio:
targetFormat = ''
targetExtension = 'mkv'
else:
@@ -1224,36 +1285,54 @@ def convert(ctx,
tc = TmdbController() if context['use_tmdb'] else None
qualityKwargs = {QualityFilter.QUALITY_KEY: str(quality)}
if copyVideoEffective and quality:
ctx.obj['logger'].warning("Ignoring quality settings because video is being copied")
qualityKwargs = {
QualityFilter.QUALITY_KEY: "" if copyVideoEffective else str(quality)
}
qf = QualityFilter(**qualityKwargs)
if context['video_encoder'] == VideoEncoder.AV1 and preset:
if context['video_encoder'] == VideoEncoder.AV1 and preset and not copyVideoEffective:
presetKwargs = {PresetFilter.PRESET_KEY: preset}
PresetFilter(**presetKwargs)
cf = None
# if crop != 'none':
if crop == 'auto':
videoFilterOptionsRequested = (
crop != 'none'
or deinterlace != 'none'
or denoise != 'none'
or denoise_strength
or denoise_patch_size
or denoise_chroma_patch_size
or denoise_research_window
or denoise_chroma_research_window
)
if copyVideoEffective and videoFilterOptionsRequested:
ctx.obj['logger'].warning("Ignoring video filter options because video is being copied")
if crop == 'auto' and not copyVideoEffective:
cropKwargs = {}
cf = CropFilter(**cropKwargs)
denoiseKwargs = {}
if denoise_strength:
if denoise_strength and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.STRENGTH_KEY] = denoise_strength
if denoise_patch_size:
if denoise_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.PATCH_SIZE_KEY] = denoise_patch_size
if denoise_chroma_patch_size:
if denoise_chroma_patch_size and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_PATCH_SIZE_KEY] = denoise_chroma_patch_size
if denoise_research_window:
if denoise_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.RESEARCH_WINDOW_KEY] = denoise_research_window
if denoise_chroma_research_window:
if denoise_chroma_research_window and not copyVideoEffective:
denoiseKwargs[NlmeansFilter.CHROMA_RESEARCH_WINDOW_KEY] = denoise_chroma_research_window
if denoise != 'none' or denoiseKwargs:
if not copyVideoEffective and (denoise != 'none' or denoiseKwargs):
NlmeansFilter(**denoiseKwargs)
if deinterlace != 'none':
if deinterlace != 'none' and not copyVideoEffective:
DeinterlaceFilter()
chainYield = list(qf.getChainYield())
@@ -1313,13 +1392,31 @@ def convert(ctx,
sourceMediaDescriptor = mediaFileProperties.getMediaDescriptor()
if ([smd for smd in sourceMediaDescriptor.getSubtitleTracks()
if smd.getCodec() == TrackCodec.ASS]
and [amd for amd in sourceMediaDescriptor.getAttachmentTracks()
if amd.getCodec() == TrackCodec.TTF]):
from ffx.attachment_format import AttachmentFormat
styledAssDetector = getattr(
sourceMediaDescriptor,
"hasStyledAssSubtitlesWithFontAttachments",
None,
)
styledAssSourceDetected = (
bool(styledAssDetector())
if callable(styledAssDetector)
else False
)
if styledAssSourceDetected:
styledAssMessage = (
"Styled ASS subtitles with embedded font attachments detected; "
+ "preserving source font attachments."
)
click.echo(styledAssMessage)
targetFormat = ''
targetExtension = 'mkv'
if context['import_subtitles']:
raise click.ClickException(
"External subtitle import is incompatible with styled ASS "
+ "sources that carry embedded font attachments."
)
#HINT: This is None if the filename did not match anything in database
@@ -1346,6 +1443,12 @@ def convert(ctx,
else:
targetMediaDescriptor = currentPattern.getMediaDescriptor(ctx.obj)
if styledAssSourceDetected:
targetMediaDescriptor = targetMediaDescriptor.withSourceAttachmentTracks(
sourceMediaDescriptor,
AttachmentFormat.TTF,
context=ctx.obj,
)
checkUniqueDispositions(context, targetMediaDescriptor)
currentShowDescriptor = currentPattern.getShowDescriptor(ctx.obj)
@@ -1355,6 +1458,8 @@ def convert(ctx,
targetTrackDescriptorList = targetMediaDescriptor.getTrackDescriptors()
for ttd in targetTrackDescriptorList:
if ttd.getType() == TrackType.ATTACHMENT:
continue
tti = ttd.getIndex()
ttsi = ttd.getSourceIndex()
@@ -1525,6 +1630,7 @@ def convert(ctx,
if rename_only:
shutil.move(sourcePath, targetPath)
else:
try:
fc.runJob(sourcePath,
targetPath,
targetFormat,
@@ -1532,11 +1638,22 @@ def convert(ctx,
cropArguments,
currentPattern,
currentShowDescriptor)
except FfmpegSkipFileWarning:
if os.path.exists(targetPath):
os.remove(targetPath)
continue
endTime = time.perf_counter()
ctx.obj['logger'].info(f"\nDONE\nTime elapsed {endTime - startTime}")
unremediedIssues = getUnremediedIssues(context)
if unremediedIssues:
ctx.obj['logger'].warning("\nFiles with ffmpeg findings that require review:")
for summaryLine in iterUnremediedIssueSummaryLines(context):
ctx.obj['logger'].warning(summaryLine)
else:
ctx.obj['logger'].info("All files converted with no issues.")
if __name__ == '__main__':

View File

@@ -1,4 +1,4 @@
VERSION='0.3.1'
VERSION='0.4.3'
DATABASE_VERSION = 3
DEFAULT_QUALITY = 32

View File

@@ -0,0 +1,24 @@
from .base import FfmpegRemedy, FfmpegRemedyDecision, FfmpegSkipFileWarning
from .monitor import FfmpegCommandRunner, FfmpegDiagnosticMonitor
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import (
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from dataclasses import dataclass
class FfmpegSkipFileWarning(Exception):
pass
@dataclass(frozen=True)
class FfmpegRemedyDecision:
stop_process: bool = False
retry_input_tokens: tuple[str, ...] = ()
skip_file: bool = False
console_warning: str = ""
summary_identifier: str = ""
unremedied_issue_identifier: str = ""
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
class FfmpegRemedy:
identifier = "ffmpeg-remedy"
harmless = False
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
raise NotImplementedError

View File

@@ -0,0 +1,222 @@
from __future__ import annotations
import re
from ffx.logging_utils import get_ffx_logger
from ffx.process import executeProcess
from .base import FfmpegSkipFileWarning, FfmpegRemedy
from .retry_with_generated_pts import RetryWithGeneratedPtsRemedy
from .state import recordUnremediedIssue
from .warn_corrupt_mpeg_audio import WarnCorruptMpegAudioRemedy
UNHANDLED_DIAGNOSTIC_PATTERNS = (
re.compile(r"\bwarning\b", re.IGNORECASE),
re.compile(r"\berror\b", re.IGNORECASE),
re.compile(r"\bfailed\b", re.IGNORECASE),
re.compile(r"\binvalid\b", re.IGNORECASE),
re.compile(r"\bmissing\b", re.IGNORECASE),
re.compile(r"\bcorrupt\b", re.IGNORECASE),
re.compile(r"\boverflow\b", re.IGNORECASE),
re.compile(r"\bdeprecated\b", re.IGNORECASE),
)
class FfmpegDiagnosticMonitor:
def __init__(
self,
context: dict | None,
command_sequence: list[str],
*,
remedies: list[FfmpegRemedy] | None = None,
emittedWarnings: set[str] | None = None,
):
self.context = context or {}
self.command_sequence = list(command_sequence)
self.logger = self.context.get("logger", get_ffx_logger())
self.source_path = str(self.context.get("current_source_path", "")).strip()
self.remedies = remedies or [
RetryWithGeneratedPtsRemedy(),
WarnCorruptMpegAudioRemedy(),
]
self._emittedWarnings = emittedWarnings if emittedWarnings is not None else set()
self.retry_input_tokens: tuple[str, ...] = ()
self.skip_file = False
self.skip_file_message = ""
def describe_source(self) -> str:
return self.source_path if self.source_path else "current file"
def command_contains_tokens(self, tokens: tuple[str, ...]) -> bool:
tokenCount = len(tokens)
if tokenCount == 0:
return True
return any(
tuple(self.command_sequence[index:index + tokenCount]) == tuple(tokens)
for index in range(len(self.command_sequence) - tokenCount + 1)
)
def emitConsoleWarning(self, warningMessage: str) -> None:
if warningMessage and warningMessage not in self._emittedWarnings:
self.logger.warning(warningMessage)
self._emittedWarnings.add(warningMessage)
def recordUnremediedIssue(self, issueIdentifier: str, issueLine: str) -> None:
isFirstIssueForFile = recordUnremediedIssue(
self.context,
self.describe_source(),
issueIdentifier,
)
if not isFirstIssueForFile:
return
self.emitConsoleWarning(
f"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ f"{self.describe_source()}. FFX will continue, but review the output "
+ f"file. First unhandled line: {issueLine}"
)
def lineLooksLikeUnhandledDiagnostic(self, line: str) -> bool:
return any(pattern.search(line) for pattern in UNHANDLED_DIAGNOSTIC_PATTERNS)
def getUnhandledDiagnosticIdentifier(self, line: str) -> str:
loweredLine = str(line).lower()
if any(token in loweredLine for token in ("error", "failed", "invalid", "missing", "corrupt", "overflow")):
return "unhandled-error"
if any(token in loweredLine for token in ("warning", "deprecated")):
return "unhandled-warning"
return "unhandled-diagnostic"
def getSummaryIdentifier(
self,
remedy: FfmpegRemedy,
decision,
) -> str:
explicitIdentifier = str(decision.summary_identifier).strip()
if explicitIdentifier:
return explicitIdentifier
remedyIdentifier = str(getattr(remedy, "identifier", "")).strip()
if remedyIdentifier and remedyIdentifier != FfmpegRemedy.identifier:
return remedyIdentifier
return str(decision.unremedied_issue_identifier).strip()
def shouldRecordSummary(
self,
remedy: FfmpegRemedy,
decision,
) -> bool:
if getattr(remedy, "harmless", False):
return False
if decision.retry_requested and not decision.skip_file:
return False
return bool(self.getSummaryIdentifier(remedy, decision))
def handle_stderr_line(self, line: str) -> bool:
strippedLine = str(line).strip()
if not strippedLine:
return False
for remedy in self.remedies:
decision = remedy.inspect_line(strippedLine, self)
if decision is None:
continue
self.emitConsoleWarning(decision.console_warning)
if decision.retry_requested:
self.retry_input_tokens = tuple(decision.retry_input_tokens)
if self.shouldRecordSummary(remedy, decision):
recordUnremediedIssue(
self.context,
self.describe_source(),
self.getSummaryIdentifier(remedy, decision),
)
if decision.skip_file:
self.skip_file = True
self.skip_file_message = (
decision.console_warning
or f"Skipping file {self.describe_source()} because ffmpeg reported a fatal diagnostic."
)
return bool(decision.stop_process)
if self.lineLooksLikeUnhandledDiagnostic(strippedLine):
self.recordUnremediedIssue(
self.getUnhandledDiagnosticIdentifier(strippedLine),
strippedLine,
)
return False
@property
def retry_requested(self) -> bool:
return bool(self.retry_input_tokens)
def insertFfmpegInputOptions(
commandSequence: list[str],
extraTokens: tuple[str, ...],
) -> list[str]:
if not extraTokens:
return list(commandSequence)
if not commandSequence:
return list(extraTokens)
return [commandSequence[0]] + list(extraTokens) + list(commandSequence[1:])
class FfmpegCommandRunner:
def __init__(
self,
context: dict | None,
*,
remedies: list[FfmpegRemedy] | None = None,
):
self.__context = context or {}
self.__remedies = remedies
def execute(
self,
commandSequence: list[str],
*,
directory: str = None,
timeoutSeconds: float = None,
):
emittedWarnings: set[str] = set()
attemptCommandSequence = list(commandSequence)
while True:
monitor = FfmpegDiagnosticMonitor(
self.__context,
attemptCommandSequence,
remedies=self.__remedies,
emittedWarnings=emittedWarnings,
)
out, err, rc = executeProcess(
attemptCommandSequence,
directory=directory,
context=self.__context,
timeoutSeconds=timeoutSeconds,
stderrLineHandler=monitor.handle_stderr_line,
)
if monitor.retry_requested:
attemptCommandSequence = insertFfmpegInputOptions(
attemptCommandSequence,
monitor.retry_input_tokens,
)
continue
if monitor.skip_file:
raise FfmpegSkipFileWarning(monitor.skip_file_message)
return out, err, rc

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class RetryWithGeneratedPtsRemedy(FfmpegRemedy):
identifier = "retry-with-generated-pts"
RETRY_INPUT_TOKENS = ("-fflags", "+genpts")
TIMESTAMP_UNSET_PATTERN = re.compile(
r"Timestamps are unset in a packet for stream \d+"
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if self.TIMESTAMP_UNSET_PATTERN.search(line) is None:
return None
if session.command_contains_tokens(self.RETRY_INPUT_TOKENS):
return FfmpegRemedyDecision(
stop_process=True,
skip_file=True,
console_warning=(
f"Skipping file {session.describe_source()}: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
),
unremedied_issue_identifier="timestamp-unset-after-genpts",
)
return FfmpegRemedyDecision(
stop_process=True,
retry_input_tokens=self.RETRY_INPUT_TOKENS,
console_warning=(
f"ffmpeg reported unset packet timestamps for {session.describe_source()}. "
+ "Stopping early and retrying with -fflags +genpts."
),
)

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import os
DIAGNOSTICS_STATE_KEY = "diagnostics_state"
UNREMEDIED_ISSUES_KEY = "unremedied_issues"
def getDiagnosticsState(context: dict | None) -> dict:
if context is None:
return {UNREMEDIED_ISSUES_KEY: {}}
if DIAGNOSTICS_STATE_KEY not in context:
context[DIAGNOSTICS_STATE_KEY] = {
UNREMEDIED_ISSUES_KEY: {},
}
return context[DIAGNOSTICS_STATE_KEY]
def recordUnremediedIssue(
context: dict | None,
sourcePath: str,
identifier: str,
) -> bool:
if not sourcePath:
return False
diagnosticsState = getDiagnosticsState(context)
unremediedIssues = diagnosticsState[UNREMEDIED_ISSUES_KEY]
issueList = unremediedIssues.setdefault(sourcePath, [])
strippedIdentifier = str(identifier).strip()
if not strippedIdentifier or strippedIdentifier in issueList:
return False
issueList.append(strippedIdentifier)
return True
def getUnremediedIssues(context: dict | None) -> dict[str, list[str]]:
diagnosticsState = getDiagnosticsState(context)
return diagnosticsState.get(UNREMEDIED_ISSUES_KEY, {})
def iterUnremediedIssueSummaryLines(context: dict | None) -> list[str]:
summaryLines = []
unremediedIssues = getUnremediedIssues(context)
for sourcePath in sorted(unremediedIssues.keys()):
identifiers = unremediedIssues[sourcePath]
summaryLines.append(f"{os.path.basename(sourcePath)}: {', '.join(identifiers)}")
return summaryLines

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
import re
from .base import FfmpegRemedy, FfmpegRemedyDecision
class WarnCorruptMpegAudioRemedy(FfmpegRemedy):
identifier = "warn-corrupt-mpeg-audio"
PATTERNS = (
re.compile(r"\[mp3float @ .*\] invalid block type", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] invalid new backstep -?\d+", re.IGNORECASE),
re.compile(r"\[mp3float @ .*\] Header missing"),
re.compile(r"\[mp3float @ .*\] overread, skip ", re.IGNORECASE),
re.compile(r"Error while decoding MPEG audio frame\."),
re.compile(
r"Error submitting packet to decoder: Invalid data found when processing input"
),
)
def inspect_line(
self,
line: str,
session: "FfmpegDiagnosticMonitor",
) -> FfmpegRemedyDecision | None:
if not any(pattern.search(line) for pattern in self.PATTERNS):
return None
return FfmpegRemedyDecision(
console_warning=(
f"ffmpeg reported damaged MPEG audio frames while converting "
+ f"{session.describe_source()}. FFX will continue, but the output "
+ "audio may contain gaps or glitches."
),
)

View File

@@ -0,0 +1,27 @@
from .diagnostics import (
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegRemedy,
FfmpegRemedyDecision,
FfmpegSkipFileWarning,
RetryWithGeneratedPtsRemedy,
WarnCorruptMpegAudioRemedy,
getDiagnosticsState,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
recordUnremediedIssue,
)
__all__ = [
"FfmpegCommandRunner",
"FfmpegDiagnosticMonitor",
"FfmpegRemedy",
"FfmpegRemedyDecision",
"FfmpegSkipFileWarning",
"RetryWithGeneratedPtsRemedy",
"WarnCorruptMpegAudioRemedy",
"getDiagnosticsState",
"getUnremediedIssues",
"iterUnremediedIssueSummaryLines",
"recordUnremediedIssue",
]

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
from logging import Logger
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.diagnostics import FfmpegCommandRunner
from ffx.media_descriptor import MediaDescriptor
from ffx.audio_layout import AudioLayout
@@ -63,6 +64,7 @@ class FfxController():
self.__logger: Logger = context['logger']
self.__warnedH264Fallback = False
self.__ffmpegCommandRunner = FfmpegCommandRunner(context)
@staticmethod
@@ -100,6 +102,12 @@ class FfxController():
def executeCommandSequence(self, commandSequence):
if commandSequence and str(commandSequence[0]).strip() == "ffmpeg":
out, err, rc = self.__ffmpegCommandRunner.execute(
commandSequence,
timeoutSeconds=None,
)
else:
out, err, rc = executeProcess(commandSequence, context=self.__context)
if rc:
raise click.ClickException(f"Command resulted in error: rc={rc} error={err}")
@@ -172,6 +180,16 @@ class FfxController():
def generateAudioCopyTokens(self, subIndex):
return [f"-c:a:{int(subIndex)}", 'copy']
def generateVideoCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.VIDEO):
return ["-c:v", "copy"]
return []
def generateAudioCopyAllTokens(self):
if self.__targetMediaDescriptor.getTrackDescriptors(trackType=TrackType.AUDIO):
return ["-c:a", "copy"]
return []
def generateSubtitleCopyTokens(self, subIndex):
return [f"-c:s:{int(subIndex)}", 'copy']
@@ -292,6 +310,12 @@ class FfxController():
return audioTokens
def generateAudioProcessingTokens(self):
if self.__context.get('copy_audio', False):
return self.generateAudioCopyAllTokens()
return self.generateAudioEncodingTokens()
def runJob(self,
sourcePath,
targetPath,
@@ -305,6 +329,8 @@ class FfxController():
videoEncoder: VideoEncoder = self.__context.get('video_encoder', VideoEncoder.VP9)
self.__context['current_source_path'] = sourcePath
copyVideo = self.__context.get('copy_video', False) or videoEncoder == VideoEncoder.COPY
qualityFilters = [fy for fy in chainIteration if fy['identifier'] == 'quality']
@@ -315,6 +341,10 @@ class FfxController():
deinterlaceFilters = [fy for fy in chainIteration if fy['identifier'] == 'bwdif']
if copyVideo:
quality = None
self.__context['encoding_metadata_tags'] = {}
else:
if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']):
self.__logger.info(f"Setting quality {quality} from command line")
elif currentPattern is not None and (quality := currentPattern.quality):
@@ -329,6 +359,7 @@ class FfxController():
preset = presetFilters[0]['parameters']['preset'] if presetFilters else PresetFilter.DEFAULT_PRESET
if not copyVideo:
self.__context['encoding_metadata_tags'] = self.generateEncodingMetadataTags(
videoEncoder,
quality,
@@ -338,7 +369,7 @@ class FfxController():
filterParamTokens = []
if cropArguments:
if cropArguments and not copyVideo:
cropParams = (f"crop="
+ f"{cropArguments[CropFilter.OUTPUT_WIDTH_KEY]}"
@@ -348,6 +379,7 @@ class FfxController():
filterParamTokens.append(cropParams)
if not copyVideo:
filterParamTokens.extend(denoiseFilters[0]['tokens'] if denoiseFilters else [])
filterParamTokens.extend(deinterlaceFilters[0]['tokens'] if deinterlaceFilters else [])
@@ -380,6 +412,29 @@ class FfxController():
self.executeCommandSequence(commandSequence)
return
if copyVideo:
commandSequence = (commandTokens
+ self.__targetMediaDescriptor.getImportFileTokens()
+ self.__targetMediaDescriptor.getInputMappingTokens(sourceMediaDescriptor = self.__sourceMediaDescriptor)
+ self.__mdcs.generateDispositionTokens())
commandSequence += self.__mdcs.generateMetadataTokens()
commandSequence += self.generateVideoCopyAllTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
commandSequence += self.generateOutputTokens(targetPath,
targetFormat)
self.__logger.debug("FfxController.runJob(): Running command sequence")
if not self.__context['dry_run']:
self.executeCommandSequence(commandSequence)
return
if videoEncoder == VideoEncoder.AV1:
commandSequence = (commandTokens
@@ -396,7 +451,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateAV1Tokens(int(quality), int(preset))
commandSequence += self.generateAudioEncodingTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
@@ -426,7 +481,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG:
commandSequence += self.generateH264Tokens(int(quality))
commandSequence += self.generateAudioEncodingTokens()
commandSequence += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence += self.generateCropTokens()
@@ -485,7 +540,7 @@ class FfxController():
if td.getCodec != TrackCodec.PNG:
commandSequence2 += self.generateVP9Pass2Tokens(int(quality))
commandSequence2 += self.generateAudioEncodingTokens()
commandSequence2 += self.generateAudioProcessingTokens()
if self.__context['perform_cut']:
commandSequence2 += self.generateCropTokens()

View File

@@ -2,6 +2,7 @@ import os, re, click
from typing import List, Self
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
@@ -328,6 +329,96 @@ class MediaDescriptor:
if s.getType() == TrackType.ATTACHMENT
]
def hasStyledAssSubtitlesWithFontAttachments(self) -> bool:
return (
any(
trackDescriptor.getCodec() == TrackCodec.ASS
for trackDescriptor in self.getSubtitleTracks()
)
and any(
trackDescriptor.getAttachmentFormat() == AttachmentFormat.TTF
for trackDescriptor in self.getAttachmentTracks()
)
)
def withoutAttachmentTracks(
self,
attachmentFormat: AttachmentFormat | None = None,
context: dict | None = None,
):
filteredTrackDescriptors = []
for trackDescriptor in self.__trackDescriptors:
if trackDescriptor.getType() == TrackType.ATTACHMENT and (
attachmentFormat is None
or trackDescriptor.getAttachmentFormat() == attachmentFormat
):
continue
filteredTrackDescriptors.append(
trackDescriptor.clone(
context=context if context is not None else self.__context
)
)
kwargs = {
MediaDescriptor.TAGS_KEY: dict(self.__mediaTags),
MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY: filteredTrackDescriptors,
}
if context is not None:
kwargs[MediaDescriptor.CONTEXT_KEY] = context
elif self.__context:
kwargs[MediaDescriptor.CONTEXT_KEY] = self.__context
filteredMediaDescriptor = MediaDescriptor(**kwargs)
filteredMediaDescriptor.reindexSubIndices()
return filteredMediaDescriptor
def withoutAttachmentsForComparison(self):
return self.withoutAttachmentTracks(context=self.__context)
def withSourceAttachmentTracks(
self,
sourceMediaDescriptor: Self,
attachmentFormat: AttachmentFormat | None = None,
context: dict | None = None,
):
trackDescriptors = []
for trackDescriptor in self.__trackDescriptors:
if trackDescriptor.getType() == TrackType.ATTACHMENT and (
attachmentFormat is None
or trackDescriptor.getAttachmentFormat() == attachmentFormat
):
continue
trackDescriptors.append(
trackDescriptor.clone(
context=context if context is not None else self.__context
)
)
for sourceTrackDescriptor in sourceMediaDescriptor.getAttachmentTracks():
if (
attachmentFormat is not None
and sourceTrackDescriptor.getAttachmentFormat() != attachmentFormat
):
continue
attachmentClone = sourceTrackDescriptor.clone(
context=context if context is not None else self.__context
)
attachmentClone.setIndex(len(trackDescriptors))
trackDescriptors.append(attachmentClone)
kwargs = {
MediaDescriptor.TAGS_KEY: dict(self.__mediaTags),
MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY: trackDescriptors,
}
if context is not None:
kwargs[MediaDescriptor.CONTEXT_KEY] = context
elif self.__context:
kwargs[MediaDescriptor.CONTEXT_KEY] = self.__context
mergedMediaDescriptor = MediaDescriptor(**kwargs)
mergedMediaDescriptor.reindexSubIndices()
return mergedMediaDescriptor
def getImportFileTokens(self, use_sub_index: bool = True):
"""Generate ffmpeg import options for external stream files"""
@@ -421,11 +512,11 @@ class MediaDescriptor:
if sourceMediaDescriptor:
fontDescriptors = [ftd for ftd in sourceMediaDescriptor.getAttachmentTracks()
if ftd.getCodec() == TrackCodec.TTF]
if ftd.getAttachmentFormat() == AttachmentFormat.TTF]
else:
fontDescriptors = [ftd for ftd in self.__trackDescriptors
if ftd.getType() == TrackType.ATTACHMENT
and ftd.getCodec() == TrackCodec.TTF]
and ftd.getAttachmentFormat() == AttachmentFormat.TTF]
for ad in sorted(fontDescriptors, key=lambda d: d.getIndex()):
inputMappingTokens += ["-map", f"0:{ad.getIndex()}"]

View File

@@ -56,8 +56,24 @@ class MediaDescriptorChangeSet():
and 'ignore' in metadataConfiguration['streams'].keys() else [])
self.__targetTrackDescriptors = targetMediaDescriptor.getTrackDescriptors() if targetMediaDescriptor is not None else []
self.__sourceTrackDescriptors = sourceMediaDescriptor.getTrackDescriptors() if sourceMediaDescriptor is not None else []
self.__targetTrackDescriptors = (
[
trackDescriptor
for trackDescriptor in targetMediaDescriptor.getTrackDescriptors()
if trackDescriptor.getType() != TrackType.ATTACHMENT
]
if targetMediaDescriptor is not None
else []
)
self.__sourceTrackDescriptors = (
[
trackDescriptor
for trackDescriptor in sourceMediaDescriptor.getTrackDescriptors()
if trackDescriptor.getType() != TrackType.ATTACHMENT
]
if sourceMediaDescriptor is not None
else []
)
self.__targetTrackDescriptorsByIndex = {
trackDescriptor.getIndex(): trackDescriptor
for trackDescriptor in self.__targetTrackDescriptors

View File

@@ -6,6 +6,7 @@ from textual.screen import Screen
from textual.widgets import DataTable
from textual.widgets._data_table import CellDoesNotExist
from ffx.attachment_format import AttachmentFormat
from ffx.audio_layout import AudioLayout
from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
@@ -125,6 +126,32 @@ class MediaWorkflowScreenBase(Screen):
add_auto_table_column(self.differencesTable, t(self.DIFFERENCES_COLUMN_LABEL))
self.differencesTable.cursor_type = "row"
def _track_codec_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
attachmentFormat = trackDescriptor.getAttachmentFormat()
if attachmentFormat == AttachmentFormat.UNKNOWN:
return attachmentFormat.identifier()
return attachmentFormat.label()
return trackDescriptor.getFormatDescriptor().label()
def _track_language_cell_value(self, trackDescriptor: TrackDescriptor) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return trackDescriptor.getLanguage().label()
def _track_disposition_cell_value(
self,
trackDescriptor: TrackDescriptor,
disposition: TrackDisposition,
) -> str:
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return " "
return (
t("Yes")
if disposition in trackDescriptor.getDispositionSet()
else t("No")
)
def reloadProperties(self, reset_draft: bool = True):
self._mediaFileProperties = FileProperties(self.context, self._mediaFilename)
probedMediaDescriptor = self._mediaFileProperties.getMediaDescriptor()
@@ -139,10 +166,9 @@ class MediaWorkflowScreenBase(Screen):
self._baselineMediaDescriptor = probedMediaDescriptor
self._sourceMediaDescriptor = probedMediaDescriptor
self._currentPattern = self._mediaFileProperties.getPattern()
self._targetMediaDescriptor = (
self._currentPattern.getMediaDescriptor(self.context)
if self._currentPattern is not None
else None
self._targetMediaDescriptor = self._resolve_target_media_descriptor(
self._currentPattern,
self._sourceMediaDescriptor,
)
self.rebuildChangeSet()
@@ -178,6 +204,25 @@ class MediaWorkflowScreenBase(Screen):
def getTrackEditSourceDescriptor(self) -> TrackDescriptor | None:
return self.getSelectedTrackDescriptor()
def _resolve_target_media_descriptor(self, currentPattern, sourceMediaDescriptor):
if currentPattern is None:
return None
targetMediaDescriptor = currentPattern.getMediaDescriptor(self.context)
styledAssDetector = getattr(
sourceMediaDescriptor,
"hasStyledAssSubtitlesWithFontAttachments",
None,
)
if callable(styledAssDetector) and styledAssDetector():
targetMediaDescriptor = targetMediaDescriptor.withSourceAttachmentTracks(
sourceMediaDescriptor,
AttachmentFormat.TTF,
context=self.context,
)
return targetMediaDescriptor
def updateMediaTags(self):
displayedMediaDescriptor = self.getDisplayedMediaDescriptor()
self._sourceMediaTagRowData = populate_tag_table(
@@ -221,15 +266,21 @@ class MediaWorkflowScreenBase(Screen):
trackDescriptor.getIndex(),
t(trackType.label()),
typeCounter[trackType],
trackDescriptor.getCodec().label(),
self._track_codec_cell_value(trackDescriptor),
t(audioLayout.label())
if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED
else " ",
trackDescriptor.getLanguage().label(),
self._track_language_cell_value(trackDescriptor),
trackTitle,
t("Yes") if TrackDisposition.DEFAULT in dispositionSet else t("No"),
t("Yes") if TrackDisposition.FORCED in dispositionSet else t("No"),
self._track_disposition_cell_value(
trackDescriptor,
TrackDisposition.DEFAULT,
),
self._track_disposition_cell_value(
trackDescriptor,
TrackDisposition.FORCED,
),
)
row_key = self.tracksTable.add_row(*map(str, row))

View File

@@ -7,6 +7,7 @@ from .show import Base, Show
from ffx.media_descriptor import MediaDescriptor
from ffx.show_descriptor import ShowDescriptor
from ffx.track_type import TrackType
class Pattern(Base):
@@ -76,6 +77,8 @@ class Pattern(Base):
subIndexCounter = {}
for track in self.tracks:
trackType = track.getType()
if trackType == TrackType.ATTACHMENT:
continue
if not trackType in subIndexCounter.keys():
subIndexCounter[trackType] = 0
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY].append(track.getDescriptor(context, subIndex = subIndexCounter[trackType]))

View File

@@ -4,6 +4,7 @@ from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from .show import Base
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
from ffx.iso_language import IsoLanguage
@@ -132,9 +133,16 @@ class Track(Base):
if trackType in [t.label() for t in TrackType]:
if trackType == TrackType.ATTACHMENT.label():
storedFormatIdentifier = AttachmentFormat.identifyFfprobeStream(streamObj).identifier()
else:
storedFormatIdentifier = TrackCodec.identify(
streamObj.get(TrackDescriptor.FFPROBE_CODEC_KEY)
).identifier()
return cls(pattern_id = patternId,
track_type = trackType,
codec_name = streamObj[TrackDescriptor.FFPROBE_CODEC_NAME_KEY],
codec_name = storedFormatIdentifier,
disposition_flags = sum([2**t.index() for (k,v) in streamObj[TrackDescriptor.FFPROBE_DISPOSITION_KEY].items()
if v and (t := TrackDisposition.find(k)) is not None]),
audio_layout = AudioLayout.identify(streamObj))
@@ -153,8 +161,20 @@ class Track(Base):
return TrackType.fromIndex(self.track_type)
def getCodec(self) -> TrackCodec:
if self.getType() == TrackType.ATTACHMENT:
return TrackCodec.UNKNOWN
return TrackCodec.identify(self.codec_name)
def getAttachmentFormat(self) -> AttachmentFormat:
if self.getType() != TrackType.ATTACHMENT:
return AttachmentFormat.UNKNOWN
return AttachmentFormat.identify(self.codec_name)
def getFormatDescriptor(self):
if self.getType() == TrackType.ATTACHMENT:
return self.getAttachmentFormat()
return self.getCodec()
def getIndex(self):
return int(self.index) if self.index is not None else -1
@@ -206,6 +226,9 @@ class Track(Base):
kwargs[TrackDescriptor.SUB_INDEX_KEY] = subIndex
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = self.getType()
if self.getType() == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.getAttachmentFormat()
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.getCodec()
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = self.getDispositionSet()

View File

@@ -8,6 +8,7 @@ from ffx.model.track import Track
from ffx.model.track_tag import TrackTag
from ffx.track_descriptor import TrackDescriptor
from ffx.track_disposition import TrackDisposition
from ffx.track_type import TrackType
class DuplicatePatternMatchError(click.ClickException):
@@ -86,12 +87,16 @@ class PatternController:
)
normalized_descriptors = []
filtered_attachments = False
for trackDescriptor in trackDescriptors:
if type(trackDescriptor) is not TrackDescriptor:
raise TypeError(
"PatternController: All track descriptors are required to be of type TrackDescriptor"
)
normalized_descriptors.append(trackDescriptor)
if trackDescriptor.getType() == TrackType.ATTACHMENT:
filtered_attachments = True
continue
normalized_descriptors.append(trackDescriptor.clone())
if not normalized_descriptors:
raise InvalidPatternSchemaError(
@@ -102,6 +107,10 @@ class PatternController:
normalized_descriptors, key=lambda descriptor: descriptor.getIndex()
)
if filtered_attachments:
for index, descriptor in enumerate(normalized_descriptors):
descriptor.setIndex(index)
index_set = {descriptor.getIndex() for descriptor in normalized_descriptors}
expected_indexes = set(range(len(normalized_descriptors)))
if index_set != expected_indexes:
@@ -134,7 +143,7 @@ class PatternController:
def _build_track_row(self, trackDescriptor: TrackDescriptor) -> Track:
track = Track(
track_type=int(trackDescriptor.getType().index()),
codec_name=str(trackDescriptor.getCodec().identifier()),
codec_name=str(trackDescriptor.getFormatDescriptor().identifier()),
index=int(trackDescriptor.getIndex()),
source_index=int(trackDescriptor.getSourceIndex()),
disposition_flags=int(
@@ -170,7 +179,7 @@ class PatternController:
pattern.tracks.append(self._build_track_row(trackDescriptor))
def _validate_persisted_pattern(self, pattern: Pattern):
if not pattern.tracks:
if not any(track.getType() != TrackType.ATTACHMENT for track in pattern.tracks):
raise InvalidPatternSchemaError(
f"Pattern #{pattern.getId()} ({pattern.getPattern()!r}) is invalid because it has no tracks."
)

View File

@@ -88,6 +88,9 @@ class PatternDetailsScreen(Screen):
.three {
column-span: 3;
}
.two {
column-span: 2;
}
.four {
column-span: 4;
@@ -114,7 +117,7 @@ class PatternDetailsScreen(Screen):
}
.yellow {
tint: yellow 40%;
color: yellow;
}
"""
@@ -175,7 +178,7 @@ class PatternDetailsScreen(Screen):
row = (td.getIndex(),
t(trackType.label()),
typeCounter[trackType],
td.getCodec().label(),
td.getFormatDescriptor().label(),
t(audioLayout.label()) if trackType == TrackType.AUDIO
and audioLayout != AudioLayout.LAYOUT_UNDEFINED else ' ',
trackLanguage.label() if trackLanguage != IsoLanguage.UNDEFINED else ' ',
@@ -331,6 +334,7 @@ class PatternDetailsScreen(Screen):
if not self.__showDescriptor is None:
self.query_one("#showlabel", Static).update(f"{self.__showDescriptor.getId()} - {self.__showDescriptor.getName()} ({self.__showDescriptor.getYear()})")
self.updateShowQualityHint()
if self.__pattern is not None:
@@ -350,6 +354,7 @@ class PatternDetailsScreen(Screen):
if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"):
return
self.updateShowQualityHint()
self.updateTags()
self.updateTracks()
@@ -415,7 +420,9 @@ class PatternDetailsScreen(Screen):
# Row 4
yield Static(t("Quality"))
yield Input(type="integer", id="quality_input")
yield Static(' ', classes="five")
yield Static(" ")
yield Static("", id="show_quality_hint", classes="two yellow")
yield Static(' ', classes="two")
# Row 5
@@ -504,6 +511,23 @@ class PatternDetailsScreen(Screen):
def getPatternFromInput(self):
return str(self.query_one("#pattern_input", Input).value)
def getShowQualityHintText(self):
if self.__showDescriptor is None:
return ""
showQuality = int(self.__showDescriptor.getQuality() or 0)
if showQuality <= 0:
return ""
patternQuality = int(getattr(self.__pattern, "quality", 0) or 0)
if patternQuality > 0:
return ""
return f"{t('Show')}: {showQuality}"
def updateShowQualityHint(self):
self.query_one("#show_quality_hint", Static).update(self.getShowQualityHintText())
def getQualityFromInput(self):
try:
return int(self.query_one("#quality_input", Input).value)

View File

@@ -1,7 +1,10 @@
import os
import shlex
import signal
import subprocess
from typing import Iterable, List
import threading
import time
from typing import Callable, Iterable, List
from .logging_utils import get_ffx_logger
@@ -118,6 +121,8 @@ def executeProcess(
directory: str = None,
context: dict = None,
timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
):
logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger()
@@ -131,6 +136,16 @@ def executeProcess(
formatCommandSequence(wrappedCommandSequence),
)
if stdoutLineHandler is not None or stderrLineHandler is not None:
return executeStreamingProcess(
wrappedCommandSequence,
directory=directory,
logger=logger,
timeoutSeconds=timeoutSeconds,
stdoutLineHandler=stdoutLineHandler,
stderrLineHandler=stderrLineHandler,
)
try:
completed = subprocess.run(
wrappedCommandSequence,
@@ -167,3 +182,162 @@ def executeProcess(
)
return completed.stdout, completed.stderr, completed.returncode
def terminateProcess(process: subprocess.Popen, *, killAfterSeconds: float = 1.0) -> None:
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGTERM)
else:
process.terminate()
except ProcessLookupError:
return
deadline = time.monotonic() + killAfterSeconds
while process.poll() is None and time.monotonic() < deadline:
time.sleep(0.05)
if process.poll() is not None:
return
try:
if hasattr(os, "killpg"):
os.killpg(process.pid, signal.SIGKILL)
else:
process.kill()
except ProcessLookupError:
return
def readProcessStream(
stream,
outputParts: list[str],
lineHandler: Callable[[str], bool] | None,
stopRequested: threading.Event,
logger,
) -> None:
try:
for line in iter(stream.readline, ''):
outputParts.append(line)
if lineHandler is None:
continue
try:
if lineHandler(line):
stopRequested.set()
except Exception:
logger.exception("Process line handler raised an exception")
finally:
stream.close()
def executeStreamingProcess(
commandSequence: List[str],
*,
directory: str = None,
logger = None,
timeoutSeconds: float = None,
stdoutLineHandler: Callable[[str], bool] | None = None,
stderrLineHandler: Callable[[str], bool] | None = None,
):
logger = logger or get_ffx_logger()
try:
process = subprocess.Popen(
commandSequence,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=directory,
bufsize=1,
start_new_session=True,
)
except FileNotFoundError as ex:
error = (
"Command not found while running "
+ f"{formatCommandSequence(commandSequence)}: {ex.filename or ex}"
)
logger.error(error)
return '', error, COMMAND_NOT_FOUND_RETURN_CODE
stdoutParts: list[str] = []
stderrParts: list[str] = []
stopRequested = threading.Event()
timedOut = False
stdoutThread = threading.Thread(
target=readProcessStream,
args=(
process.stdout,
stdoutParts,
stdoutLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stderrThread = threading.Thread(
target=readProcessStream,
args=(
process.stderr,
stderrParts,
stderrLineHandler,
stopRequested,
logger,
),
daemon=True,
)
stdoutThread.start()
stderrThread.start()
deadline = (
time.monotonic() + float(timeoutSeconds)
if timeoutSeconds is not None
else None
)
terminationRequested = False
while process.poll() is None:
if stopRequested.is_set():
terminationRequested = True
terminateProcess(process)
break
if deadline is not None and time.monotonic() >= deadline:
timedOut = True
terminationRequested = True
terminateProcess(process)
break
time.sleep(0.05)
returnCode = process.wait()
stdoutThread.join()
stderrThread.join()
stdout = ''.join(stdoutParts)
stderr = ''.join(stderrParts)
if timedOut:
error = (
f"Command timed out after {timeoutSeconds} seconds while running "
+ formatCommandSequence(commandSequence)
)
if stderr:
error = f"{error}\n{stderr}"
logger.error(error)
return stdout, error, COMMAND_TIMED_OUT_RETURN_CODE
if returnCode != 0 and not terminationRequested:
logger.warning(
"executeProcess() rc=%s command=%s",
returnCode,
formatCommandSequence(commandSequence),
)
return stdout, stderr, returnCode

View File

@@ -19,7 +19,6 @@ class TrackCodec(Enum):
WEBVTT = {'identifier': 'webvtt', 'format': 'webvtt', 'extension': 'vtt' , 'label': 'WebVTT'}
SRT = {'identifier': 'subrip', 'format': 'srt', 'extension': 'srt' , 'label': 'SRT'}
ASS = {'identifier': 'ass', 'format': 'ass', 'extension': 'ass' , 'label': 'ASS'}
TTF = {'identifier': 'ttf', 'format': None, 'extension': 'ttf' , 'label': 'TTF'}
PGS = {'identifier': 'hdmv_pgs_subtitle', 'format': 'sup', 'extension': 'sup' , 'label': 'PGS'}
VOBSUB = {'identifier': 'dvd_subtitle', 'format': None, 'extension': 'mkv' , 'label': 'VobSub'}

View File

@@ -35,6 +35,8 @@ class TrackController():
def addTrack(self, trackDescriptor : TrackDescriptor, patternId = None):
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return False
# option to override pattern id in case track descriptor has not set it
patId = int(trackDescriptor.getPatternId() if patternId is None else patternId)
@@ -43,7 +45,7 @@ class TrackController():
s = self.Session()
track = Track(pattern_id = patId,
track_type = int(trackDescriptor.getType().index()),
codec_name = str(trackDescriptor.getCodec().identifier()),
codec_name = str(trackDescriptor.getFormatDescriptor().identifier()),
index = int(trackDescriptor.getIndex()),
source_index = int(trackDescriptor.getSourceIndex()),
disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet())),
@@ -72,6 +74,8 @@ class TrackController():
if type(trackDescriptor) is not TrackDescriptor:
raise TypeError('TrackController.updateTrack(): Argument trackDescriptor is required to be of type TrackDescriptor')
if trackDescriptor.getType() == TrackType.ATTACHMENT:
return False
try:
s = self.Session()
@@ -82,7 +86,7 @@ class TrackController():
track.index = int(trackDescriptor.getIndex())
track.track_type = int(trackDescriptor.getType().index())
track.codec_name = str(trackDescriptor.getCodec().identifier())
track.codec_name = str(trackDescriptor.getFormatDescriptor().identifier())
track.audio_layout = int(trackDescriptor.getAudioLayout().index())
track.disposition_flags = int(TrackDisposition.toFlags(trackDescriptor.getDispositionSet()))

View File

@@ -1,5 +1,6 @@
from typing import Self
from .attachment_format import AttachmentFormat
from .iso_language import IsoLanguage
from .track_type import TrackType
from .audio_layout import AudioLayout
@@ -26,6 +27,7 @@ class TrackDescriptor:
TRACK_TYPE_KEY = "track_type"
CODEC_KEY = "codec_name"
ATTACHMENT_FORMAT_KEY = "attachment_format"
AUDIO_LAYOUT_KEY = "audio_layout"
FFPROBE_INDEX_KEY = "index"
@@ -110,15 +112,6 @@ class TrackDescriptor:
else:
self.__trackType = TrackType.UNKNOWN
if TrackDescriptor.CODEC_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.CODEC_KEY]) is not TrackCodec:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec"
)
self.__trackCodec = kwargs[TrackDescriptor.CODEC_KEY]
else:
self.__trackCodec = TrackCodec.UNKNOWN
if TrackDescriptor.TAGS_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.TAGS_KEY]) is not dict:
raise TypeError(
@@ -151,6 +144,34 @@ class TrackDescriptor:
else:
self.__audioLayout = AudioLayout.LAYOUT_UNDEFINED
self.__trackCodec = TrackCodec.UNKNOWN
self.__attachmentFormat = AttachmentFormat.UNKNOWN
if self.__trackType == TrackType.ATTACHMENT:
if TrackDescriptor.ATTACHMENT_FORMAT_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY]) is not AttachmentFormat:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.ATTACHMENT_FORMAT_KEY} is required to be of type AttachmentFormat"
)
self.__attachmentFormat = kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY]
elif TrackDescriptor.CODEC_KEY in kwargs.keys():
legacyCodec = kwargs[TrackDescriptor.CODEC_KEY]
if type(legacyCodec) is AttachmentFormat:
self.__attachmentFormat = legacyCodec
elif type(legacyCodec) is TrackCodec:
self.__attachmentFormat = AttachmentFormat.fromTrackCodec(legacyCodec)
else:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec for legacy attachment compatibility"
)
else:
if TrackDescriptor.CODEC_KEY in kwargs.keys():
if type(kwargs[TrackDescriptor.CODEC_KEY]) is not TrackCodec:
raise TypeError(
f"TrackDesciptor.__init__(): Argument {TrackDescriptor.CODEC_KEY} is required to be of type TrackCodec"
)
self.__trackCodec = kwargs[TrackDescriptor.CODEC_KEY]
@classmethod
def fromFfprobe(cls, streamObj, subIndex: int = -1):
"""Processes ffprobe stream data as array with elements according to the following example
@@ -215,7 +236,12 @@ class TrackDescriptor:
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = trackType
kwargs[TrackDescriptor.CODEC_KEY] = TrackCodec.identify(streamObj[TrackDescriptor.FFPROBE_CODEC_KEY])
if trackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = AttachmentFormat.identifyFfprobeStream(streamObj)
else:
kwargs[TrackDescriptor.CODEC_KEY] = TrackCodec.identify(
streamObj.get(TrackDescriptor.FFPROBE_CODEC_KEY)
)
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = (
{
@@ -277,6 +303,14 @@ class TrackDescriptor:
def getCodec(self) -> TrackCodec:
return self.__trackCodec
def getAttachmentFormat(self) -> AttachmentFormat:
return self.__attachmentFormat
def getFormatDescriptor(self):
if self.__trackType == TrackType.ATTACHMENT:
return self.__attachmentFormat
return self.__trackCodec
def getLanguage(self):
if "language" in self.__trackTags.keys():
return IsoLanguage.findThreeLetter(self.__trackTags["language"])
@@ -353,12 +387,16 @@ class TrackDescriptor:
TrackDescriptor.SOURCE_INDEX_KEY: int(self.__sourceIndex),
TrackDescriptor.SUB_INDEX_KEY: int(self.__subIndex),
TrackDescriptor.TRACK_TYPE_KEY: self.__trackType,
TrackDescriptor.CODEC_KEY: self.__trackCodec,
TrackDescriptor.TAGS_KEY: dict(self.__trackTags),
TrackDescriptor.DISPOSITION_SET_KEY: set(self.__dispositionSet),
TrackDescriptor.AUDIO_LAYOUT_KEY: self.__audioLayout,
}
if self.__trackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.__attachmentFormat
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.__trackCodec
if context is not None:
kwargs[TrackDescriptor.CONTEXT_KEY] = context
elif self.__context:

View File

@@ -5,6 +5,7 @@ from textual.widgets import Header, Footer, Static, Button, SelectionList, Selec
from textual.containers import Grid
from textual.widgets._data_table import CellDoesNotExist
from .attachment_format import AttachmentFormat
from .audio_layout import AudioLayout
from .iso_language import IsoLanguage
from .tag_delete_screen import TagDeleteScreen
@@ -141,6 +142,7 @@ class TrackDetailsScreen(Screen):
if self.__isNew:
self.__trackType = trackType
self.__trackCodec = TrackCodec.UNKNOWN
self.__attachmentFormat = AttachmentFormat.UNKNOWN
self.__audioLayout = AudioLayout.LAYOUT_UNDEFINED
self.__index = index
self.__subIndex = subIndex
@@ -150,6 +152,7 @@ class TrackDetailsScreen(Screen):
else:
self.__trackType = trackDescriptor.getType()
self.__trackCodec = trackDescriptor.getCodec()
self.__attachmentFormat = trackDescriptor.getAttachmentFormat()
self.__audioLayout = trackDescriptor.getAudioLayout()
self.__index = trackDescriptor.getIndex()
self.__subIndex = trackDescriptor.getSubIndex()
@@ -433,6 +436,9 @@ class TrackDetailsScreen(Screen):
if not isinstance(selectedTrackType, TrackType):
selectedTrackType = TrackType.UNKNOWN
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = selectedTrackType
if selectedTrackType == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = self.__attachmentFormat
else:
kwargs[TrackDescriptor.CODEC_KEY] = self.__trackCodec
if selectedTrackType == TrackType.AUDIO:

View File

@@ -18,6 +18,7 @@ from tests.support.ffx_bundle import (
write_vtt,
)
from ffx.attachment_format import AttachmentFormat
from ffx.track_type import TrackType
try:
@@ -280,6 +281,72 @@ class SubtrackMappingBundleTests(unittest.TestCase):
self.assertIn("non-existent source track #99", error_output)
self.assertFalse(expected_output_path(self.workdir, source_filename).exists())
def test_styled_ass_source_preserves_current_font_attachments_when_pattern_count_differs(self):
source_filename = "styled_ass_s01e01.mkv"
source_path = create_source_fixture(
self.workdir,
source_filename,
[
SourceTrackSpec(TrackType.VIDEO, identity="video-0"),
SourceTrackSpec(TrackType.AUDIO, identity="audio-1", language="eng"),
SourceTrackSpec(
TrackType.SUBTITLE,
identity="subtitle-2",
language="eng",
subtitle_lines=("styled subtitle payload",),
),
SourceTrackSpec(TrackType.ATTACHMENT, attachment_name="current.ttf"),
],
subtitle_encoder="ass",
)
prepare_pattern_database(
self.database_path,
r"^styled_ass_(s[0-9]+e[0-9]+)\.mkv$",
[
PatternTrackSpec(index=0, source_index=0, track_type=TrackType.VIDEO),
PatternTrackSpec(index=1, source_index=1, track_type=TrackType.AUDIO),
PatternTrackSpec(index=2, source_index=2, track_type=TrackType.SUBTITLE),
PatternTrackSpec(
index=3,
source_index=3,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
),
PatternTrackSpec(
index=4,
source_index=4,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
),
],
)
completed = run_ffx_convert(
self.workdir,
self.home_dir,
self.database_path,
"--video-encoder",
"copy",
"--no-tmdb",
"--no-prompt",
"--no-signature",
str(source_path),
)
self.assertCompleted(completed)
self.assertIn("Styled ASS subtitles", completed.stdout)
output_path = expected_output_path(self.workdir, source_filename)
streams = ffprobe_json(output_path)["streams"]
self.assertEqual(
[stream["codec_type"] for stream in streams],
["video", "audio", "subtitle", "attachment"],
)
self.assertEqual(streams[2]["codec_name"], "ass")
self.assertEqual(streams[3]["codec_name"], "ttf")
self.assertEqual(get_tag(streams[3], "filename"), "current.ttf")
def test_external_subtitle_file_replaces_payload_and_overrides_metadata(self):
source_filename = "substitute_s01e01.mkv"
self.write_config(

471
tests/prepare.sh Executable file
View File

@@ -0,0 +1,471 @@
#!/usr/bin/env bash
set -u
SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd -- "${SCRIPT_DIR}/.." && pwd)"
VENV_DIR="${FFX_TEST_VENV_DIR:-${ROOT_DIR}/.venv}"
VENV_BIN_DIR="${VENV_DIR}/bin"
VENV_PYTHON="${VENV_BIN_DIR}/python"
VENV_PIP="${VENV_BIN_DIR}/pip"
CHECK_ONLY=0
READINESS_FAILURES=0
INSTALL_FAILURES=0
MISSING_REQUIRED_SYSTEM=()
COLOR_RESET=""
COLOR_GREEN=""
COLOR_YELLOW=""
COLOR_RED=""
if [ -t 1 ]; then
COLOR_RESET="$(printf '\033[0m')"
COLOR_GREEN="$(printf '\033[32m')"
COLOR_YELLOW="$(printf '\033[33m')"
COLOR_RED="$(printf '\033[31m')"
fi
usage() {
cat <<EOF
Usage: $(basename "$0") [--check] [--help]
Prepare the repo-local FFX test environment at:
${VENV_DIR}
Actions:
- verify or install required system commands for tests
- create or reuse the repo-local test virtualenv
- install this repository into the venv with Python test and docs extras
Options:
--check Report readiness only. Do not create, install, or modify.
--help Show this help text.
Environment overrides:
FFX_TEST_VENV_DIR Override the test virtualenv path. Defaults to ${ROOT_DIR}/.venv.
Notes:
- This script prepares a project-local test environment, not the persistent user bundle.
- The persistent bundle setup remains owned by tools/setup.sh.
EOF
}
status_ok() {
printf '%sok%s' "${COLOR_GREEN}" "${COLOR_RESET}"
}
status_warn() {
printf '%swarn%s' "${COLOR_YELLOW}" "${COLOR_RESET}"
}
status_fail() {
printf '%sfailed%s' "${COLOR_RED}" "${COLOR_RESET}"
}
report_component() {
local level="$1"
local label="$2"
local detail="$3"
local rendered_status=""
case "${level}" in
ok)
rendered_status="$(status_ok)"
;;
warn)
rendered_status="$(status_warn)"
;;
*)
rendered_status="$(status_fail)"
;;
esac
printf '[%s] %s%s\n' "${rendered_status}" "${label}" "${detail:+: $detail}"
}
command_exists() {
command -v "$1" >/dev/null 2>&1
}
check_python_venv_support() {
python3 -m venv --help >/dev/null 2>&1
}
check_system_command() {
command_exists "$1"
}
check_venv_python() {
[ -x "${VENV_PYTHON}" ]
}
check_venv_pip() {
check_venv_python && "${VENV_PIP}" --version >/dev/null 2>&1
}
check_venv_ffx() {
check_venv_python && "${VENV_PYTHON}" -m ffx version >/dev/null 2>&1
}
check_venv_pytest() {
check_venv_python && "${VENV_PYTHON}" -m pytest --version >/dev/null 2>&1
}
check_venv_sphinx() {
check_venv_python && "${VENV_BIN_DIR}/sphinx-build" --version >/dev/null 2>&1
}
check_venv_docs_packages() {
check_venv_python && "${VENV_PYTHON}" - <<'PY' >/dev/null 2>&1
import esbonio
import sphinx
import sphinx_rtd_theme
PY
}
check_editable_install() {
check_venv_python && FFX_REPO_ROOT="${ROOT_DIR}" "${VENV_PYTHON}" - <<'PY' >/dev/null 2>&1
from __future__ import annotations
import os
from pathlib import Path
import ffx
repo_root = Path(os.environ["FFX_REPO_ROOT"]).resolve()
package_path = Path(ffx.__file__).resolve()
raise SystemExit(0 if repo_root in package_path.parents else 1)
PY
}
check_python_environment_ready() {
check_venv_python &&
check_venv_pip &&
check_venv_pytest &&
check_venv_sphinx &&
check_venv_docs_packages &&
check_venv_ffx &&
check_editable_install
}
command_detail() {
command -v "$1" || printf "command '%s' not found" "$1"
}
python_venv_detail() {
if check_python_venv_support; then
printf 'python3 -m venv is available'
else
printf 'python3 venv support is unavailable'
fi
}
venv_python_detail() {
if check_venv_python; then
printf '%s' "${VENV_PYTHON}"
else
printf 'missing %s' "${VENV_PYTHON}"
fi
}
venv_pip_detail() {
if check_venv_pip; then
"${VENV_PIP}" --version
else
printf 'missing pip in %s' "${VENV_DIR}"
fi
}
venv_ffx_detail() {
if check_venv_ffx; then
printf 'ffx import and CLI entry are available'
else
printf 'ffx is not installed in %s' "${VENV_DIR}"
fi
}
venv_pytest_detail() {
if check_venv_pytest; then
"${VENV_PYTHON}" -m pytest --version 2>/dev/null | head -n 1
else
printf 'pytest is not installed in %s' "${VENV_DIR}"
fi
}
venv_sphinx_detail() {
if check_venv_sphinx; then
"${VENV_BIN_DIR}/sphinx-build" --version 2>&1
else
printf 'sphinx-build is not installed in %s' "${VENV_DIR}"
fi
}
venv_docs_packages_detail() {
if check_venv_docs_packages; then
printf 'Sphinx, Read the Docs theme, and Esbonio packages are importable'
else
printf 'one or more docs packages are missing in %s' "${VENV_DIR}"
fi
}
editable_install_detail() {
if check_editable_install; then
printf 'ffx resolves from %s' "${ROOT_DIR}"
else
printf 'ffx does not resolve from the project source tree'
fi
}
report_required_command() {
local label="$1"
local command_name="$2"
if check_system_command "${command_name}"; then
report_component ok "${label}" "$(command_detail "${command_name}")"
else
report_component failed "${label}" "$(command_detail "${command_name}")"
MISSING_REQUIRED_SYSTEM+=("${command_name}")
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
}
print_system_status() {
MISSING_REQUIRED_SYSTEM=()
echo "System toolchain status:"
report_required_command "git" "git"
report_required_command "python3" "python3"
if check_system_command "python3" && check_python_venv_support; then
report_component ok "python3 venv" "$(python_venv_detail)"
else
report_component failed "python3 venv" "$(python_venv_detail)"
MISSING_REQUIRED_SYSTEM+=("python3-venv")
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
report_required_command "ffmpeg" "ffmpeg"
report_required_command "ffprobe" "ffprobe"
report_required_command "cpulimit" "cpulimit"
}
print_python_status() {
echo "Repo test and docs virtualenv status:"
if check_venv_python; then
report_component ok "test virtualenv" "$(venv_python_detail)"
else
report_component failed "test virtualenv" "$(venv_python_detail)"
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
if check_venv_pip; then
report_component ok "test pip" "$(venv_pip_detail)"
else
report_component failed "test pip" "$(venv_pip_detail)"
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
if check_venv_pytest; then
report_component ok "test pytest" "$(venv_pytest_detail)"
else
report_component failed "test pytest" "$(venv_pytest_detail)"
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
if check_venv_sphinx; then
report_component ok "docs sphinx" "$(venv_sphinx_detail)"
else
report_component failed "docs sphinx" "$(venv_sphinx_detail)"
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
if check_venv_docs_packages; then
report_component ok "docs packages" "$(venv_docs_packages_detail)"
else
report_component failed "docs packages" "$(venv_docs_packages_detail)"
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
if check_venv_ffx; then
report_component ok "test ffx" "$(venv_ffx_detail)"
else
report_component failed "test ffx" "$(venv_ffx_detail)"
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
if check_editable_install; then
report_component ok "editable source" "$(editable_install_detail)"
else
report_component failed "editable source" "$(editable_install_detail)"
READINESS_FAILURES=$((READINESS_FAILURES + 1))
fi
}
print_status_report() {
READINESS_FAILURES=0
print_system_status
echo
print_python_status
}
detect_package_manager() {
if command_exists apt-get; then
printf 'apt-get\n'
return 0
fi
if command_exists pacman; then
printf 'pacman\n'
return 0
fi
return 1
}
run_root_command() {
if [ "${EUID}" -eq 0 ]; then
"$@"
elif command_exists sudo; then
sudo -n "$@"
else
return 1
fi
}
install_system_requirements() {
local package_manager
if [ "${#MISSING_REQUIRED_SYSTEM[@]}" -eq 0 ]; then
return 0
fi
if ! package_manager="$(detect_package_manager)"; then
printf 'No supported package manager found for automatic system preparation.\n' >&2
INSTALL_FAILURES=$((INSTALL_FAILURES + 1))
return 1
fi
case "${package_manager}" in
apt-get)
printf 'Installing required system dependencies via apt-get...\n'
if ! run_root_command apt-get update; then
printf 'apt-get update failed or requires interactive sudo.\n' >&2
INSTALL_FAILURES=$((INSTALL_FAILURES + 1))
return 1
fi
if ! run_root_command apt-get install -y git python3 python3-venv ffmpeg cpulimit; then
printf 'apt-get install failed or requires interactive sudo.\n' >&2
INSTALL_FAILURES=$((INSTALL_FAILURES + 1))
return 1
fi
;;
pacman)
printf 'Installing required system dependencies via pacman...\n'
if ! run_root_command pacman -Sy --noconfirm git python ffmpeg cpulimit; then
printf 'pacman install failed or requires interactive sudo.\n' >&2
INSTALL_FAILURES=$((INSTALL_FAILURES + 1))
return 1
fi
;;
esac
return 0
}
ensure_test_venv() {
if ! check_venv_python; then
printf 'Creating repo test virtualenv at %s...\n' "${VENV_DIR}"
if ! python3 -m venv "${VENV_DIR}"; then
printf 'Failed to create test virtualenv at %s.\n' "${VENV_DIR}" >&2
INSTALL_FAILURES=$((INSTALL_FAILURES + 1))
return 1
fi
fi
if ! check_venv_pip; then
printf 'Missing pip in %s.\n' "${VENV_DIR}" >&2
INSTALL_FAILURES=$((INSTALL_FAILURES + 1))
return 1
fi
printf 'Installing FFX package with test and docs extras into %s...\n' "${VENV_DIR}"
if ! (
cd "${ROOT_DIR}" &&
"${VENV_PIP}" install --editable '.[test,docs]'
); then
printf 'Failed to install FFX package with test and docs extras into %s.\n' "${VENV_DIR}" >&2
INSTALL_FAILURES=$((INSTALL_FAILURES + 1))
return 1
fi
return 0
}
parse_args() {
while [ "$#" -gt 0 ]; do
case "$1" in
--check)
CHECK_ONLY=1
;;
--help|-h)
usage
exit 0
;;
*)
printf 'Unknown option: %s\n\n' "$1" >&2
usage >&2
exit 2
;;
esac
shift
done
}
main() {
parse_args "$@"
print_status_report
if [ "${CHECK_ONLY}" -eq 0 ]; then
if [ "${#MISSING_REQUIRED_SYSTEM[@]}" -gt 0 ]; then
echo
install_system_requirements
fi
if check_python_environment_ready; then
echo
report_component ok "Python package install" "repo test and docs virtualenv is already ready"
elif check_system_command "python3" && check_python_venv_support; then
echo
ensure_test_venv
fi
echo
print_status_report
fi
echo
if [ "${INSTALL_FAILURES}" -gt 0 ]; then
echo "One or more test preparation steps failed; see the status checks above." >&2
exit 1
fi
if [ "${READINESS_FAILURES}" -gt 0 ]; then
if [ "${CHECK_ONLY}" -eq 1 ]; then
echo "The FFX test and docs environment is incomplete." >&2
else
echo "Required test or docs components are still missing after preparation." >&2
fi
exit 1
fi
if [ "${CHECK_ONLY}" -eq 1 ]; then
echo "The FFX test and docs environment is ready."
else
echo "The FFX test and docs environment is prepared."
fi
}
main "$@"

View File

@@ -18,6 +18,7 @@ if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.attachment_format import AttachmentFormat
from ffx.audio_layout import AudioLayout
from ffx.database import databaseContext
from ffx.pattern_controller import PatternController
@@ -56,6 +57,7 @@ class PatternTrackSpec:
tags: Mapping[str, str] = field(default_factory=dict)
dispositions: tuple[TrackDisposition, ...] = ()
audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO
attachment_format: AttachmentFormat = AttachmentFormat.UNKNOWN
def make_logger(name: str) -> logging.Logger:
@@ -299,6 +301,8 @@ def prepare_pattern_database(database_path: Path, filename_pattern: str, track_s
}
if track.track_type == TrackType.AUDIO:
kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = track.audio_layout
if track.track_type == TrackType.ATTACHMENT:
kwargs[TrackDescriptor.ATTACHMENT_FORMAT_KEY] = track.attachment_format
track_descriptors.append(TrackDescriptor(**kwargs))
pattern_id = PatternController(context).savePatternSchema(

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import os
from pathlib import Path
import sys
import tempfile
import unittest
from unittest.mock import patch
from click.testing import CliRunner
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx import cli # noqa: E402
from ffx.diagnostics import FfmpegSkipFileWarning, recordUnremediedIssue # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
class _FakeMediaDescriptor:
def getVideoTracks(self):
return []
def getAudioTracks(self):
return []
def getSubtitleTracks(self):
return []
def getAttachmentTracks(self):
return []
def applyOverrides(self, overrides):
return None
class _FakeFileProperties:
def __init__(self, context, source_path):
self.source_path = source_path
def getShowId(self):
return -1
def getSeason(self):
return -1
def getEpisode(self):
return -1
def getMediaDescriptor(self):
return _FakeMediaDescriptor()
def getPattern(self):
return None
class _FakeShiftedSeasonController:
def __init__(self, context):
self.context = context
def shiftSeason(self, show_id, season, episode, patternId=None):
return season, episode
class _FakeShowController:
def __init__(self, context):
self.context = context
def getShowDescriptor(self, show_id):
return None
class _FakeFfxController:
calls: list[str] = []
mode = "skip_first"
def __init__(self, context, *args, **kwargs):
self.context = context
def runJob(self, sourcePath, *args, **kwargs):
self.calls.append(sourcePath)
if self.mode == "clean":
return
if self.mode == "warn_unhandled" and sourcePath.endswith("episode1.avi"):
recordUnremediedIssue(
self.context,
sourcePath,
"unhandled-warning",
)
return
if self.mode == "skip_first" and sourcePath.endswith("episode1.avi"):
message = (
f"Skipping file {sourcePath}: ffmpeg still reported unset packet "
+ "timestamps after retry with -fflags +genpts."
)
recordUnremediedIssue(
self.context,
sourcePath,
"retry-with-generated-pts",
)
self.context["logger"].warning(message)
raise FfmpegSkipFileWarning(message)
class ConvertDiagnosticCliTests(unittest.TestCase):
def setUp(self):
logger = get_ffx_logger()
for handler in list(logger.handlers):
logger.removeHandler(handler)
try:
handler.close()
except Exception:
pass
self.tempdir = tempfile.TemporaryDirectory()
self.home_dir = Path(self.tempdir.name) / "home"
self.home_dir.mkdir()
self.database_path = Path(self.tempdir.name) / "test.db"
self.source_dir = Path(self.tempdir.name) / "source"
self.source_dir.mkdir()
self.source_one = self.source_dir / "episode1.avi"
self.source_two = self.source_dir / "episode2.avi"
self.source_one.write_bytes(b"one")
self.source_two.write_bytes(b"two")
_FakeFfxController.calls = []
_FakeFfxController.mode = "skip_first"
def tearDown(self):
self.tempdir.cleanup()
def test_convert_continues_after_skipping_one_file_due_to_ffmpeg_diagnostic(self):
runner = CliRunner()
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertEqual(
[str(self.source_one), str(self.source_two)],
_FakeFfxController.calls,
)
self.assertIn("Skipping file", result.output)
self.assertIn("-fflags +genpts", result.output)
self.assertIn("Files with ffmpeg findings that require review:", result.output)
self.assertIn(
"episode1.avi: retry-with-generated-pts",
result.output,
)
def test_convert_prints_clean_summary_when_no_unremedied_issues_were_seen(self):
runner = CliRunner()
_FakeFfxController.mode = "clean"
with (
patch("ffx.file_properties.FileProperties", _FakeFileProperties),
patch("ffx.ffx_controller.FfxController", _FakeFfxController),
patch(
"ffx.shifted_season_controller.ShiftedSeasonController",
_FakeShiftedSeasonController,
),
patch("ffx.show_controller.ShowController", _FakeShowController),
):
result = runner.invoke(
cli.ffx,
[
"--database-file",
str(self.database_path),
"convert",
"--no-tmdb",
"--no-pattern",
str(self.source_one),
str(self.source_two),
],
env={**os.environ, "HOME": str(self.home_dir)},
)
self.assertEqual(0, result.exit_code, result.output)
self.assertIn(
"All files converted with no issues.",
result.output,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -263,6 +263,47 @@ class CliLazyImportTests(unittest.TestCase):
result["modules"],
)
def test_convert_copy_flags_parse_without_loading_runtime_modules(self):
result = self.run_python(
textwrap.dedent(
f"""
import click
import json
import sys
sys.path.insert(0, {str(SRC_ROOT)!r})
import ffx.cli
context = ffx.cli.convert.make_context(
"convert",
["--copy-video", "--copy-audio"],
resilient_parsing=True,
)
help_output = ffx.cli.convert.get_help(click.Context(ffx.cli.convert))
print(json.dumps({{
"copy_video": context.params["copy_video"],
"copy_audio": context.params["copy_audio"],
"output": help_output,
"modules": {{
module_name: module_name in sys.modules
for module_name in {HEAVY_MODULES!r}
}},
}}))
"""
)
)
self.assertTrue(result["copy_video"])
self.assertTrue(result["copy_audio"])
self.assertIn("--copy-video", result["output"])
self.assertIn("--copy-audio", result["output"])
self.assertTrue(
all(not is_loaded for is_loaded in result["modules"].values()),
result["modules"],
)
def test_edit_command_avoids_database_bootstrap(self):
result = self.run_python(
textwrap.dedent(

View File

@@ -68,11 +68,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="M src/ffx/constants.py\n")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="main\n")
return self.make_completed(args)
with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.2"),
patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -81,6 +84,7 @@ class UpgradeCommandTests(unittest.TestCase):
self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Tracked local changes detected in the bundle repository:", result.output)
self.assertIn("Updated FFX to version 0.3.2 from branch main.", result.output)
self.assertEqual(
[
['git', 'status', '--porcelain', '--untracked-files=no'],
@@ -89,6 +93,7 @@ class UpgradeCommandTests(unittest.TestCase):
['git', 'checkout', '-B', 'main', 'FETCH_HEAD'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
],
[call[0] for call in subprocess_calls],
)
@@ -106,11 +111,14 @@ class UpgradeCommandTests(unittest.TestCase):
subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="")
if args == ['git', 'rev-parse', '--abbrev-ref', 'HEAD']:
return self.make_completed(args, stdout="develop\n")
return self.make_completed(args)
with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli, "getBundleVersion", return_value="0.3.3"),
patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run),
@@ -118,12 +126,14 @@ class UpgradeCommandTests(unittest.TestCase):
result = runner.invoke(cli.ffx, ["upgrade"])
self.assertEqual(0, result.exit_code, result.output)
self.assertIn("Updated FFX to version 0.3.3 from branch develop.", result.output)
self.assertEqual(
[
['git', 'status', '--porcelain', '--untracked-files=no'],
['git', 'pull'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'],
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
],
[call[0] for call in subprocess_calls],
)

View File

@@ -0,0 +1,196 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
from unittest.mock import patch
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.diagnostics import ( # noqa: E402
FfmpegCommandRunner,
FfmpegDiagnosticMonitor,
FfmpegSkipFileWarning,
getUnremediedIssues,
iterUnremediedIssueSummaryLines,
)
class RecordingLogger:
def __init__(self):
self.messages: list[str] = []
def warning(self, message, *args, **kwargs):
if args:
message = message % args
self.messages.append(str(message))
class FfmpegDiagnosticsTests(unittest.TestCase):
def test_command_runner_retries_with_genpts_after_timestamp_warning(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
commands = []
def fake_execute(commandSequence, **kwargs):
commands.append(list(commandSequence))
stderrLineHandler = kwargs["stderrLineHandler"]
if len(commands) == 1:
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
return "done", "", 0
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
out, err, rc = runner.execute(["ffmpeg", "-y", "-i", "input.avi", "output.mkv"])
self.assertEqual("done", out)
self.assertEqual("", err)
self.assertEqual(0, rc)
self.assertEqual(
[
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"],
],
commands,
)
self.assertEqual(
[
"ffmpeg reported unset packet timestamps for tests/assets/avi/conan_S01E754_amalgam.avi. "
+ "Stopping early and retrying with -fflags +genpts."
],
logger.messages,
)
self.assertEqual({}, getUnremediedIssues(context))
def test_command_runner_skips_file_when_timestamp_warning_persists_after_genpts(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E754_amalgam.avi",
}
runner = FfmpegCommandRunner(context)
def fake_execute(commandSequence, **kwargs):
stderrLineHandler = kwargs["stderrLineHandler"]
self.assertTrue(
stderrLineHandler(
"[matroska @ 0x1] Timestamps are unset in a packet for stream 0. "
+ "This is deprecated and will stop working in the future."
)
)
return "", "timestamp warning\n", -15
with patch("ffx.diagnostics.monitor.executeProcess", side_effect=fake_execute):
with self.assertRaises(FfmpegSkipFileWarning):
runner.execute(
["ffmpeg", "-fflags", "+genpts", "-y", "-i", "input.avi", "output.mkv"]
)
self.assertEqual(
[
"Skipping file tests/assets/avi/conan_S01E754_amalgam.avi: ffmpeg still reported "
+ "unset packet timestamps after retry with -fflags +genpts."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E754_amalgam.avi": ["retry-with-generated-pts"]
},
getUnremediedIssues(context),
)
def test_monitor_tracks_non_harmless_corrupt_mpeg_audio_remedy_in_summary(self):
logger = RecordingLogger()
context = {
"logger": logger,
"current_source_path": "tests/assets/avi/conan_S01E763_amalgam.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line("[mp3float @ 0x1] invalid new backstep -1")
)
self.assertFalse(monitor.handle_stderr_line("[mp3float @ 0x1] invalid block type"))
self.assertFalse(
monitor.handle_stderr_line(
"[aist#0:1/mp3 @ 0x2] [dec:mp3float @ 0x3] Error submitting packet to decoder: "
+ "Invalid data found when processing input"
)
)
self.assertEqual(
[
"ffmpeg reported damaged MPEG audio frames while converting "
+ "tests/assets/avi/conan_S01E763_amalgam.avi. FFX will continue, but the "
+ "output audio may contain gaps or glitches."
],
logger.messages,
)
self.assertEqual(
{
"tests/assets/avi/conan_S01E763_amalgam.avi": ["warn-corrupt-mpeg-audio"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["conan_S01E763_amalgam.avi: warn-corrupt-mpeg-audio"],
iterUnremediedIssueSummaryLines(context),
)
def test_monitor_tracks_unhandled_diagnostic_for_summary(self):
context = {
"logger": RecordingLogger(),
"current_source_path": "tests/assets/avi/example.avi",
}
monitor = FfmpegDiagnosticMonitor(
context,
["ffmpeg", "-y", "-i", "input.avi", "output.mkv"],
)
self.assertFalse(
monitor.handle_stderr_line(
"[avi @ 0x1] Strange warning with no automatic remedy is present"
)
)
self.assertEqual(
{
"tests/assets/avi/example.avi": ["unhandled-warning"]
},
getUnremediedIssues(context),
)
self.assertEqual(
["example.avi: unhandled-warning"],
iterUnremediedIssueSummaryLines(context),
)
self.assertEqual(
[
"ffmpeg reported a diagnostic with no automatic remedy while converting "
+ "tests/assets/avi/example.avi. FFX will continue, but review the output "
+ "file. First unhandled line: [avi @ 0x1] Strange warning with no automatic remedy is present"
],
context["logger"].messages,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -15,6 +15,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.ffx_controller import FfxController # noqa: E402
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -43,6 +44,8 @@ class FfxControllerTests(unittest.TestCase):
"video_encoder": video_encoder,
"dry_run": False,
"perform_cut": False,
"copy_video": False,
"copy_audio": False,
"bitrates": {
"stereo": "112k",
"ac3": "256k",
@@ -75,6 +78,56 @@ class FfxControllerTests(unittest.TestCase):
)
return descriptor, source_descriptor
def make_media_descriptors_with_audio(
self,
audio_layout: AudioLayout = AudioLayout.LAYOUT_STEREO,
) -> tuple[MediaDescriptor, MediaDescriptor]:
descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
source_descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.VIDEO,
codec_name=TrackCodec.H264,
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.AUDIO,
codec_name=TrackCodec.AAC,
audio_layout=audio_layout,
),
]
)
return descriptor, source_descriptor
def assert_token_pair(self, command: list[str], first: str, second: str):
self.assertTrue(
any(command[index:index + 2] == [first, second] for index in range(len(command) - 1)),
command,
)
def test_vp9_run_job_emits_file_level_encoding_quality_metadata(self):
context = self.make_context(VideoEncoder.VP9)
target_descriptor, source_descriptor = self.make_media_descriptors()
@@ -196,6 +249,79 @@ class FfxControllerTests(unittest.TestCase):
self.assertIn("ENCODING_QUALITY=19", commands[0])
mocked_info.assert_any_call("Setting quality 19 from pattern")
def test_copy_video_uses_single_copy_command_without_video_encoding_options(self):
context = self.make_context(VideoEncoder.VP9)
context["copy_video"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio()
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 27},
},
{
"identifier": "nlmeans",
"parameters": {},
"tokens": ["nlmeans=s=2.0"],
},
],
cropArguments={
"output_width": 1280,
"output_height": 720,
"x_offset": 0,
"y_offset": 0,
},
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:v", "copy")
self.assertIn("libopus", commands[0])
self.assertNotIn("libvpx-vp9", commands[0])
self.assertNotIn("-pass", commands[0])
self.assertNotIn("-vf", commands[0])
self.assertFalse(any(token.startswith("ENCODING_QUALITY=") for token in commands[0]))
def test_copy_audio_uses_audio_copy_without_audio_encoding_options(self):
context = self.make_context(VideoEncoder.H264)
context["copy_audio"] = True
target_descriptor, source_descriptor = self.make_media_descriptors_with_audio(
AudioLayout.LAYOUT_5_1
)
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
with patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[
{
"identifier": "quality",
"parameters": {"quality": 21},
}
],
)
self.assertEqual(1, len(commands))
self.assert_token_pair(commands[0], "-c:a", "copy")
self.assertIn("libx264", commands[0])
self.assertNotIn("libopus", commands[0])
self.assertFalse(any(token.startswith("-b:a") for token in commands[0]))
self.assertFalse(any(token.startswith("-filter:a") for token in commands[0]))
def test_generate_h264_tokens_prefers_libx264_when_available(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()

View File

@@ -13,6 +13,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet # noqa: E402
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
from ffx.i18n import set_current_language # noqa: E402
@@ -436,6 +437,47 @@ class MediaDescriptorChangeSetTests(unittest.TestCase):
self.assertNotIn("creation_time=", metadata_tokens)
self.assertNotIn("BPS=", metadata_tokens)
def test_attachment_tracks_are_ignored_for_pattern_comparison(self):
context = {
"logger": get_ffx_logger(),
"config": StaticConfig({}),
}
source_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "current.ttf", "mimetype": "font/ttf"},
)
target_track = TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "stored.ttf", "mimetype": "font/ttf"},
)
stale_target_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=1,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "missing.ttf", "mimetype": "font/ttf"},
)
change_set = MediaDescriptorChangeSet(
context,
MediaDescriptor(track_descriptors=[target_track, stale_target_track]),
MediaDescriptor(track_descriptors=[source_track]),
)
self.assertEqual({}, change_set.getChangeSetObj())
self.assertEqual([], change_set.generateMetadataTokens())
self.assertEqual([], change_set.generateDispositionTokens())
def test_normalization_can_be_disabled_per_context(self):
context = {
"logger": get_ffx_logger(),

View File

@@ -193,6 +193,36 @@ class PatternManagementTests(unittest.TestCase):
self.assertIn("at least one track", str(caught.exception))
def test_save_pattern_schema_does_not_persist_attachment_tracks(self):
pattern_id = self.save_pattern(
1,
r"^noattachments_(s[0-9]+e[0-9]+)\.mkv$",
tracks=[
make_track_descriptor(0, track_type=TrackType.VIDEO),
make_track_descriptor(1, track_type=TrackType.ATTACHMENT),
],
)
Session = self.context["database"]["session"]
session = Session()
try:
tracks = session.query(Pattern).filter(Pattern.id == pattern_id).first().tracks
self.assertEqual(1, len(tracks))
self.assertEqual(TrackType.VIDEO, tracks[0].getType())
finally:
session.close()
def test_track_controller_does_not_add_attachment_tracks_to_patterns(self):
pattern_id = self.save_pattern(1, r"^skipadd_(s[0-9]+e[0-9]+)\.mkv$")
added = self.track_controller.addTrack(
make_track_descriptor(1, track_type=TrackType.ATTACHMENT),
patternId=pattern_id,
)
self.assertFalse(added)
self.assertEqual(1, len(self.track_controller.findTracks(pattern_id)))
def test_match_filename_rejects_existing_trackless_pattern_rows(self):
self.insert_trackless_pattern_row(1, r"^invalid_(s[0-9]+e[0-9]+)\.mkv$")

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from pathlib import Path
import sys
import time
import unittest
from unittest.mock import patch
@@ -51,6 +52,33 @@ class ProcessTests(unittest.TestCase):
self.assertIn("Command timed out", err)
self.assertIn(sys.executable, err)
def test_execute_process_can_stop_early_while_streaming_stderr(self):
start = time.monotonic()
observed_lines = []
out, err, rc = executeProcess(
[
sys.executable,
"-c",
(
"import sys, time; "
"sys.stderr.write('fatal warning\\n'); sys.stderr.flush(); "
"time.sleep(2); "
"sys.stderr.write('late line\\n'); sys.stderr.flush()"
),
],
stderrLineHandler=lambda line: observed_lines.append(line) or ("fatal warning" in line),
)
elapsed = time.monotonic() - start
self.assertLess(elapsed, 1.5)
self.assertNotEqual(0, rc)
self.assertEqual("", out)
self.assertIn("fatal warning", err)
self.assertNotIn("late line", err)
self.assertEqual(["fatal warning\n"], observed_lines)
def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self):
wrapped = getWrappedCommandSequence(
["ffmpeg", "-i", "input.mkv"],

View File

@@ -14,12 +14,14 @@ if str(SRC_ROOT) not in sys.path:
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.helper import DIFF_ADDED_KEY # noqa: E402
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.helper import DIFF_ADDED_KEY, DIFF_REMOVED_KEY # noqa: E402
from ffx.iso_language import IsoLanguage # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.inspect_details_screen import InspectDetailsScreen # noqa: E402
from ffx.i18n import set_current_language # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet # noqa: E402
from ffx.media_edit_screen import MediaEditScreen # noqa: E402
from ffx.pattern_details_screen import PatternDetailsScreen # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -200,6 +202,32 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertEqual("German Audio", descriptor.getTitle())
self.assertEqual("value", descriptor.getTags()["KEEP"])
def test_track_details_screen_preserves_attachment_format_for_attachment_tracks(self):
screen = object.__new__(TrackDetailsScreen)
screen.context = {"logger": get_ffx_logger()}
screen._TrackDetailsScreen__trackDescriptor = None
screen._TrackDetailsScreen__patternId = 5
screen._TrackDetailsScreen__index = 4
screen._TrackDetailsScreen__subIndex = 0
screen._TrackDetailsScreen__trackCodec = TrackCodec.UNKNOWN
screen._TrackDetailsScreen__attachmentFormat = AttachmentFormat.TTF
screen._TrackDetailsScreen__draftTrackTags = {"filename": "font.ttf", "mimetype": "font/ttf"}
widgets = {
"#type_select": FakeValueWidget(TrackType.ATTACHMENT),
"#audio_layout_select": FakeValueWidget(AudioLayout.LAYOUT_UNDEFINED),
"#language_select": FakeValueWidget(Select.NULL),
"#title_input": FakeInputWidget(""),
"#dispositions_selection_list": FakeSelectionListWidget(set()),
}
screen.query_one = lambda selector, _widget_type=None: widgets[selector]
descriptor = screen.getTrackDescriptorFromInput()
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_track_details_screen_auto_sets_localized_title_from_selected_language(self):
set_current_language("de")
screen = object.__new__(TrackDetailsScreen)
@@ -521,6 +549,11 @@ class TagTableScreenStateTests(unittest.TestCase):
screen.tagsTable = FakeTagTable()
screen.shiftedSeasonsTable = FakeTagTable()
screen._PatternDetailsScreen__pattern = object()
screen._PatternDetailsScreen__showDescriptor = None
widgets = {
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
calls = []
screen.updateTags = lambda: calls.append("updateTags")
@@ -534,6 +567,48 @@ class TagTableScreenStateTests(unittest.TestCase):
calls,
)
def test_pattern_details_screen_on_mount_shows_show_quality_hint_for_new_pattern(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen.context = {}
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = None
widgets = {
"#showlabel": FakeStaticWidget(),
"#show_quality_hint": FakeStaticWidget(),
}
screen.query_one = lambda selector, _type=None: widgets[selector]
screen.on_mount()
self.assertEqual("7 - Demo (1999)", widgets["#showlabel"].value)
self.assertEqual("Show: 23", widgets["#show_quality_hint"].value)
def test_pattern_details_screen_show_quality_hint_is_hidden_when_pattern_quality_exists(self):
set_current_language("en")
screen = object.__new__(PatternDetailsScreen)
screen._PatternDetailsScreen__showDescriptor = ShowDescriptor(
id=7,
name="Demo",
year=1999,
quality=23,
)
screen._PatternDetailsScreen__pattern = type(
"_Pattern",
(),
{"quality": 19},
)()
self.assertEqual("", screen.getShowQualityHintText())
def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self):
screen = object.__new__(InspectDetailsScreen)
@@ -695,6 +770,142 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_update_tracks_shows_attachment_format_and_blanks_language(self):
attachment_track = TrackDescriptor(
index=4,
source_index=4,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "font.ttf", "mimetype": "font/ttf"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("4", row[0])
self.assertEqual("TTF", row[3])
self.assertEqual(" ", row[5])
self.assertEqual(" ", row[7])
self.assertEqual(" ", row[8])
def test_inspect_details_screen_update_tracks_shows_unknown_for_unknown_attachment_format(self):
attachment_track = TrackDescriptor(
index=5,
source_index=5,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.UNKNOWN,
tags={"filename": "blob.bin", "mimetype": "application/octet-stream"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([attachment_track])
screen._targetMediaDescriptor = None
screen._currentPattern = None
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
row = screen.tracksTable.rows["row-0"]
self.assertEqual("unknown", row[3])
self.assertEqual(" ", row[5])
def test_inspect_details_screen_uses_source_font_attachments_for_styled_ass(self):
class _Config:
def getData(self):
return {}
class _Pattern:
def __init__(self, media_descriptor):
self._media_descriptor = media_descriptor
def getMediaDescriptor(self, _context):
return self._media_descriptor
source_descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.ASS,
tags={"title": "Styled Subtitle"},
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "current.ttf", "mimetype": "font/ttf"},
),
]
)
pattern_descriptor = MediaDescriptor(
track_descriptors=[
TrackDescriptor(
index=0,
source_index=0,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.ASS,
tags={"title": "Styled Subtitle"},
),
TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "old.ttf", "mimetype": "font/ttf"},
),
TrackDescriptor(
index=2,
source_index=2,
sub_index=1,
track_type=TrackType.ATTACHMENT,
attachment_format=AttachmentFormat.TTF,
tags={"filename": "missing.ttf", "mimetype": "font/ttf"},
),
]
)
screen = object.__new__(InspectDetailsScreen)
screen.context = {"logger": get_ffx_logger(), "config": _Config()}
resolved_descriptor = screen._resolve_target_media_descriptor(
_Pattern(pattern_descriptor),
source_descriptor,
)
attachment_tracks = resolved_descriptor.getAttachmentTracks()
self.assertEqual(1, len(attachment_tracks))
self.assertEqual({"filename": "current.ttf", "mimetype": "font/ttf"}, attachment_tracks[0].getTags())
change_set = MediaDescriptorChangeSet(
screen.context,
resolved_descriptor,
source_descriptor,
).getChangeSetObj()
self.assertNotIn(
1,
change_set.get("tracks", {}).get(DIFF_REMOVED_KEY, {}),
)
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor(
index=3,

View File

@@ -0,0 +1,61 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.attachment_format import AttachmentFormat # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class TrackDescriptorProbeTests(unittest.TestCase):
def test_attachment_without_codec_name_uses_font_metadata_to_identify_ttf(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 4,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "AmazonEmberTanuki-Italic.ttf",
"mimetype": "font/ttf",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.TTF, descriptor.getAttachmentFormat())
self.assertEqual(AttachmentFormat.TTF, descriptor.getFormatDescriptor())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
def test_attachment_without_codec_name_still_probes_as_unknown_when_not_font(self):
descriptor = TrackDescriptor.fromFfprobe(
{
"index": 9,
"codec_type": "attachment",
"disposition": {"default": 0},
"tags": {
"filename": "cover.bin",
"mimetype": "application/octet-stream",
},
},
subIndex=0,
)
self.assertIsNotNone(descriptor)
self.assertEqual(TrackType.ATTACHMENT, descriptor.getType())
self.assertEqual(AttachmentFormat.UNKNOWN, descriptor.getAttachmentFormat())
self.assertEqual(TrackCodec.UNKNOWN, descriptor.getCodec())
if __name__ == "__main__":
unittest.main()