# Source listing metadata: 241 lines, 8.3 KiB, Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
|
|
import click
|
|
|
|
|
|
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
|
|
|
|
if str(SRC_ROOT) not in sys.path:
|
|
sys.path.insert(0, str(SRC_ROOT))
|
|
|
|
|
|
from ffx.audio_layout import AudioLayout # noqa: E402
|
|
from ffx.database import databaseContext # noqa: E402
|
|
from ffx.file_properties import FileProperties # noqa: E402
|
|
from ffx.model.pattern import Pattern # noqa: E402
|
|
from ffx.pattern_controller import ( # noqa: E402
|
|
DuplicatePatternMatchError,
|
|
InvalidPatternSchemaError,
|
|
PatternController,
|
|
)
|
|
from ffx.show_controller import ShowController # noqa: E402
|
|
from ffx.show_descriptor import ShowDescriptor # noqa: E402
|
|
from ffx.track_controller import TrackController # noqa: E402
|
|
from ffx.track_descriptor import TrackDescriptor # noqa: E402
|
|
from ffx.track_disposition import TrackDisposition # noqa: E402
|
|
from ffx.track_type import TrackType # noqa: E402
|
|
|
|
|
|
class StaticConfig:
    """Minimal in-memory configuration object exposing ``getData()``.

    Used as the ``"config"`` entry of the test context in place of a real
    configuration source.
    """

    def __init__(self, data: dict | None = None) -> None:
        """Store *data*; a falsy value (``None`` or an empty dict) becomes a new ``{}``."""
        self._data = data or {}

    def getData(self) -> dict:
        """Return the stored configuration dictionary (the same object, not a copy)."""
        return self._data


def make_logger(name: str) -> logging.Logger:
    """Return the logger called *name*, reset so that it produces no output.

    Args:
        name: Logger name passed to ``logging.getLogger``.

    Returns:
        The logger, set to ``DEBUG`` level, with propagation disabled and a
        single ``NullHandler`` attached.
    """
    logger = logging.getLogger(name)
    # getLogger returns the same object for the same name, so discard any
    # handlers left over from an earlier call before attaching a new one.
    logger.handlers = []
    logger.setLevel(logging.DEBUG)
    # Keep records away from the root logger's handlers.
    logger.propagate = False
    logger.addHandler(logging.NullHandler())
    return logger


def make_context(database_path: Path) -> dict:
    """Build the context dictionary passed to the controllers under test.

    Args:
        database_path: Location of the database file; its string form is
            handed to ``databaseContext`` and its stem is used in the logger
            name.

    Returns:
        A dict with the keys ``"logger"``, ``"config"``, ``"database"`` and
        ``"use_pattern"`` (the latter set to ``True``).
    """
    return {
        "logger": make_logger(f"ffx-test-pattern-{database_path.stem}"),
        "config": StaticConfig(),
        "database": databaseContext(str(database_path)),
        "use_pattern": True,
    }


def make_track_descriptor(
    index: int = 0,
    *,
    source_index: int | None = None,
    track_type: TrackType = TrackType.VIDEO,
    title: str = "",
    dispositions: set[TrackDisposition] | None = None,
) -> TrackDescriptor:
    """Create a ``TrackDescriptor`` with test-friendly defaults.

    Args:
        index: Value stored under ``TrackDescriptor.INDEX_KEY``.
        source_index: Value stored under ``TrackDescriptor.SOURCE_INDEX_KEY``;
            falls back to *index* when ``None``.
        track_type: Track type; audio tracks additionally receive a stereo
            audio layout.
        title: When non-empty, stored as the ``"title"`` tag; otherwise the
            tag dict is empty.
        dispositions: Disposition set; ``None`` or an empty set yields a new
            empty set.

    Returns:
        The descriptor built from these keyword arguments.
    """
    kwargs = {
        TrackDescriptor.INDEX_KEY: index,
        TrackDescriptor.SOURCE_INDEX_KEY: index if source_index is None else source_index,
        TrackDescriptor.TRACK_TYPE_KEY: track_type,
        TrackDescriptor.TAGS_KEY: {"title": title} if title else {},
        TrackDescriptor.DISPOSITION_SET_KEY: dispositions or set(),
    }
    # Only audio descriptors are given an audio layout.
    if track_type == TrackType.AUDIO:
        kwargs[TrackDescriptor.AUDIO_LAYOUT_KEY] = AudioLayout.LAYOUT_STEREO
    return TrackDescriptor(**kwargs)


class PatternManagementTests(unittest.TestCase):
    """Tests for pattern storage and filename matching against a temporary database.

    Each test gets its own temporary directory, database context and
    controller instances. Cleanup is registered with ``addCleanup`` rather
    than implemented in ``tearDown`` so that it also runs when ``setUp``
    fails part-way, and so that one failing cleanup step does not skip the
    others.
    """

    def setUp(self):
        """Create the per-test database, context and controllers."""
        # Cleanups run in reverse registration order, so the effective
        # sequence is: dispose engine -> remove temp dir -> clear regex cache.
        self.addCleanup(PatternController._clear_regex_cache)

        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)

        self.database_path = Path(self.tempdir.name) / "pattern-test.db"
        self.context = make_context(self.database_path)
        # Dispose the engine before the temporary directory is removed.
        self.addCleanup(self.context["database"]["engine"].dispose)

        self.pattern_controller = PatternController(self.context)
        self.track_controller = TrackController(self.context)
        self.show_controller = ShowController(self.context)
        # Start every test with an empty regex cache.
        PatternController._clear_regex_cache()

    def add_show(self, show_id: int, name: str) -> None:
        """Pass a ``ShowDescriptor`` for *show_id* to ``ShowController.updateShow``.

        The year is derived from the id (``2000 + show_id``).
        """
        self.show_controller.updateShow(
            ShowDescriptor(
                id=show_id,
                name=name,
                year=2000 + show_id,
            )
        )

    def save_pattern(
        self,
        show_id: int,
        pattern_expression: str,
        *,
        tracks: list[TrackDescriptor] | None = None,
    ) -> int:
        """Call ``add_show`` for *show_id*, then save a pattern schema for it.

        Args:
            show_id: Show the pattern belongs to; the show name is
                ``"Show <show_id>"``.
            pattern_expression: Regular expression stored as the pattern.
            tracks: Track descriptors to save with the pattern; when ``None``
                or empty, a single default track descriptor is used.

        Returns:
            The value returned by ``PatternController.savePatternSchema``
            (used as the pattern id by the tests).
        """
        self.add_show(show_id, f"Show {show_id}")
        return self.pattern_controller.savePatternSchema(
            {
                "show_id": show_id,
                "pattern": pattern_expression,
                "quality": 0,
                "notes": "",
            },
            trackDescriptors=tracks or [make_track_descriptor(0)],
        )

    def insert_trackless_pattern_row(self, show_id: int, pattern_expression: str) -> int:
        """Insert a ``Pattern`` row directly through a session, without any tracks.

        This bypasses ``PatternController``, so the row can be created even
        though it has no track descriptors.

        Returns:
            The id of the committed pattern row.
        """
        self.add_show(show_id, f"Show {show_id}")
        Session = self.context["database"]["session"]
        session = Session()
        try:
            pattern = Pattern(show_id=show_id, pattern=pattern_expression)
            session.add(pattern)
            session.commit()
            return int(pattern.id)
        finally:
            session.close()

    def test_match_filename_returns_single_matching_pattern(self):
        """A filename matched by exactly one pattern yields that pattern and its match."""
        pattern_id = self.save_pattern(1, r"^single_(s[0-9]+e[0-9]+)\.mkv$")

        match = self.pattern_controller.matchFilename("single_s01e01.mkv")

        self.assertEqual(pattern_id, match["pattern"].getId())
        self.assertEqual("s01e01", match["match"].group(1))

    def test_match_filename_raises_for_duplicate_matches_in_same_show(self):
        """Two patterns of one show matching the same filename raise an error."""
        self.save_pattern(1, r"^same_(s[0-9]+e[0-9]+)\.mkv$")
        self.save_pattern(1, r"^same_.*$")

        with self.assertRaises(DuplicatePatternMatchError) as caught:
            self.pattern_controller.matchFilename("same_s01e01.mkv")

        self.assertIn("matched more than one pattern", str(caught.exception))
        self.assertIn("show #1", str(caught.exception))

    def test_match_filename_raises_for_duplicate_matches_across_shows(self):
        """Patterns of different shows matching the same filename raise an error naming both."""
        self.save_pattern(1, r"^cross_(s[0-9]+e[0-9]+)\.mkv$")
        self.save_pattern(2, r"^cross_.*$")

        with self.assertRaises(DuplicatePatternMatchError) as caught:
            self.pattern_controller.matchFilename("cross_s01e01.mkv")

        self.assertIn("show #1", str(caught.exception))
        self.assertIn("show #2", str(caught.exception))

    def test_update_pattern_refreshes_regex_matching_after_change(self):
        """After ``updatePattern`` the old expression no longer matches and the new one does."""
        pattern_id = self.save_pattern(1, r"^before_(s[0-9]+e[0-9]+)\.mkv$")

        self.assertTrue(
            self.pattern_controller.updatePattern(
                pattern_id,
                {
                    "show_id": 1,
                    "pattern": r"^after_(s[0-9]+e[0-9]+)\.mkv$",
                    "quality": 0,
                    "notes": "",
                },
            )
        )

        # A filename without any matching pattern is expected to give an empty dict.
        self.assertEqual({}, self.pattern_controller.matchFilename("before_s01e01.mkv"))
        match = self.pattern_controller.matchFilename("after_s01e01.mkv")
        self.assertEqual(pattern_id, match["pattern"].getId())

    def test_save_pattern_schema_rejects_zero_track_patterns(self):
        """Saving a pattern schema with an empty track list raises an error."""
        self.add_show(1, "Empty Pattern Show")

        with self.assertRaises(InvalidPatternSchemaError) as caught:
            self.pattern_controller.savePatternSchema(
                {
                    "show_id": 1,
                    "pattern": r"^empty_(s[0-9]+e[0-9]+)\.mkv$",
                },
                trackDescriptors=[],
            )

        self.assertIn("at least one track", str(caught.exception))

    def test_match_filename_rejects_existing_trackless_pattern_rows(self):
        """Matching against a stored pattern row that has no tracks raises an error."""
        self.insert_trackless_pattern_row(1, r"^invalid_(s[0-9]+e[0-9]+)\.mkv$")

        with self.assertRaises(InvalidPatternSchemaError) as caught:
            self.pattern_controller.matchFilename("invalid_s01e01.mkv")

        self.assertIn("has no tracks", str(caught.exception))

    def test_file_properties_skips_pattern_matching_when_disabled(self):
        """With ``use_pattern`` false, ``FileProperties`` reports no pattern and no show."""
        # Both patterns match the filename; with matching enabled this setup
        # is the one that raises DuplicatePatternMatchError in other tests.
        self.save_pattern(1, r"^nopattern_(s[0-9]+e[0-9]+)\.mkv$")
        self.save_pattern(2, r"^nopattern_.*$")

        # Shallow copy: logger, config and database are shared with self.context.
        no_pattern_context = dict(self.context)
        no_pattern_context["use_pattern"] = False

        # NOTE(review): this path is never created by the test; presumably
        # FileProperties only inspects the name here -- confirm.
        file_properties = FileProperties(
            no_pattern_context,
            "/tmp/nopattern_s01e01.mkv",
        )

        self.assertIsNone(file_properties.getPattern())
        self.assertEqual(-1, file_properties.getShowId())
        self.assertEqual(1, file_properties.getSeason())
        self.assertEqual(1, file_properties.getEpisode())

    def test_track_controller_refuses_to_delete_last_track(self):
        """Deleting the only track of a pattern raises a ``click.ClickException``."""
        pattern_id = self.save_pattern(1, r"^delete_(s[0-9]+e[0-9]+)\.mkv$")
        track = self.track_controller.getTrack(pattern_id, 0)

        with self.assertRaises(click.ClickException) as caught:
            self.track_controller.deleteTrack(track.getId())

        self.assertIn("last track", str(caught.exception))

    def test_exact_duplicate_pattern_definition_is_rejected(self):
        """Saving the same expression twice for one show raises a ``click.ClickException``."""
        self.save_pattern(1, r"^unique_(s[0-9]+e[0-9]+)\.mkv$")

        with self.assertRaises(click.ClickException) as caught:
            self.save_pattern(1, r"^unique_(s[0-9]+e[0-9]+)\.mkv$")

        self.assertIn("already exists", str(caught.exception))


# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()