Opt pattern matching

This commit is contained in:
Javanaut
2026-04-09 16:11:51 +02:00
parent be0f4b4c4e
commit d19e69990a
16 changed files with 1248 additions and 522 deletions

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,138 @@
from __future__ import annotations
from pathlib import Path
import tempfile
import unittest
from tests.support.ffx_bundle import (
PatternTrackSpec,
SourceTrackSpec,
add_show,
build_controller_context,
create_source_fixture,
dispose_controller_context,
expected_output_path,
run_ffx_convert,
)
from ffx.pattern_controller import PatternController
from ffx.track_type import TrackType
try:
import pytest
except ImportError: # pragma: no cover - unittest-only environments
pytest = None
if pytest is not None:
pytestmark = [pytest.mark.integration, pytest.mark.pattern_management]
class PatternManagementCliTests(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.TemporaryDirectory()
self.workdir = Path(self.tempdir.name)
self.home_dir = self.workdir / "home"
self.home_dir.mkdir()
self.database_path = self.workdir / "test.db"
def tearDown(self):
self.tempdir.cleanup()
def prepare_duplicate_matching_patterns(self):
context = build_controller_context(self.database_path)
try:
add_show(context, show_id=1)
add_show(context, show_id=2)
controller = PatternController(context)
track_descriptors = [
PatternTrackSpec(index=0, source_index=0, track_type=TrackType.VIDEO)
]
def to_track_descriptor(spec: PatternTrackSpec):
from ffx.track_descriptor import TrackDescriptor
kwargs = {
TrackDescriptor.INDEX_KEY: spec.index,
TrackDescriptor.SOURCE_INDEX_KEY: spec.source_index,
TrackDescriptor.TRACK_TYPE_KEY: spec.track_type,
TrackDescriptor.TAGS_KEY: dict(spec.tags),
TrackDescriptor.DISPOSITION_SET_KEY: set(spec.dispositions),
}
return TrackDescriptor(**kwargs)
controller.savePatternSchema(
{"show_id": 1, "pattern": r"^dup_(s[0-9]+e[0-9]+)\.mkv$"},
[to_track_descriptor(track_descriptors[0])],
)
controller.savePatternSchema(
{"show_id": 2, "pattern": r"^dup_.*$"},
[to_track_descriptor(track_descriptors[0])],
)
finally:
dispose_controller_context(context)
def test_convert_fails_when_filename_matches_more_than_one_pattern(self):
self.prepare_duplicate_matching_patterns()
source_filename = "dup_s01e01.mkv"
source_path = create_source_fixture(
self.workdir,
source_filename,
[
SourceTrackSpec(TrackType.VIDEO, identity="video-0"),
SourceTrackSpec(TrackType.AUDIO, identity="audio-1", language="eng"),
],
)
completed = run_ffx_convert(
self.workdir,
self.home_dir,
self.database_path,
"--video-encoder",
"copy",
"--no-tmdb",
"--no-prompt",
"--no-signature",
str(source_path),
)
self.assertNotEqual(completed.returncode, 0)
error_output = f"{completed.stdout}\n{completed.stderr}"
self.assertIn("matched more than one pattern", error_output)
self.assertFalse(expected_output_path(self.workdir, source_filename).exists())
def test_convert_can_ignore_duplicate_matches_when_no_pattern_is_requested(self):
self.prepare_duplicate_matching_patterns()
source_filename = "dup_s01e01.mkv"
source_path = create_source_fixture(
self.workdir,
source_filename,
[
SourceTrackSpec(TrackType.VIDEO, identity="video-0"),
SourceTrackSpec(TrackType.AUDIO, identity="audio-1", language="eng"),
],
)
completed = run_ffx_convert(
self.workdir,
self.home_dir,
self.database_path,
"--video-encoder",
"copy",
"--no-pattern",
"--no-tmdb",
"--no-prompt",
"--no-signature",
str(source_path),
)
self.assertEqual(
0,
completed.returncode,
f"STDOUT:\n{completed.stdout}\nSTDERR:\n{completed.stderr}",
)
self.assertTrue(expected_output_path(self.workdir, source_filename).exists())
if __name__ == "__main__":
unittest.main()

View File

@@ -22,7 +22,6 @@ from ffx.database import databaseContext
from ffx.pattern_controller import PatternController
from ffx.show_controller import ShowController
from ffx.show_descriptor import ShowDescriptor
from ffx.track_controller import TrackController
from ffx.track_descriptor import TrackDescriptor
from ffx.track_disposition import TrackDisposition
from ffx.track_type import TrackType
@@ -219,44 +218,41 @@ def create_source_fixture(workdir: Path, filename: str, tracks: list[SourceTrack
return output_path
def add_show_and_pattern(context: dict, filename_pattern: str, show_id: int = 1) -> int:
def add_show(context: dict, show_id: int = 1) -> None:
show_descriptor = ShowDescriptor(
id=show_id,
name="Bundle Test Show",
year=2000,
)
ShowController(context).updateShow(show_descriptor)
pattern_id = PatternController(context).addPattern(
{
"show_id": show_id,
"pattern": filename_pattern,
}
)
if not pattern_id:
raise AssertionError("Failed to create pattern in test database")
return pattern_id
def add_pattern_tracks(context: dict, pattern_id: int, track_specs: list[PatternTrackSpec]) -> None:
track_controller = TrackController(context)
for track in track_specs:
kwargs = {
TrackDescriptor.INDEX_KEY: track.index,
TrackDescriptor.SOURCE_INDEX_KEY: track.source_index,
TrackDescriptor.TRACK_TYPE_KEY: track.track_type,
TrackDescriptor.TAGS_KEY: dict(track.tags),
TrackDescriptor.DISPOSITION_SET_KEY: set(track.dispositions),
}
if track.track_type == TrackType.AUDIO:
kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = track.audio_layout
track_controller.addTrack(TrackDescriptor(**kwargs), pattern_id)
def prepare_pattern_database(database_path: Path, filename_pattern: str, track_specs: list[PatternTrackSpec], show_id: int = 1) -> None:
context = build_controller_context(database_path)
try:
pattern_id = add_show_and_pattern(context, filename_pattern, show_id=show_id)
add_pattern_tracks(context, pattern_id, track_specs)
add_show(context, show_id=show_id)
track_descriptors = []
for track in track_specs:
kwargs = {
TrackDescriptor.INDEX_KEY: track.index,
TrackDescriptor.SOURCE_INDEX_KEY: track.source_index,
TrackDescriptor.TRACK_TYPE_KEY: track.track_type,
TrackDescriptor.TAGS_KEY: dict(track.tags),
TrackDescriptor.DISPOSITION_SET_KEY: set(track.dispositions),
}
if track.track_type == TrackType.AUDIO:
kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = track.audio_layout
track_descriptors.append(TrackDescriptor(**kwargs))
pattern_id = PatternController(context).savePatternSchema(
{
"show_id": show_id,
"pattern": filename_pattern,
},
trackDescriptors=track_descriptors,
)
if not pattern_id:
raise AssertionError("Failed to create pattern in test database")
finally:
dispose_controller_context(context)

View File

@@ -0,0 +1,240 @@
from __future__ import annotations
import logging
from pathlib import Path
import sys
import tempfile
import unittest
import click
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.database import databaseContext # noqa: E402
from ffx.file_properties import FileProperties # noqa: E402
from ffx.model.pattern import Pattern # noqa: E402
from ffx.pattern_controller import ( # noqa: E402
DuplicatePatternMatchError,
InvalidPatternSchemaError,
PatternController,
)
from ffx.show_controller import ShowController # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
from ffx.track_controller import TrackController # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_disposition import TrackDisposition # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class StaticConfig:
def __init__(self, data: dict | None = None):
self._data = data or {}
def getData(self):
return self._data
def make_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
logger.handlers = []
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(logging.NullHandler())
return logger
def make_context(database_path: Path) -> dict:
return {
"logger": make_logger(f"ffx-test-pattern-{database_path.stem}"),
"config": StaticConfig(),
"database": databaseContext(str(database_path)),
"use_pattern": True,
}
def make_track_descriptor(
index: int = 0,
*,
source_index: int | None = None,
track_type: TrackType = TrackType.VIDEO,
title: str = "",
dispositions: set[TrackDisposition] | None = None,
) -> TrackDescriptor:
kwargs = {
TrackDescriptor.INDEX_KEY: index,
TrackDescriptor.SOURCE_INDEX_KEY: index if source_index is None else source_index,
TrackDescriptor.TRACK_TYPE_KEY: track_type,
TrackDescriptor.TAGS_KEY: {"title": title} if title else {},
TrackDescriptor.DISPOSITION_SET_KEY: dispositions or set(),
}
if track_type == TrackType.AUDIO:
kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = AudioLayout.LAYOUT_STEREO
return TrackDescriptor(**kwargs)
class PatternManagementTests(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.TemporaryDirectory()
self.database_path = Path(self.tempdir.name) / "pattern-test.db"
self.context = make_context(self.database_path)
self.pattern_controller = PatternController(self.context)
self.track_controller = TrackController(self.context)
self.show_controller = ShowController(self.context)
PatternController._clear_regex_cache()
def tearDown(self):
self.context["database"]["engine"].dispose()
self.tempdir.cleanup()
PatternController._clear_regex_cache()
def add_show(self, show_id: int, name: str) -> None:
self.show_controller.updateShow(
ShowDescriptor(
id=show_id,
name=name,
year=2000 + show_id,
)
)
def save_pattern(
self,
show_id: int,
pattern_expression: str,
*,
tracks: list[TrackDescriptor] | None = None,
) -> int:
self.add_show(show_id, f"Show {show_id}")
return self.pattern_controller.savePatternSchema(
{
"show_id": show_id,
"pattern": pattern_expression,
"quality": 0,
"notes": "",
},
trackDescriptors=tracks or [make_track_descriptor(0)],
)
def insert_trackless_pattern_row(self, show_id: int, pattern_expression: str) -> int:
self.add_show(show_id, f"Show {show_id}")
Session = self.context["database"]["session"]
session = Session()
try:
pattern = Pattern(show_id=show_id, pattern=pattern_expression)
session.add(pattern)
session.commit()
return int(pattern.id)
finally:
session.close()
def test_match_filename_returns_single_matching_pattern(self):
pattern_id = self.save_pattern(1, r"^single_(s[0-9]+e[0-9]+)\.mkv$")
match = self.pattern_controller.matchFilename("single_s01e01.mkv")
self.assertEqual(pattern_id, match["pattern"].getId())
self.assertEqual("s01e01", match["match"].group(1))
def test_match_filename_raises_for_duplicate_matches_in_same_show(self):
self.save_pattern(1, r"^same_(s[0-9]+e[0-9]+)\.mkv$")
self.save_pattern(1, r"^same_.*$")
with self.assertRaises(DuplicatePatternMatchError) as caught:
self.pattern_controller.matchFilename("same_s01e01.mkv")
self.assertIn("matched more than one pattern", str(caught.exception))
self.assertIn("show #1", str(caught.exception))
def test_match_filename_raises_for_duplicate_matches_across_shows(self):
self.save_pattern(1, r"^cross_(s[0-9]+e[0-9]+)\.mkv$")
self.save_pattern(2, r"^cross_.*$")
with self.assertRaises(DuplicatePatternMatchError) as caught:
self.pattern_controller.matchFilename("cross_s01e01.mkv")
self.assertIn("show #1", str(caught.exception))
self.assertIn("show #2", str(caught.exception))
def test_update_pattern_refreshes_regex_matching_after_change(self):
pattern_id = self.save_pattern(1, r"^before_(s[0-9]+e[0-9]+)\.mkv$")
self.assertTrue(
self.pattern_controller.updatePattern(
pattern_id,
{
"show_id": 1,
"pattern": r"^after_(s[0-9]+e[0-9]+)\.mkv$",
"quality": 0,
"notes": "",
},
)
)
self.assertEqual({}, self.pattern_controller.matchFilename("before_s01e01.mkv"))
match = self.pattern_controller.matchFilename("after_s01e01.mkv")
self.assertEqual(pattern_id, match["pattern"].getId())
def test_save_pattern_schema_rejects_zero_track_patterns(self):
self.add_show(1, "Empty Pattern Show")
with self.assertRaises(InvalidPatternSchemaError) as caught:
self.pattern_controller.savePatternSchema(
{
"show_id": 1,
"pattern": r"^empty_(s[0-9]+e[0-9]+)\.mkv$",
},
trackDescriptors=[],
)
self.assertIn("at least one track", str(caught.exception))
def test_match_filename_rejects_existing_trackless_pattern_rows(self):
self.insert_trackless_pattern_row(1, r"^invalid_(s[0-9]+e[0-9]+)\.mkv$")
with self.assertRaises(InvalidPatternSchemaError) as caught:
self.pattern_controller.matchFilename("invalid_s01e01.mkv")
self.assertIn("has no tracks", str(caught.exception))
def test_file_properties_skips_pattern_matching_when_disabled(self):
self.save_pattern(1, r"^nopattern_(s[0-9]+e[0-9]+)\.mkv$")
self.save_pattern(2, r"^nopattern_.*$")
no_pattern_context = dict(self.context)
no_pattern_context["use_pattern"] = False
file_properties = FileProperties(
no_pattern_context,
"/tmp/nopattern_s01e01.mkv",
)
self.assertIsNone(file_properties.getPattern())
self.assertEqual(-1, file_properties.getShowId())
self.assertEqual(1, file_properties.getSeason())
self.assertEqual(1, file_properties.getEpisode())
def test_track_controller_refuses_to_delete_last_track(self):
pattern_id = self.save_pattern(1, r"^delete_(s[0-9]+e[0-9]+)\.mkv$")
track = self.track_controller.getTrack(pattern_id, 0)
with self.assertRaises(click.ClickException) as caught:
self.track_controller.deleteTrack(track.getId())
self.assertIn("last track", str(caught.exception))
def test_exact_duplicate_pattern_definition_is_rejected(self):
self.save_pattern(1, r"^unique_(s[0-9]+e[0-9]+)\.mkv$")
with self.assertRaises(click.ClickException) as caught:
self.save_pattern(1, r"^unique_(s[0-9]+e[0-9]+)\.mkv$")
self.assertIn("already exists", str(caught.exception))
if __name__ == "__main__":
unittest.main()