from __future__ import annotations

from pathlib import Path
import sqlite3
import sys
import tempfile
import unittest
from unittest.mock import patch


# Make the in-repo ``src`` tree importable before the ``ffx`` imports below.
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"

if str(SRC_ROOT) not in sys.path:
    sys.path.insert(0, str(SRC_ROOT))

from ffx.constants import DATABASE_VERSION  # noqa: E402
from ffx.database import DATABASE_VERSION_KEY, databaseContext, getDatabaseVersion  # noqa: E402
from ffx.model.shifted_season import ShiftedSeason  # noqa: E402
from ffx.model.property import Property  # noqa: E402
from ffx.model.show import Base  # noqa: E402
from ffx.show_controller import ShowController  # noqa: E402
from ffx.show_descriptor import ShowDescriptor  # noqa: E402
from ffx.shifted_season_controller import ShiftedSeasonController  # noqa: E402


class StaticConfig:
    """Config stand-in whose ``getData`` always returns an empty dict."""

    def getData(self):
        return {}


class DatabaseContextTests(unittest.TestCase):
    """Exercise ``databaseContext`` against a throwaway SQLite file.

    Every test gets its own temporary directory; the database file inside it
    is created on demand by ``databaseContext``.
    """

    def setUp(self):
        self.tempdir = tempfile.TemporaryDirectory()
        self.database_path = Path(self.tempdir.name) / "ffx-test.db"

    def tearDown(self):
        self.tempdir.cleanup()

    def test_database_context_bootstraps_new_database_with_current_version(self):
        """A fresh path yields a database file at the current version."""
        # ``wraps`` keeps the real create_all behaviour while recording calls.
        with patch(
            "ffx.database.Base.metadata.create_all",
            wraps=Base.metadata.create_all,
        ) as mocked_create_all:
            context = databaseContext(str(self.database_path))

        try:
            self.assertTrue(self.database_path.exists())
            self.assertEqual(DATABASE_VERSION, getDatabaseVersion(context))
        finally:
            context["engine"].dispose()

        mocked_create_all.assert_called_once()

    def test_database_context_skips_create_all_when_schema_is_already_present(self):
        """Reopening an existing database must not call ``create_all``."""
        initial_context = databaseContext(str(self.database_path))
        initial_context["engine"].dispose()

        with patch("ffx.database.Base.metadata.create_all") as mocked_create_all:
            context = databaseContext(str(self.database_path))

        try:
            self.assertEqual(DATABASE_VERSION, getDatabaseVersion(context))
        finally:
            context["engine"].dispose()

        mocked_create_all.assert_not_called()

    def test_database_context_restores_missing_version_property_without_schema_bootstrap(self):
        """A deleted version property is restored without calling ``create_all``."""
        context = databaseContext(str(self.database_path))
        Session = context["session"]

        try:
            session = Session()
            try:
                version_row = (
                    session.query(Property)
                    .filter(Property.key == DATABASE_VERSION_KEY)
                    .first()
                )
                # Fail with a readable message instead of an ORM error from
                # ``session.delete(None)`` if the row was never written.
                self.assertIsNotNone(
                    version_row,
                    "expected a database version property after bootstrap",
                )
                session.delete(version_row)
                session.commit()
            finally:
                session.close()
        finally:
            context["engine"].dispose()

        with patch("ffx.database.Base.metadata.create_all") as mocked_create_all:
            reopened_context = databaseContext(str(self.database_path))

        try:
            self.assertEqual(DATABASE_VERSION, getDatabaseVersion(reopened_context))
        finally:
            reopened_context["engine"].dispose()

        mocked_create_all.assert_not_called()

    def _rewrite_shifted_seasons_as_v2(self):
        """Rebuild ``shifted_seasons`` with the reduced column set and mark the DB as version 2.

        Uses a raw ``sqlite3`` connection: the current table is renamed, a
        table with only the seven columns listed below is created under the
        original name, the rows are copied across, the renamed table is
        dropped and the stored version property is set to ``'2'``.
        """
        connection = sqlite3.connect(self.database_path)
        try:
            cursor = connection.cursor()
            cursor.execute("DROP INDEX IF EXISTS ix_shifted_seasons_show_id")
            cursor.execute("DROP INDEX IF EXISTS ix_shifted_seasons_pattern_id")
            cursor.execute(
                "ALTER TABLE shifted_seasons RENAME TO shifted_seasons_v3_current"
            )
            cursor.execute(
                """
                CREATE TABLE shifted_seasons (
                    id INTEGER PRIMARY KEY,
                    show_id INTEGER,
                    original_season INTEGER,
                    first_episode INTEGER DEFAULT -1,
                    last_episode INTEGER DEFAULT -1,
                    season_offset INTEGER DEFAULT 0,
                    episode_offset INTEGER DEFAULT 0,
                    FOREIGN KEY(show_id) REFERENCES shows(id) ON DELETE CASCADE
                )
                """
            )
            cursor.execute(
                """
                INSERT INTO shifted_seasons (
                    id,
                    show_id,
                    original_season,
                    first_episode,
                    last_episode,
                    season_offset,
                    episode_offset
                )
                SELECT
                    id,
                    show_id,
                    original_season,
                    first_episode,
                    last_episode,
                    season_offset,
                    episode_offset
                FROM shifted_seasons_v3_current
                """
            )
            cursor.execute("DROP TABLE shifted_seasons_v3_current")
            cursor.execute(
                "UPDATE properties SET value = '2' WHERE key = ?",
                (DATABASE_VERSION_KEY,),
            )
            connection.commit()
        finally:
            connection.close()

    def test_database_context_migrates_v2_shifted_seasons_schema_to_v3(self):
        """Reopening a database downgraded to version 2 brings it back to the current version.

        The shifted season stored before the downgrade must still be readable
        through the ORM afterwards, with no pattern id set.
        """
        database_context = databaseContext(str(self.database_path))
        context = {
            "database": database_context,
            "config": StaticConfig(),
            "logger": object(),
        }

        try:
            ShowController(context).updateShow(
                ShowDescriptor(id=1, name="Demo", year=2000)
            )
            shifted_season_id = ShiftedSeasonController(context).addShiftedSeason(
                showId=1,
                shiftedSeasonObj={
                    "original_season": 1,
                    "first_episode": 1,
                    "last_episode": 10,
                    "season_offset": 1,
                    "episode_offset": -10,
                },
            )
        finally:
            # Release the engine before the raw sqlite3 connection edits the file.
            database_context["engine"].dispose()

        self._rewrite_shifted_seasons_as_v2()

        reopened_context = databaseContext(str(self.database_path))
        try:
            self.assertEqual(DATABASE_VERSION, getDatabaseVersion(reopened_context))

            Session = reopened_context["session"]
            session = Session()
            try:
                migrated_shifted_season = (
                    session.query(ShiftedSeason)
                    .filter(ShiftedSeason.id == shifted_season_id)
                    .first()
                )
                self.assertIsNotNone(migrated_shifted_season)
                self.assertEqual(1, migrated_shifted_season.getShowId())
                self.assertIsNone(migrated_shifted_season.getPatternId())
                self.assertEqual(1, migrated_shifted_season.getOriginalSeason())
                self.assertEqual(1, migrated_shifted_season.getFirstEpisode())
                self.assertEqual(10, migrated_shifted_season.getLastEpisode())
            finally:
                session.close()
        finally:
            reopened_context["engine"].dispose()


if __name__ == "__main__":
    unittest.main()