from __future__ import annotations import logging from pathlib import Path import sys import tempfile import unittest import click SRC_ROOT = Path(__file__).resolve().parents[2] / "src" if str(SRC_ROOT) not in sys.path: sys.path.insert(0, str(SRC_ROOT)) from ffx.audio_layout import AudioLayout # noqa: E402 from ffx.database import databaseContext # noqa: E402 from ffx.file_properties import FileProperties # noqa: E402 from ffx.model.pattern import Pattern # noqa: E402 from ffx.pattern_controller import ( # noqa: E402 DuplicatePatternMatchError, InvalidPatternSchemaError, PatternController, ) from ffx.show_controller import ShowController # noqa: E402 from ffx.show_descriptor import ShowDescriptor # noqa: E402 from ffx.track_controller import TrackController # noqa: E402 from ffx.track_descriptor import TrackDescriptor # noqa: E402 from ffx.track_disposition import TrackDisposition # noqa: E402 from ffx.track_type import TrackType # noqa: E402 class StaticConfig: def __init__(self, data: dict | None = None): self._data = data or {} def getData(self): return self._data def make_logger(name: str) -> logging.Logger: logger = logging.getLogger(name) logger.handlers = [] logger.setLevel(logging.DEBUG) logger.propagate = False logger.addHandler(logging.NullHandler()) return logger def make_context(database_path: Path) -> dict: return { "logger": make_logger(f"ffx-test-pattern-{database_path.stem}"), "config": StaticConfig(), "database": databaseContext(str(database_path)), "use_pattern": True, } def make_track_descriptor( index: int = 0, *, source_index: int | None = None, track_type: TrackType = TrackType.VIDEO, title: str = "", dispositions: set[TrackDisposition] | None = None, ) -> TrackDescriptor: kwargs = { TrackDescriptor.INDEX_KEY: index, TrackDescriptor.SOURCE_INDEX_KEY: index if source_index is None else source_index, TrackDescriptor.TRACK_TYPE_KEY: track_type, TrackDescriptor.TAGS_KEY: {"title": title} if title else {}, TrackDescriptor.DISPOSITION_SET_KEY: dispositions or set(), } if track_type == TrackType.AUDIO: kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = AudioLayout.LAYOUT_STEREO return TrackDescriptor(**kwargs) class PatternManagementTests(unittest.TestCase): def setUp(self): self.tempdir = tempfile.TemporaryDirectory() self.database_path = Path(self.tempdir.name) / "pattern-test.db" self.context = make_context(self.database_path) self.pattern_controller = PatternController(self.context) self.track_controller = TrackController(self.context) self.show_controller = ShowController(self.context) PatternController._clear_regex_cache() def tearDown(self): self.context["database"]["engine"].dispose() self.tempdir.cleanup() PatternController._clear_regex_cache() def add_show(self, show_id: int, name: str) -> None: self.show_controller.updateShow( ShowDescriptor( id=show_id, name=name, year=2000 + show_id, ) ) def save_pattern( self, show_id: int, pattern_expression: str, *, tracks: list[TrackDescriptor] | None = None, ) -> int: self.add_show(show_id, f"Show {show_id}") return self.pattern_controller.savePatternSchema( { "show_id": show_id, "pattern": pattern_expression, "quality": 0, "notes": "", }, trackDescriptors=tracks or [make_track_descriptor(0)], ) def insert_trackless_pattern_row(self, show_id: int, pattern_expression: str) -> int: self.add_show(show_id, f"Show {show_id}") Session = self.context["database"]["session"] session = Session() try: pattern = Pattern(show_id=show_id, pattern=pattern_expression) session.add(pattern) session.commit() return int(pattern.id) finally: session.close() def test_match_filename_returns_single_matching_pattern(self): pattern_id = self.save_pattern(1, r"^single_(s[0-9]+e[0-9]+)\.mkv$") match = self.pattern_controller.matchFilename("single_s01e01.mkv") self.assertEqual(pattern_id, match["pattern"].getId()) self.assertEqual("s01e01", match["match"].group(1)) def test_match_filename_raises_for_duplicate_matches_in_same_show(self): self.save_pattern(1, r"^same_(s[0-9]+e[0-9]+)\.mkv$") self.save_pattern(1, r"^same_.*$") with self.assertRaises(DuplicatePatternMatchError) as caught: self.pattern_controller.matchFilename("same_s01e01.mkv") self.assertIn("matched more than one pattern", str(caught.exception)) self.assertIn("show #1", str(caught.exception)) def test_match_filename_raises_for_duplicate_matches_across_shows(self): self.save_pattern(1, r"^cross_(s[0-9]+e[0-9]+)\.mkv$") self.save_pattern(2, r"^cross_.*$") with self.assertRaises(DuplicatePatternMatchError) as caught: self.pattern_controller.matchFilename("cross_s01e01.mkv") self.assertIn("show #1", str(caught.exception)) self.assertIn("show #2", str(caught.exception)) def test_update_pattern_refreshes_regex_matching_after_change(self): pattern_id = self.save_pattern(1, r"^before_(s[0-9]+e[0-9]+)\.mkv$") self.assertTrue( self.pattern_controller.updatePattern( pattern_id, { "show_id": 1, "pattern": r"^after_(s[0-9]+e[0-9]+)\.mkv$", "quality": 0, "notes": "", }, ) ) self.assertEqual({}, self.pattern_controller.matchFilename("before_s01e01.mkv")) match = self.pattern_controller.matchFilename("after_s01e01.mkv") self.assertEqual(pattern_id, match["pattern"].getId()) def test_save_pattern_schema_rejects_zero_track_patterns(self): self.add_show(1, "Empty Pattern Show") with self.assertRaises(InvalidPatternSchemaError) as caught: self.pattern_controller.savePatternSchema( { "show_id": 1, "pattern": r"^empty_(s[0-9]+e[0-9]+)\.mkv$", }, trackDescriptors=[], ) self.assertIn("at least one track", str(caught.exception)) def test_match_filename_rejects_existing_trackless_pattern_rows(self): self.insert_trackless_pattern_row(1, r"^invalid_(s[0-9]+e[0-9]+)\.mkv$") with self.assertRaises(InvalidPatternSchemaError) as caught: self.pattern_controller.matchFilename("invalid_s01e01.mkv") self.assertIn("has no tracks", str(caught.exception)) def test_file_properties_skips_pattern_matching_when_disabled(self): self.save_pattern(1, r"^nopattern_(s[0-9]+e[0-9]+)\.mkv$") self.save_pattern(2, r"^nopattern_.*$") no_pattern_context = dict(self.context) no_pattern_context["use_pattern"] = False file_properties = FileProperties( no_pattern_context, "/tmp/nopattern_s01e01.mkv", ) self.assertIsNone(file_properties.getPattern()) self.assertEqual(-1, file_properties.getShowId()) self.assertEqual(1, file_properties.getSeason()) self.assertEqual(1, file_properties.getEpisode()) def test_track_controller_refuses_to_delete_last_track(self): pattern_id = self.save_pattern(1, r"^delete_(s[0-9]+e[0-9]+)\.mkv$") track = self.track_controller.getTrack(pattern_id, 0) with self.assertRaises(click.ClickException) as caught: self.track_controller.deleteTrack(track.getId()) self.assertIn("last track", str(caught.exception)) def test_exact_duplicate_pattern_definition_is_rejected(self): self.save_pattern(1, r"^unique_(s[0-9]+e[0-9]+)\.mkv$") with self.assertRaises(click.ClickException) as caught: self.save_pattern(1, r"^unique_(s[0-9]+e[0-9]+)\.mkv$") self.assertIn("already exists", str(caught.exception)) if __name__ == "__main__": unittest.main()