Release v0.2.5

This commit is contained in:
Javanaut
2026-04-12 19:58:30 +02:00
32 changed files with 1753 additions and 283 deletions

1
.gitignore vendored
View File

@@ -21,3 +21,4 @@ venv/
*.mkv
*.webm
ffmpeg2pass-0.log
*.sup

View File

@@ -99,6 +99,16 @@ TMDB-backed metadata enrichment requires `TMDB_API_KEY` to be set in the environ
## Version History
### 0.2.5
- show-level quality and notes fields
- pattern-over-show-over-default season-shift resolution with dynamic DB migration loading
- migration prompt now reports the upgrade path and creates an in-place DB backup before applying schema changes
- `upgrade --branch <name>` now fetches remote-only branches before switching
- `unmux` now applies season shifting to subtitle output filenames
- convert now keeps DB-defined target subtitle dispositions authoritative over sidecar filename disposition flags when a pattern definition exists
- focused modern tests added around migrations, unmux, upgrade, and subtitle-disposition import precedence
### 0.2.4
- lightweight CLI commands now stay import-light via lazy runtime loading

View File

@@ -1,7 +1,7 @@
[project]
name = "ffx"
description = "FFX recoding and metadata managing tool"
version = "0.2.4"
version = "0.2.5"
license = {file = "LICENSE.md"}
dependencies = [
"requests",

View File

@@ -436,10 +436,14 @@ def upgrade(ctx, branch):
commandSequences.append(['git', 'reset', '--hard', 'HEAD'])
if branch:
commandSequences.append(['git', 'checkout', branch])
commandSequences += [
['git', 'fetch', 'origin', branch],
['git', 'checkout', '-B', branch, 'FETCH_HEAD'],
]
else:
commandSequences.append(['git', 'pull'])
commandSequences += [
['git', 'pull'],
[bundlePipPath, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[bundlePipPath, 'install', '--editable', '.'],
]
@@ -585,6 +589,7 @@ def unmux(ctx,
cpu):
from ffx.file_properties import FileProperties
from ffx.process import executeProcess
from ffx.shifted_season_controller import ShiftedSeasonController
from ffx.track_disposition import TrackDisposition
from ffx.track_type import TrackType
@@ -605,6 +610,8 @@ def unmux(ctx,
if create_output_directory and existingSourcePaths and not ctx.obj.get('dry_run', False):
os.makedirs(output_directory, exist_ok=True)
shiftedSeasonController = ShiftedSeasonController(ctx.obj)
for sourcePath in existingSourcePaths:
fp = FileProperties(ctx.obj, sourcePath)
@@ -621,8 +628,12 @@ def unmux(ctx,
currentShowDescriptor,
)
season = fp.getSeason()
episode = fp.getEpisode()
season, episode = shiftedSeasonController.shiftSeason(
fp.getShowId(),
season=fp.getSeason(),
episode=fp.getEpisode(),
patternId=currentPattern.getId() if currentPattern is not None else None,
)
#TODO: Recognition für alle Formate anpassen
targetLabel = label if label else fp.getFileBasename()
@@ -966,6 +977,7 @@ def convert(ctx,
from ffx.filter.quality_filter import QualityFilter
from ffx.helper import filterFilename, getEpisodeFileBasename, substituteTmdbFilename
from ffx.shifted_season_controller import ShiftedSeasonController
from ffx.show_controller import ShowController
from ffx.show_descriptor import ShowDescriptor
from ffx.tmdb_controller import TmdbController
from ffx.track_codec import TrackCodec
@@ -1149,6 +1161,7 @@ def convert(ctx,
ctx.obj['logger'].info(f"\nRunning {len(existingSourcePaths) * len(chainYield)} jobs")
jobIndex = 0
showController = ShowController(context)
for sourcePath in existingSourcePaths:
@@ -1181,7 +1194,7 @@ def convert(ctx,
ssc = ShiftedSeasonController(context)
showId = mediaFileProperties.getShowId()
matchedShowId = mediaFileProperties.getShowId()
#HINT: -1 if not set
if 'tmdb' in cliOverrides.keys() and 'season' in cliOverrides['tmdb']:
@@ -1263,7 +1276,8 @@ def convert(ctx,
targetMediaDescriptor.importSubtitles(context['subtitle_directory'],
context['subtitle_prefix'],
showSeason,
showEpisode)
showEpisode,
preserve_dispositions=True)
# ctx.obj['logger'].debug(f"tmd subindices: {[t.getIndex() for t in targetMediaDescriptor.getAllTrackDescriptors()]} {[t.getSubIndex() for t in targetMediaDescriptor.getAllTrackDescriptors()]} {[t.getDispositionFlag(TrackDisposition.DEFAULT) for t in targetMediaDescriptor.getAllTrackDescriptors()]}")
ctx.obj['logger'].debug(f"tmd subindices: {[t.getIndex() for t in targetMediaDescriptor.getTrackDescriptors()]} {[t.getSubIndex() for t in targetMediaDescriptor.getTrackDescriptors()]} {[t.getDispositionFlag(TrackDisposition.DEFAULT) for t in targetMediaDescriptor.getTrackDescriptors()]}")
@@ -1278,6 +1292,14 @@ def convert(ctx,
fc = FfxController(context, targetMediaDescriptor, sourceMediaDescriptor)
qualityShowId = (
cliOverrides['tmdb']['show']
if 'tmdb' in cliOverrides.keys() and 'show' in cliOverrides['tmdb']
else matchedShowId
)
if currentShowDescriptor is None and qualityShowId != -1:
currentShowDescriptor = showController.getShowDescriptor(qualityShowId)
defaultDigitLengths = ShowDescriptor.getDefaultDigitLengths(context)
indexSeasonDigits = currentShowDescriptor.getIndexSeasonDigits() if not currentPattern is None else defaultDigitLengths[ShowDescriptor.INDEX_SEASON_DIGITS_KEY]
@@ -1286,19 +1308,43 @@ def convert(ctx,
indicatorEpisodeDigits = currentShowDescriptor.getIndicatorEpisodeDigits() if not currentPattern is None else defaultDigitLengths[ShowDescriptor.INDICATOR_EPISODE_DIGITS_KEY]
# Shift season and episode if defined for this show
if ('tmdb' not in cliOverrides.keys() and showId != -1
and showSeason != -1 and showEpisode != -1):
shiftedShowSeason, shiftedShowEpisode = ssc.shiftSeason(showId,
showIdForShift = (
cliOverrides['tmdb']['show']
if 'tmdb' in cliOverrides.keys() and 'show' in cliOverrides['tmdb']
else matchedShowId
)
patternIdForShift = currentPattern.getId() if currentPattern is not None else None
hasExplicitTargetSeasonOrEpisode = (
'tmdb' in cliOverrides.keys()
and (
'season' in cliOverrides['tmdb']
or 'episode' in cliOverrides['tmdb']
)
)
# Shift season and episode if defined for the matched pattern or show
if (
not hasExplicitTargetSeasonOrEpisode
and showSeason != -1
and showEpisode != -1
):
shiftedShowSeason, shiftedShowEpisode = ssc.shiftSeason(
showIdForShift,
season=showSeason,
episode=showEpisode)
episode=showEpisode,
patternId=patternIdForShift,
)
else:
shiftedShowSeason = showSeason
shiftedShowEpisode = showEpisode
# Assemble target filename accordingly depending on TMDB lookup is enabled
#HINT: -1 if not set
showId = cliOverrides['tmdb']['show'] if 'tmdb' in cliOverrides.keys() and 'show' in cliOverrides['tmdb'] else (-1 if currentShowDescriptor is None else currentShowDescriptor.getId())
showId = (
cliOverrides['tmdb']['show']
if 'tmdb' in cliOverrides.keys() and 'show' in cliOverrides['tmdb']
else (-1 if currentShowDescriptor is None else currentShowDescriptor.getId())
)
if context['use_tmdb'] and showId != -1 and shiftedShowSeason != -1 and shiftedShowEpisode != -1:
@@ -1384,7 +1430,8 @@ def convert(ctx,
targetFormat,
chainIteration,
cropArguments,
currentPattern)
currentPattern,
currentShowDescriptor)

View File

@@ -1,5 +1,5 @@
VERSION='0.2.4'
DATABASE_VERSION = 2
VERSION='0.2.5'
DATABASE_VERSION = 3
DEFAULT_QUALITY = 32
DEFAULT_AV1_PRESET = 5

View File

@@ -1,6 +1,6 @@
import os, click
import os, shutil, click
from sqlalchemy import create_engine, inspect
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import sessionmaker
# Import the full model package so SQLAlchemy registers every mapped class
@@ -9,6 +9,11 @@ import ffx.model
from ffx.model.show import Base
from ffx.model.property import Property
from ffx.model.migration import (
DatabaseVersionException,
getMigrationPlan,
migrateDatabase,
)
from ffx.constants import DATABASE_VERSION
@@ -16,10 +21,6 @@ from ffx.constants import DATABASE_VERSION
DATABASE_VERSION_KEY = 'database_version'
EXPECTED_TABLE_NAMES = set(Base.metadata.tables.keys())
class DatabaseVersionException(Exception):
def __init__(self, errorMessage):
super().__init__(errorMessage)
def databaseContext(databasePath: str = ''):
databaseContext = {}
@@ -33,7 +34,13 @@ def databaseContext(databasePath: str = ''):
if not os.path.exists(ffxVarDir):
os.makedirs(ffxVarDir)
databasePath = os.path.join(ffxVarDir, 'ffx.db')
else:
databasePath = os.path.expanduser(databasePath)
if databasePath != ':memory:':
databasePath = os.path.abspath(databasePath)
databaseContext['path'] = databasePath
databaseContext['url'] = f"sqlite:///{databasePath}"
databaseContext['engine'] = create_engine(databaseContext['url'])
databaseContext['session'] = sessionmaker(bind=databaseContext['engine'])
@@ -68,14 +75,113 @@ def bootstrapDatabaseIfNeeded(databaseContext):
Base.metadata.create_all(databaseContext['engine'])
def ensureDatabaseVersion(databaseContext):
currentDatabaseVersion = getDatabaseVersion(databaseContext)
if currentDatabaseVersion:
if currentDatabaseVersion != DATABASE_VERSION:
raise DatabaseVersionException(f"Current database version ({currentDatabaseVersion}) does not match required ({DATABASE_VERSION})")
else:
if not currentDatabaseVersion:
setDatabaseVersion(databaseContext, DATABASE_VERSION)
return
if currentDatabaseVersion > DATABASE_VERSION:
raise DatabaseVersionException(
f"Current database version ({currentDatabaseVersion}) does not match required ({DATABASE_VERSION})"
)
if currentDatabaseVersion < DATABASE_VERSION:
promptForDatabaseMigration(databaseContext, currentDatabaseVersion, DATABASE_VERSION)
migrateDatabase(databaseContext, currentDatabaseVersion, DATABASE_VERSION, setDatabaseVersion)
currentDatabaseVersion = getDatabaseVersion(databaseContext)
if currentDatabaseVersion != DATABASE_VERSION:
raise DatabaseVersionException(
f"Current database version ({currentDatabaseVersion}) does not match required ({DATABASE_VERSION})"
)
ensureCurrentSchemaCompatibility(databaseContext)
def ensureCurrentSchemaCompatibility(databaseContext):
engine = databaseContext['engine']
inspector = inspect(engine)
showColumns = {
column['name']
for column in inspector.get_columns('shows')
}
alterStatements = []
if 'quality' not in showColumns:
alterStatements.append("ALTER TABLE shows ADD COLUMN quality INTEGER DEFAULT 0")
if 'notes' not in showColumns:
alterStatements.append("ALTER TABLE shows ADD COLUMN notes TEXT DEFAULT ''")
if not alterStatements:
return
with engine.begin() as connection:
for alterStatement in alterStatements:
connection.execute(text(alterStatement))
def promptForDatabaseMigration(databaseContext, currentDatabaseVersion: int, targetDatabaseVersion: int):
migrationPlan = getMigrationPlan(currentDatabaseVersion, targetDatabaseVersion)
click.echo("Database migration required.")
click.echo(f"Current version: {currentDatabaseVersion}")
click.echo(f"Target version: {targetDatabaseVersion}")
click.echo("Steps required:")
missingSteps = []
for migrationStep in migrationPlan:
moduleStatus = "present" if migrationStep.modulePresent else "missing"
click.echo(
f" {migrationStep.versionFrom} -> {migrationStep.versionTo}: "
+ f"{migrationStep.moduleName} [{moduleStatus}]"
)
if not migrationStep.modulePresent:
missingSteps.append(migrationStep)
if missingSteps:
firstMissingStep = missingSteps[0]
raise DatabaseVersionException(
f"No migration path from database version "
+ f"{firstMissingStep.versionFrom} to {firstMissingStep.versionTo}"
)
if not click.confirm(
"Create a backup and continue with database migration?",
default=True,
):
raise click.ClickException("Database migration aborted by user.")
backupPath = backupDatabaseBeforeMigration(
databaseContext,
currentDatabaseVersion,
targetDatabaseVersion,
)
click.echo(f"Database backup created: {backupPath}")
def backupDatabaseBeforeMigration(databaseContext, currentDatabaseVersion: int, targetDatabaseVersion: int) -> str:
databasePath = databaseContext.get('path', '')
if not databasePath or databasePath == ':memory:':
raise click.ClickException("Database migration backup requires a file-backed SQLite database.")
if not os.path.isfile(databasePath):
raise click.ClickException(f"Database file not found for backup: {databasePath}")
backupPath = f"{databasePath}.v{currentDatabaseVersion}-to-v{targetDatabaseVersion}.bak"
backupIndex = 1
while os.path.exists(backupPath):
backupPath = (
f"{databasePath}.v{currentDatabaseVersion}-to-v{targetDatabaseVersion}.{backupIndex}.bak"
)
backupIndex += 1
databaseContext['engine'].dispose()
shutil.copy2(databasePath, backupPath)
return backupPath
def getDatabaseVersion(databaseContext):

View File

@@ -245,7 +245,8 @@ class FfxController():
targetFormat: str = '',
chainIteration: list = [],
cropArguments: dict = {},
currentPattern: Pattern = None):
currentPattern: Pattern = None,
currentShowDescriptor = None):
# quality: int = DEFAULT_QUALITY,
# preset: int = DEFAULT_AV1_PRESET):
@@ -262,9 +263,11 @@ class FfxController():
if qualityFilters and (quality := qualityFilters[0]['parameters']['quality']):
self.__logger.info(f"Setting quality {quality} from command line parameter")
self.__logger.info(f"Setting quality {quality} from command line")
elif currentPattern is not None and (quality := currentPattern.quality):
self.__logger.info(f"Setting quality {quality} from pattern default")
self.__logger.info(f"Setting quality {quality} from pattern")
elif currentShowDescriptor is not None and (quality := currentShowDescriptor.getQuality()):
self.__logger.info(f"Setting quality {quality} from show")
else:
quality = (QualityFilter.DEFAULT_H264_QUALITY
if (videoEncoder == VideoEncoder.H264)

View File

@@ -500,7 +500,14 @@ class MediaDescriptor:
return subtitleFileDescriptors
def importSubtitles(self, searchDirectory, prefix, season: int = -1, episode: int = -1):
def importSubtitles(
self,
searchDirectory,
prefix,
season: int = -1,
episode: int = -1,
preserve_dispositions: bool = False,
):
# click.echo(f"Season: {season} Episode: {episode}")
self.__logger.debug(f"importSubtitles(): Season: {season} Episode: {episode}")
@@ -543,7 +550,7 @@ class MediaDescriptor:
# Prefer metadata coming from the external single-track source when
# it is provided explicitly by the filename contract.
matchingTrack.getTags()["language"] = msfd["language"]
if msfd["disposition_set"]:
if msfd["disposition_set"] and not preserve_dispositions:
matchingTrack.setDispositionSet(msfd["disposition_set"])

View File

@@ -1,47 +0,0 @@
import os, sys, importlib, inspect, glob, re
from ffx.configuration_controller import ConfigurationController
from ffx.database import databaseContext
from sqlalchemy import Engine
from sqlalchemy.orm import sessionmaker
class Conversion():
def __init__(self):
self._context = {}
self._context['config'] = ConfigurationController()
self._context['database'] = databaseContext(databasePath=self._context['config'].getDatabaseFilePath())
self.__databaseSession: sessionmaker = self._context['database']['session']
self.__databaseEngine: Engine = self._context['database']['engine']
@staticmethod
def list():
basePath = os.path.dirname(__file__)
filenamePattern = re.compile("conversion_([0-9]+)_([0-9]+)\\.py")
filenameList = [os.path.basename(fp) for fp in glob.glob(f"{ basePath }/*.py") if fp != __file__]
versionTupleList = [(fm.group(1), fm.group(2)) for fn in filenameList if (fm := filenamePattern.search(fn))]
return versionTupleList
@staticmethod
def getClassReference(versionFrom, versionTo):
importlib.import_module(f"ffx.model.conversions.conversion_{ versionFrom }_{ versionTo }")
for name, obj in inspect.getmembers(sys.modules[f"ffx.model.conversions.conversion_{ versionFrom }_{ versionTo }"]):
#HINT: Excluding DispositionCombination as it seems to be included by import (?)
if inspect.isclass(obj) and name != 'Conversion' and name.startswith('Conversion'):
return obj
@staticmethod
def getAllClassReferences():
return [Conversion.getClassReference(verFrom, verTo) for verFrom, verTo in Conversion.list()]

View File

@@ -1,17 +0,0 @@
import os, sys, importlib, inspect, glob, re
from .conversion import Conversion
class Conversion_2_3(Conversion):
def __init__(self):
super().__init__()
def applyConversion(self):
s = self.__databaseSession()
e = self.__databaseEngine
with e.connect() as c:
c.execute("ALTER TABLE user ADD COLUMN email VARCHAR(255)")

View File

@@ -1,7 +0,0 @@
import os, sys, importlib, inspect, glob, re
from .conversion import Conversion
class Conversion_3_4(Conversion):
pass

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
from dataclasses import dataclass
import importlib
import importlib.util
class DatabaseVersionException(Exception):
def __init__(self, errorMessage):
super().__init__(errorMessage)
@dataclass(frozen=True)
class MigrationStep:
versionFrom: int
versionTo: int
moduleName: str
modulePresent: bool
def getMigrationStepModuleName(versionFrom: int, versionTo: int) -> str:
return f"ffx.model.migration.step_{int(versionFrom)}_{int(versionTo)}"
def migrationStepModuleExists(versionFrom: int, versionTo: int) -> bool:
moduleName = getMigrationStepModuleName(versionFrom, versionTo)
try:
return importlib.util.find_spec(moduleName) is not None
except ModuleNotFoundError:
return False
def getMigrationPlan(currentVersion: int, targetVersion: int) -> list[MigrationStep]:
version = int(currentVersion)
target = int(targetVersion)
migrationPlan = []
while version < target:
nextVersion = version + 1
migrationPlan.append(
MigrationStep(
versionFrom=version,
versionTo=nextVersion,
moduleName=getMigrationStepModuleName(version, nextVersion),
modulePresent=migrationStepModuleExists(version, nextVersion),
)
)
version = nextVersion
return migrationPlan
def loadMigrationStep(versionFrom: int, versionTo: int):
moduleName = getMigrationStepModuleName(versionFrom, versionTo)
try:
module = importlib.import_module(moduleName)
except ModuleNotFoundError as ex:
if ex.name == moduleName:
raise DatabaseVersionException(
f"No migration path from database version {versionFrom} to {versionTo}"
) from ex
raise
migrationStep = getattr(module, "applyMigration", None)
if migrationStep is None:
raise DatabaseVersionException(
f"Migration module {moduleName} does not define applyMigration()"
)
return migrationStep
def migrateDatabase(databaseContext, currentVersion: int, targetVersion: int, setDatabaseVersion):
for migrationStepInfo in getMigrationPlan(currentVersion, targetVersion):
migrationStep = loadMigrationStep(
migrationStepInfo.versionFrom,
migrationStepInfo.versionTo,
)
migrationStep(databaseContext)
setDatabaseVersion(databaseContext, migrationStepInfo.versionTo)

View File

@@ -0,0 +1,84 @@
from sqlalchemy import inspect, text
def applyMigration(databaseContext):
engine = databaseContext['engine']
inspector = inspect(engine)
shiftedSeasonColumns = {
column['name']
for column in inspector.get_columns('shifted_seasons')
}
showColumns = {
column['name']
for column in inspector.get_columns('shows')
}
with engine.begin() as connection:
if 'pattern_id' not in shiftedSeasonColumns:
connection.execute(text("PRAGMA foreign_keys=OFF"))
connection.execute(
text(
"""
CREATE TABLE shifted_seasons_v3 (
id INTEGER PRIMARY KEY,
show_id INTEGER,
pattern_id INTEGER,
original_season INTEGER,
first_episode INTEGER DEFAULT -1,
last_episode INTEGER DEFAULT -1,
season_offset INTEGER DEFAULT 0,
episode_offset INTEGER DEFAULT 0,
FOREIGN KEY(show_id) REFERENCES shows(id) ON DELETE CASCADE,
FOREIGN KEY(pattern_id) REFERENCES patterns(id) ON DELETE CASCADE,
CHECK (
(show_id IS NOT NULL AND pattern_id IS NULL)
OR (show_id IS NULL AND pattern_id IS NOT NULL)
)
)
"""
)
)
connection.execute(
text(
"""
INSERT INTO shifted_seasons_v3 (
id,
show_id,
pattern_id,
original_season,
first_episode,
last_episode,
season_offset,
episode_offset
)
SELECT
id,
show_id,
NULL,
original_season,
first_episode,
last_episode,
season_offset,
episode_offset
FROM shifted_seasons
"""
)
)
connection.execute(text("DROP TABLE shifted_seasons"))
connection.execute(text("ALTER TABLE shifted_seasons_v3 RENAME TO shifted_seasons"))
connection.execute(
text("CREATE INDEX ix_shifted_seasons_show_id ON shifted_seasons(show_id)")
)
connection.execute(
text("CREATE INDEX ix_shifted_seasons_pattern_id ON shifted_seasons(pattern_id)")
)
connection.execute(text("PRAGMA foreign_keys=ON"))
if 'quality' not in showColumns:
connection.execute(
text("ALTER TABLE shows ADD COLUMN quality INTEGER DEFAULT 0")
)
if 'notes' not in showColumns:
connection.execute(
text("ALTER TABLE shows ADD COLUMN notes TEXT DEFAULT ''")
)

View File

@@ -35,6 +35,7 @@ class Pattern(Base):
tracks = relationship('Track', back_populates='pattern', cascade="all, delete", lazy='joined')
media_tags = relationship('MediaTag', back_populates='pattern', cascade="all, delete", lazy='joined')
shifted_seasons = relationship('ShiftedSeason', back_populates='pattern', cascade="all, delete", lazy='joined')
quality = Column(Integer, default=0)

View File

@@ -1,6 +1,6 @@
import click
from sqlalchemy import Column, Integer, ForeignKey
from sqlalchemy import CheckConstraint, Column, ForeignKey, Index, Integer
from sqlalchemy.orm import relationship
from .show import Base, Show
@@ -9,6 +9,14 @@ from .show import Base, Show
class ShiftedSeason(Base):
__tablename__ = 'shifted_seasons'
__table_args__ = (
CheckConstraint(
"(show_id IS NOT NULL AND pattern_id IS NULL) OR (show_id IS NULL AND pattern_id IS NOT NULL)",
name="ck_shifted_seasons_single_owner",
),
Index("ix_shifted_seasons_show_id", "show_id"),
Index("ix_shifted_seasons_pattern_id", "pattern_id"),
)
# v1.x
id = Column(Integer, primary_key=True)
@@ -19,9 +27,12 @@ class ShiftedSeason(Base):
# pattern: Mapped[str] = mapped_column(String, nullable=False)
# v1.x
show_id = Column(Integer, ForeignKey('shows.id', ondelete="CASCADE"))
show_id = Column(Integer, ForeignKey('shows.id', ondelete="CASCADE"), nullable=True)
show = relationship(Show, back_populates='shifted_seasons', lazy='joined')
pattern_id = Column(Integer, ForeignKey('patterns.id', ondelete="CASCADE"), nullable=True)
pattern = relationship('Pattern', back_populates='shifted_seasons', lazy='joined')
# v2.0
# show_id: Mapped[int] = mapped_column(ForeignKey("shows.id", ondelete="CASCADE"))
# show: Mapped["Show"] = relationship(back_populates="patterns")
@@ -39,6 +50,12 @@ class ShiftedSeason(Base):
def getId(self):
return self.id
def getShowId(self):
return self.show_id
def getPatternId(self):
return self.pattern_id
def getOriginalSeason(self):
return self.original_season
@@ -61,6 +78,8 @@ class ShiftedSeason(Base):
shiftedSeasonObj = {}
shiftedSeasonObj['show_id'] = self.getShowId()
shiftedSeasonObj['pattern_id'] = self.getPatternId()
shiftedSeasonObj['original_season'] = self.getOriginalSeason()
shiftedSeasonObj['first_episode'] = self.getFirstEpisode()
shiftedSeasonObj['last_episode'] = self.getLastEpisode()
@@ -68,4 +87,3 @@ class ShiftedSeason(Base):
shiftedSeasonObj['episode_offset'] = self.getEpisodeOffset()
return shiftedSeasonObj

View File

@@ -1,5 +1,5 @@
# from typing import List
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from ffx.show_descriptor import ShowDescriptor
@@ -45,6 +45,8 @@ class Show(Base):
index_episode_digits = Column(Integer, default=ShowDescriptor.DEFAULT_INDEX_EPISODE_DIGITS)
indicator_season_digits = Column(Integer, default=ShowDescriptor.DEFAULT_INDICATOR_SEASON_DIGITS)
indicator_episode_digits = Column(Integer, default=ShowDescriptor.DEFAULT_INDICATOR_EPISODE_DIGITS)
quality = Column(Integer, default=0)
notes = Column(Text, default='')
def getDescriptor(self, context):
@@ -58,5 +60,7 @@ class Show(Base):
kwargs[ShowDescriptor.INDEX_EPISODE_DIGITS_KEY] = int(self.index_episode_digits)
kwargs[ShowDescriptor.INDICATOR_SEASON_DIGITS_KEY] = int(self.indicator_season_digits)
kwargs[ShowDescriptor.INDICATOR_EPISODE_DIGITS_KEY] = int(self.indicator_episode_digits)
kwargs[ShowDescriptor.QUALITY_KEY] = int(self.quality or 0)
kwargs[ShowDescriptor.NOTES_KEY] = str(self.notes or '')
return ShowDescriptor(**kwargs)

View File

@@ -9,6 +9,8 @@ from ffx.model.pattern import Pattern
from .track_details_screen import TrackDetailsScreen
from .track_delete_screen import TrackDeleteScreen
from .shifted_season_delete_screen import ShiftedSeasonDeleteScreen
from .shifted_season_details_screen import ShiftedSeasonDetailsScreen
from .tag_details_screen import TagDetailsScreen
from .tag_delete_screen import TagDeleteScreen
@@ -24,6 +26,7 @@ from textual.widgets._data_table import CellDoesNotExist
from ffx.file_properties import FileProperties
from ffx.iso_language import IsoLanguage
from ffx.audio_layout import AudioLayout
from ffx.model.shifted_season import ShiftedSeason
from ffx.helper import formatRichColor, removeRichColor
@@ -34,8 +37,8 @@ class PatternDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 7 17;
grid-rows: 2 2 2 2 2 2 6 2 2 8 2 2 8 2 2 2 2;
grid-size: 7 20;
grid-rows: 2 2 2 2 2 2 6 2 2 8 2 2 8 2 2 8 2 2 2 2;
grid-columns: 25 25 25 25 25 25 25;
height: 100%;
width: 100%;
@@ -115,11 +118,13 @@ class PatternDetailsScreen(Screen):
show=True,
track=True,
tag=True,
shifted_season=True,
)
self.__pc = controllers['pattern']
self.__sc = controllers['show']
self.__tc = controllers['track']
self.__tac = controllers['tag']
self.__ssc = controllers['shifted_season']
self.__pattern : Pattern = self.__pc.getPattern(patternId) if patternId is not None else None
self.__showDescriptor = self.__sc.getShowDescriptor(showId) if showId is not None else None
@@ -258,6 +263,72 @@ class PatternDetailsScreen(Screen):
row = (formatRichColor(tagKey, textColor), formatRichColor(tagValue, textColor))
self.tagsTable.add_row(*map(str, row))
def updateShiftedSeasons(self):
self.shiftedSeasonsTable.clear()
if self.__pattern is None:
return
shiftedSeason: ShiftedSeason
for shiftedSeason in self.__ssc.getShiftedSeasonSiblings(patternId=self.__pattern.getId()):
shiftedSeasonObj = shiftedSeason.getObj()
firstEpisode = shiftedSeasonObj['first_episode']
firstEpisodeStr = str(firstEpisode) if firstEpisode != -1 else ''
lastEpisode = shiftedSeasonObj['last_episode']
lastEpisodeStr = str(lastEpisode) if lastEpisode != -1 else ''
row = (
shiftedSeasonObj['original_season'],
firstEpisodeStr,
lastEpisodeStr,
shiftedSeasonObj['season_offset'],
shiftedSeasonObj['episode_offset'],
)
self.shiftedSeasonsTable.add_row(*map(str, row))
def getSelectedShiftedSeasonObjFromInput(self):
shiftedSeasonObj = {}
try:
row_key, col_key = self.shiftedSeasonsTable.coordinate_to_cell_key(
self.shiftedSeasonsTable.cursor_coordinate
)
if row_key is not None:
selected_row_data = self.shiftedSeasonsTable.get_row(row_key)
def parse_int_or_default(value: str, default: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
shiftedSeasonObj['original_season'] = int(selected_row_data[0])
shiftedSeasonObj['first_episode'] = parse_int_or_default(selected_row_data[1], -1)
shiftedSeasonObj['last_episode'] = parse_int_or_default(selected_row_data[2], -1)
shiftedSeasonObj['season_offset'] = parse_int_or_default(selected_row_data[3], 0)
shiftedSeasonObj['episode_offset'] = parse_int_or_default(selected_row_data[4], 0)
if self.__pattern is not None:
shiftedSeasonId = self.__ssc.findShiftedSeason(
patternId=self.__pattern.getId(),
originalSeason=shiftedSeasonObj['original_season'],
firstEpisode=shiftedSeasonObj['first_episode'],
lastEpisode=shiftedSeasonObj['last_episode'],
)
if shiftedSeasonId is not None:
shiftedSeasonObj['id'] = shiftedSeasonId
except CellDoesNotExist:
pass
return shiftedSeasonObj
def on_mount(self):
@@ -276,6 +347,7 @@ class PatternDetailsScreen(Screen):
self.updateTags()
self.updateTracks()
self.updateShiftedSeasons()
def compose(self):
@@ -304,6 +376,16 @@ class PatternDetailsScreen(Screen):
self.tracksTable.cursor_type = 'row'
self.shiftedSeasonsTable = DataTable(classes="seven")
self.column_key_original_season = self.shiftedSeasonsTable.add_column("Source Season", width=18)
self.column_key_first_episode = self.shiftedSeasonsTable.add_column("First Episode", width=18)
self.column_key_last_episode = self.shiftedSeasonsTable.add_column("Last Episode", width=18)
self.column_key_season_offset = self.shiftedSeasonsTable.add_column("Season Offset", width=18)
self.column_key_episode_offset = self.shiftedSeasonsTable.add_column("Episode Offset", width=18)
self.shiftedSeasonsTable.cursor_type = 'row'
yield Header()
@@ -345,6 +427,27 @@ class PatternDetailsScreen(Screen):
yield Static(" ", classes="seven")
# 9
yield Static("Shifted Seasons")
if self.__pattern is not None:
yield Button("Add", id="button_add_shifted_season")
yield Button("Edit", id="button_edit_shifted_season")
yield Button("Delete", id="button_delete_shifted_season")
else:
yield Static(" ")
yield Static(" ")
yield Static(" ")
yield Static(" ")
yield Static(" ")
yield Static(" ")
# 10
yield self.shiftedSeasonsTable
# 11
yield Static(" ", classes="seven")
# 12
yield Static("Media Tags")
yield Button("Add", id="button_add_tag")
yield Button("Edit", id="button_edit_tag")
@@ -354,13 +457,13 @@ class PatternDetailsScreen(Screen):
yield Static(" ")
yield Static(" ")
# 10
# 13
yield self.tagsTable
# 11
# 14
yield Static(" ", classes="seven")
# 12
# 15
yield Static("Streams")
yield Button("Add", id="button_add_track")
yield Button("Edit", id="button_edit_track")
@@ -370,21 +473,21 @@ class PatternDetailsScreen(Screen):
yield Button("Up", id="button_track_up")
yield Button("Down", id="button_track_down")
# 13
# 16
yield self.tracksTable
# 14
# 17
yield Static(" ", classes="seven")
# 15
# 18
yield Static(" ", classes="seven")
# 16
# 19
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
yield Static(" ", classes="five")
# 17
# 20
yield Static(" ", classes="seven")
yield Footer()
@@ -486,6 +589,35 @@ class PatternDetailsScreen(Screen):
if event.button.id == "cancel_button":
self.app.pop_screen()
if event.button.id == "button_add_shifted_season":
if self.__pattern is not None:
self.app.push_screen(
ShiftedSeasonDetailsScreen(patternId=self.__pattern.getId()),
self.handle_update_shifted_season,
)
if event.button.id == "button_edit_shifted_season":
selectedShiftedSeasonObj = self.getSelectedShiftedSeasonObjFromInput()
if 'id' in selectedShiftedSeasonObj.keys():
self.app.push_screen(
ShiftedSeasonDetailsScreen(
patternId=self.__pattern.getId(),
shiftedSeasonId=selectedShiftedSeasonObj['id'],
),
self.handle_update_shifted_season,
)
if event.button.id == "button_delete_shifted_season":
selectedShiftedSeasonObj = self.getSelectedShiftedSeasonObjFromInput()
if 'id' in selectedShiftedSeasonObj.keys():
self.app.push_screen(
ShiftedSeasonDeleteScreen(
patternId=self.__pattern.getId(),
shiftedSeasonId=selectedShiftedSeasonObj['id'],
),
self.handle_delete_shifted_season,
)
numTracks = len(self.getCurrentTrackDescriptors())
@@ -654,3 +786,9 @@ class PatternDetailsScreen(Screen):
self.updateTags()
else:
raise click.ClickException('tag delete failed')
def handle_update_shifted_season(self, screenResult):
self.updateShiftedSeasons()
def handle_delete_shifted_season(self, screenResult):
self.updateShiftedSeasons()

View File

@@ -6,225 +6,433 @@ from ffx.model.shifted_season import ShiftedSeason
class EpisodeOrderException(Exception):
pass
class RangeOverlapException(Exception):
pass
class ShiftedSeasonController():
class ShiftedSeasonOwnerException(Exception):
pass
class ShiftedSeasonController:
def __init__(self, context):
self.context = context
self.Session = self.context['database']['session'] # convenience
def checkShiftedSeason(self, showId: int, shiftedSeasonObj: dict, shiftedSeasonId: int = 0):
"""
Check if for a particula season
def _resolve_owner(self, showId=None, patternId=None):
hasShow = showId is not None
hasPattern = patternId is not None
shiftedSeasonId
if hasShow == hasPattern:
raise ShiftedSeasonOwnerException(
"ShiftedSeason rules require exactly one owner: either showId or patternId."
)
if hasShow:
if type(showId) is not int:
raise ValueError(
"ShiftedSeasonController: Argument showId is required to be of type int"
)
return {
'show_id': int(showId),
'pattern_id': None,
'label': f"show #{int(showId)}",
}
if type(patternId) is not int:
raise ValueError(
"ShiftedSeasonController: Argument patternId is required to be of type int"
)
return {
'show_id': None,
'pattern_id': int(patternId),
'label': f"pattern #{int(patternId)}",
}
def _apply_owner_filter(self, query, owner):
if owner['pattern_id'] is not None:
return query.filter(ShiftedSeason.pattern_id == owner['pattern_id'])
return query.filter(ShiftedSeason.show_id == owner['show_id'])
def _normalize_shifted_season_fields(self, shiftedSeasonObj: dict):
if type(shiftedSeasonObj) is not dict:
raise ValueError(
"ShiftedSeasonController: Argument shiftedSeasonObj is required to be of type dict"
)
fields = {
'original_season': int(shiftedSeasonObj['original_season']),
'first_episode': int(shiftedSeasonObj['first_episode']),
'last_episode': int(shiftedSeasonObj['last_episode']),
'season_offset': int(shiftedSeasonObj['season_offset']),
'episode_offset': int(shiftedSeasonObj['episode_offset']),
}
firstEpisode = fields['first_episode']
lastEpisode = fields['last_episode']
if firstEpisode != -1 and lastEpisode != -1 and lastEpisode < firstEpisode:
raise EpisodeOrderException(
"ShiftedSeason last_episode must be greater than or equal to first_episode."
)
return fields
def _ranges_overlap(self, firstEpisodeA, lastEpisodeA, firstEpisodeB, lastEpisodeB):
startA = float('-inf') if int(firstEpisodeA) == -1 else int(firstEpisodeA)
endA = float('inf') if int(lastEpisodeA) == -1 else int(lastEpisodeA)
startB = float('-inf') if int(firstEpisodeB) == -1 else int(firstEpisodeB)
endB = float('inf') if int(lastEpisodeB) == -1 else int(lastEpisodeB)
return startA <= endB and startB <= endA
def _ordered_query(self, session, owner):
q = self._apply_owner_filter(session.query(ShiftedSeason), owner)
return q.order_by(
ShiftedSeason.original_season.asc(),
ShiftedSeason.first_episode.asc(),
ShiftedSeason.last_episode.asc(),
ShiftedSeason.id.asc(),
)
def _find_matching_rule(self, session, owner, season: int, episode: int):
for shiftedSeasonEntry in self._ordered_query(session, owner).all():
if (
season == shiftedSeasonEntry.getOriginalSeason()
and (
shiftedSeasonEntry.getFirstEpisode() == -1
or episode >= shiftedSeasonEntry.getFirstEpisode()
)
and (
shiftedSeasonEntry.getLastEpisode() == -1
or episode <= shiftedSeasonEntry.getLastEpisode()
)
):
return shiftedSeasonEntry
return None
def checkShiftedSeason(
self,
showId: int | None = None,
shiftedSeasonObj: dict | None = None,
shiftedSeasonId: int = 0,
patternId: int | None = None,
):
"""
Check whether a shifted-season rule is valid within one owner scope.
"""
session = None
try:
s = self.Session()
owner = self._resolve_owner(showId=showId, patternId=patternId)
fields = self._normalize_shifted_season_fields(shiftedSeasonObj)
session = self.Session()
originalSeason = shiftedSeasonObj['original_season']
firstEpisode = int(shiftedSeasonObj['first_episode'])
lastEpisode = int(shiftedSeasonObj['last_episode'])
q = s.query(ShiftedSeason).filter(ShiftedSeason.show_id == int(showId))
q = self._ordered_query(session, owner)
if shiftedSeasonId:
q = q.filter(ShiftedSeason.id != int(shiftedSeasonId))
siblingShiftedSeason: ShiftedSeason
for siblingShiftedSeason in q.all():
if fields['original_season'] != siblingShiftedSeason.getOriginalSeason():
continue
siblingOriginalSeason = siblingShiftedSeason.getOriginalSeason
siblingFirstEpisode = siblingShiftedSeason.getFirstEpisode()
siblingLastEpisode = siblingShiftedSeason.getLastEpisode()
if (originalSeason == siblingOriginalSeason
and lastEpisode >= siblingFirstEpisode
and siblingLastEpisode >= firstEpisode):
if self._ranges_overlap(
fields['first_episode'],
fields['last_episode'],
siblingShiftedSeason.getFirstEpisode(),
siblingShiftedSeason.getLastEpisode(),
):
return False
return True
except (EpisodeOrderException, ShiftedSeasonOwnerException) as ex:
raise click.ClickException(str(ex))
except Exception as ex:
raise click.ClickException(f"ShiftedSeasonController.addShiftedSeason(): {repr(ex)}")
raise click.ClickException(
f"ShiftedSeasonController.checkShiftedSeason(): {repr(ex)}"
)
finally:
s.close()
if session is not None:
session.close()
def addShiftedSeason(
self,
showId: int | None = None,
shiftedSeasonObj: dict | None = None,
patternId: int | None = None,
):
def addShiftedSeason(self, showId: int, shiftedSeasonObj: dict):
if type(showId) is not int:
raise ValueError(f"ShiftedSeasonController.addShiftedSeason(): Argument showId is required to be of type int")
if type(shiftedSeasonObj) is not dict:
raise ValueError(f"ShiftedSeasonController.addShiftedSeason(): Argument shiftedSeasonObj is required to be of type dict")
session = None
try:
s = self.Session()
owner = self._resolve_owner(showId=showId, patternId=patternId)
fields = self._normalize_shifted_season_fields(shiftedSeasonObj)
firstEpisode = int(shiftedSeasonObj['first_episode'])
lastEpisode = int(shiftedSeasonObj['last_episode'])
if not self.checkShiftedSeason(
showId=owner['show_id'],
patternId=owner['pattern_id'],
shiftedSeasonObj=fields,
):
raise RangeOverlapException(
f"ShiftedSeason rule overlaps with an existing rule for {owner['label']}."
)
if lastEpisode < firstEpisode:
raise EpisodeOrderException()
q = s.query(ShiftedSeason).filter(ShiftedSeason.show_id == int(showId))
shiftedSeason = ShiftedSeason(show_id = int(showId),
original_season = int(shiftedSeasonObj['original_season']),
first_episode = firstEpisode,
last_episode = lastEpisode,
season_offset = int(shiftedSeasonObj['season_offset']),
episode_offset = int(shiftedSeasonObj['episode_offset']))
s.add(shiftedSeason)
s.commit()
session = self.Session()
shiftedSeason = ShiftedSeason(
show_id=owner['show_id'],
pattern_id=owner['pattern_id'],
original_season=fields['original_season'],
first_episode=fields['first_episode'],
last_episode=fields['last_episode'],
season_offset=fields['season_offset'],
episode_offset=fields['episode_offset'],
)
session.add(shiftedSeason)
session.commit()
return shiftedSeason.getId()
except (EpisodeOrderException, RangeOverlapException, ShiftedSeasonOwnerException) as ex:
raise click.ClickException(str(ex))
except Exception as ex:
raise click.ClickException(f"ShiftedSeasonController.addShiftedSeason(): {repr(ex)}")
raise click.ClickException(
f"ShiftedSeasonController.addShiftedSeason(): {repr(ex)}"
)
finally:
s.close()
if session is not None:
session.close()
def updateShiftedSeason(self, shiftedSeasonId: int, shiftedSeasonObj: dict):
if type(shiftedSeasonId) is not int:
raise ValueError(f"ShiftedSeasonController.updateShiftedSeason(): Argument shiftedSeasonId is required to be of type int")
if type(shiftedSeasonObj) is not dict:
raise ValueError(f"ShiftedSeasonController.updateShiftedSeason(): Argument shiftedSeasonObj is required to be of type dict")
raise ValueError(
"ShiftedSeasonController.updateShiftedSeason(): Argument shiftedSeasonId is required to be of type int"
)
session = None
try:
s = self.Session()
fields = self._normalize_shifted_season_fields(shiftedSeasonObj)
session = self.Session()
shiftedSeason = s.query(ShiftedSeason).filter(ShiftedSeason.id == int(shiftedSeasonId)).first()
shiftedSeason = (
session.query(ShiftedSeason)
.filter(ShiftedSeason.id == int(shiftedSeasonId))
.first()
)
if shiftedSeason is not None:
shiftedSeason.original_season = int(shiftedSeasonObj['original_season'])
shiftedSeason.first_episode = int(shiftedSeasonObj['first_episode'])
shiftedSeason.last_episode = int(shiftedSeasonObj['last_episode'])
shiftedSeason.season_offset = int(shiftedSeasonObj['season_offset'])
shiftedSeason.episode_offset = int(shiftedSeasonObj['episode_offset'])
s.commit()
return True
else:
if shiftedSeason is None:
return False
owner = self._resolve_owner(
showId=shiftedSeason.getShowId(),
patternId=shiftedSeason.getPatternId(),
)
if not self.checkShiftedSeason(
showId=owner['show_id'],
patternId=owner['pattern_id'],
shiftedSeasonObj=fields,
shiftedSeasonId=shiftedSeasonId,
):
raise RangeOverlapException(
f"ShiftedSeason rule overlaps with an existing rule for {owner['label']}."
)
shiftedSeason.original_season = fields['original_season']
shiftedSeason.first_episode = fields['first_episode']
shiftedSeason.last_episode = fields['last_episode']
shiftedSeason.season_offset = fields['season_offset']
shiftedSeason.episode_offset = fields['episode_offset']
session.commit()
return True
except (EpisodeOrderException, RangeOverlapException, ShiftedSeasonOwnerException) as ex:
raise click.ClickException(str(ex))
except Exception as ex:
raise click.ClickException(f"ShiftedSeasonController.updateShiftedSeason(): {repr(ex)}")
raise click.ClickException(
f"ShiftedSeasonController.updateShiftedSeason(): {repr(ex)}"
)
finally:
s.close()
if session is not None:
session.close()
def findShiftedSeason(self, showId: int, originalSeason: int, firstEpisode: int, lastEpisode: int):
if type(showId) is not int:
raise ValueError(f"ShiftedSeasonController.findShiftedSeason(): Argument shiftedSeasonId is required to be of type int")
def findShiftedSeason(
self,
showId: int | None = None,
originalSeason: int | None = None,
firstEpisode: int | None = None,
lastEpisode: int | None = None,
patternId: int | None = None,
):
if type(originalSeason) is not int:
raise ValueError(f"ShiftedSeasonController.findShiftedSeason(): Argument originalSeason is required to be of type int")
raise ValueError(
"ShiftedSeasonController.findShiftedSeason(): Argument originalSeason is required to be of type int"
)
if type(firstEpisode) is not int:
raise ValueError(f"ShiftedSeasonController.findShiftedSeason(): Argument firstEpisode is required to be of type int")
raise ValueError(
"ShiftedSeasonController.findShiftedSeason(): Argument firstEpisode is required to be of type int"
)
if type(lastEpisode) is not int:
raise ValueError(f"ShiftedSeasonController.findShiftedSeason(): Argument lastEpisode is required to be of type int")
raise ValueError(
"ShiftedSeasonController.findShiftedSeason(): Argument lastEpisode is required to be of type int"
)
session = None
try:
s = self.Session()
shiftedSeason = s.query(ShiftedSeason).filter(
ShiftedSeason.show_id == int(showId),
owner = self._resolve_owner(showId=showId, patternId=patternId)
session = self.Session()
shiftedSeason = (
self._apply_owner_filter(session.query(ShiftedSeason), owner)
.filter(
ShiftedSeason.original_season == int(originalSeason),
ShiftedSeason.first_episode == int(firstEpisode),
ShiftedSeason.last_episode == int(lastEpisode),
).first()
)
.first()
)
return shiftedSeason.getId() if shiftedSeason is not None else None
except ShiftedSeasonOwnerException as ex:
raise click.ClickException(str(ex))
except Exception as ex:
raise click.ClickException(f"PatternController.findShiftedSeason(): {repr(ex)}")
raise click.ClickException(
f"ShiftedSeasonController.findShiftedSeason(): {repr(ex)}"
)
finally:
s.close()
if session is not None:
session.close()
def getShiftedSeasonSiblings(self, showId: int):
if type(showId) is not int:
raise ValueError(f"ShiftedSeasonController.getShiftedSeasonSiblings(): Argument shiftedSeasonId is required to be of type int")
def getShiftedSeasonSiblings(
self,
showId: int | None = None,
patternId: int | None = None,
):
session = None
try:
s = self.Session()
q = s.query(ShiftedSeason).filter(ShiftedSeason.show_id == int(showId))
return q.all()
owner = self._resolve_owner(showId=showId, patternId=patternId)
session = self.Session()
return self._ordered_query(session, owner).all()
except ShiftedSeasonOwnerException as ex:
raise click.ClickException(str(ex))
except Exception as ex:
raise click.ClickException(f"PatternController.getShiftedSeasonSiblings(): {repr(ex)}")
raise click.ClickException(
f"ShiftedSeasonController.getShiftedSeasonSiblings(): {repr(ex)}"
)
finally:
s.close()
if session is not None:
session.close()
def getShiftedSeason(self, shiftedSeasonId: int):
if type(shiftedSeasonId) is not int:
raise ValueError(f"ShiftedSeasonController.getShiftedSeason(): Argument shiftedSeasonId is required to be of type int")
raise ValueError(
"ShiftedSeasonController.getShiftedSeason(): Argument shiftedSeasonId is required to be of type int"
)
session = None
try:
s = self.Session()
return s.query(ShiftedSeason).filter(ShiftedSeason.id == int(shiftedSeasonId)).first()
session = self.Session()
return (
session.query(ShiftedSeason)
.filter(ShiftedSeason.id == int(shiftedSeasonId))
.first()
)
except Exception as ex:
raise click.ClickException(f"ShiftedSeasonController.getShiftedSeason(): {repr(ex)}")
raise click.ClickException(
f"ShiftedSeasonController.getShiftedSeason(): {repr(ex)}"
)
finally:
s.close()
if session is not None:
session.close()
def deleteShiftedSeason(self, shiftedSeasonId):
if type(shiftedSeasonId) is not int:
raise ValueError(f"ShiftedSeasonController.deleteShiftedSeason(): Argument shiftedSeasonId is required to be of type int")
raise ValueError(
"ShiftedSeasonController.deleteShiftedSeason(): Argument shiftedSeasonId is required to be of type int"
)
session = None
try:
s = self.Session()
shiftedSeason = s.query(ShiftedSeason).filter(ShiftedSeason.id == int(shiftedSeasonId)).first()
session = self.Session()
shiftedSeason = (
session.query(ShiftedSeason)
.filter(ShiftedSeason.id == int(shiftedSeasonId))
.first()
)
if shiftedSeason is not None:
#DAFUQ: https://stackoverflow.com/a/19245058
# q.delete()
s.delete(shiftedSeason)
s.commit()
session.delete(shiftedSeason)
session.commit()
return True
return False
except Exception as ex:
raise click.ClickException(f"ShiftedSeasonController.deleteShiftedSeason(): {repr(ex)}")
raise click.ClickException(
f"ShiftedSeasonController.deleteShiftedSeason(): {repr(ex)}"
)
finally:
s.close()
if session is not None:
session.close()
def shiftSeason(self, showId, season, episode, patternId=None):
def shiftSeason(self, showId, season, episode):
if season == -1 or episode == -1:
return season, episode
shiftedSeasonEntry: ShiftedSeason
for shiftedSeasonEntry in self.getShiftedSeasonSiblings(showId):
session = None
try:
session = self.Session()
activeShift = None
if (season == shiftedSeasonEntry.getOriginalSeason()
and (shiftedSeasonEntry.getFirstEpisode() == -1 or episode >= shiftedSeasonEntry.getFirstEpisode())
and (shiftedSeasonEntry.getLastEpisode() == -1 or episode <= shiftedSeasonEntry.getLastEpisode())):
if patternId is not None:
activeShift = self._find_matching_rule(
session,
self._resolve_owner(patternId=patternId),
season=int(season),
episode=int(episode),
)
shiftedSeason = season + shiftedSeasonEntry.getSeasonOffset()
shiftedEpisode = episode + shiftedSeasonEntry.getEpisodeOffset()
if activeShift is None and showId is not None and showId != -1:
activeShift = self._find_matching_rule(
session,
self._resolve_owner(showId=showId),
season=int(season),
episode=int(episode),
)
self.context['logger'].info(f"Shifting season: {season} episode: {episode} "
+f"-> season: {shiftedSeason} episode: {shiftedEpisode}")
if activeShift is None:
shiftedSeason = season
shiftedEpisode = episode
sourceLabel = "default"
else:
shiftedSeason = season + activeShift.getSeasonOffset()
shiftedEpisode = episode + activeShift.getEpisodeOffset()
sourceLabel = (
"pattern"
if activeShift.getPatternId() is not None
else "show"
)
self.context['logger'].info(
f"Setting season shift {season}/{episode} -> {shiftedSeason}/{shiftedEpisode} from {sourceLabel}"
)
return shiftedSeason, shiftedEpisode
return season, episode
except ShiftedSeasonOwnerException as ex:
raise click.ClickException(str(ex))
except Exception as ex:
raise click.ClickException(
f"ShiftedSeasonController.shiftSeason(): {repr(ex)}"
)
finally:
if session is not None:
session.close()

View File

@@ -43,7 +43,7 @@ class ShiftedSeasonDeleteScreen(Screen):
}
"""
def __init__(self, showId = None, shiftedSeasonId = None):
def __init__(self, showId = None, patternId = None, shiftedSeasonId = None):
super().__init__()
self.context = self.app.getContext()
@@ -52,6 +52,7 @@ class ShiftedSeasonDeleteScreen(Screen):
self.__ssc = ShiftedSeasonController(context = self.context)
self._showId = showId
self._patternId = patternId
self.__shiftedSeasonId = shiftedSeasonId
@@ -59,7 +60,12 @@ class ShiftedSeasonDeleteScreen(Screen):
shiftedSeason: ShiftedSeason = self.__ssc.getShiftedSeason(self.__shiftedSeasonId)
self.query_one("#static_show_id", Static).update(str(self._showId))
ownerLabel = (
f"pattern #{self._patternId}"
if self._patternId is not None
else f"show #{self._showId}"
)
self.query_one("#static_owner", Static).update(ownerLabel)
self.query_one("#static_original_season", Static).update(str(shiftedSeason.getOriginalSeason()))
self.query_one("#static_first_episode", Static).update(str(shiftedSeason.getFirstEpisode()))
self.query_one("#static_last_episode", Static).update(str(shiftedSeason.getLastEpisode()))
@@ -77,12 +83,12 @@ class ShiftedSeasonDeleteScreen(Screen):
yield Static(" ", classes="two")
yield Static("from show")
yield Static(" ", id="static_show_id")
yield Static("from")
yield Static(" ", id="static_owner")
yield Static(" ", classes="two")
yield Static("Original season")
yield Static("Source season")
yield Static(" ", id="static_original_season")
yield Static("First episode")
@@ -122,4 +128,3 @@ class ShiftedSeasonDeleteScreen(Screen):
if event.button.id == "cancel_button":
self.app.pop_screen()

View File

@@ -81,7 +81,7 @@ class ShiftedSeasonDetailsScreen(Screen):
}
"""
def __init__(self, showId = None, shiftedSeasonId = None):
def __init__(self, showId = None, patternId = None, shiftedSeasonId = None):
super().__init__()
self.context = self.app.getContext()
@@ -90,8 +90,14 @@ class ShiftedSeasonDetailsScreen(Screen):
self.__ssc = ShiftedSeasonController(context = self.context)
self.__showId = showId
self.__patternId = patternId
self.__shiftedSeasonId = shiftedSeasonId
def _owner_kwargs(self):
if self.__patternId is not None:
return {'patternId': self.__patternId}
return {'showId': self.__showId}
def on_mount(self):
if self.__shiftedSeasonId is not None:
@@ -126,7 +132,7 @@ class ShiftedSeasonDetailsScreen(Screen):
yield Static(" ", classes="three")
# 3
yield Static("Original season")
yield Static("Source season")
yield Input(id="input_original_season", classes="two")
# 4
@@ -203,8 +209,11 @@ class ShiftedSeasonDetailsScreen(Screen):
if self.__shiftedSeasonId is not None:
if self.__ssc.checkShiftedSeason(self.__showId, shiftedSeasonObj,
shiftedSeasonId = self.__shiftedSeasonId):
if self.__ssc.checkShiftedSeason(
shiftedSeasonObj=shiftedSeasonObj,
shiftedSeasonId=self.__shiftedSeasonId,
**self._owner_kwargs(),
):
if self.__ssc.updateShiftedSeason(self.__shiftedSeasonId, shiftedSeasonObj):
self.dismiss((self.__shiftedSeasonId, shiftedSeasonObj))
else:
@@ -212,8 +221,14 @@ class ShiftedSeasonDetailsScreen(Screen):
self.app.pop_screen()
else:
if self.__ssc.checkShiftedSeason(self.__showId, shiftedSeasonObj):
self.__shiftedSeasonId = self.__ssc.addShiftedSeason(self.__showId, shiftedSeasonObj)
if self.__ssc.checkShiftedSeason(
shiftedSeasonObj=shiftedSeasonObj,
**self._owner_kwargs(),
):
self.__shiftedSeasonId = self.__ssc.addShiftedSeason(
shiftedSeasonObj=shiftedSeasonObj,
**self._owner_kwargs(),
)
self.dismiss((self.__shiftedSeasonId, shiftedSeasonObj))

View File

@@ -62,7 +62,9 @@ class ShowController():
index_season_digits = showDescriptor.getIndexSeasonDigits(),
index_episode_digits = showDescriptor.getIndexEpisodeDigits(),
indicator_season_digits = showDescriptor.getIndicatorSeasonDigits(),
indicator_episode_digits = showDescriptor.getIndicatorEpisodeDigits())
indicator_episode_digits = showDescriptor.getIndicatorEpisodeDigits(),
quality = showDescriptor.getQuality(),
notes = showDescriptor.getNotes())
s.add(show)
s.commit()
@@ -88,6 +90,12 @@ class ShowController():
if currentShow.indicator_episode_digits != int(showDescriptor.getIndicatorEpisodeDigits()):
currentShow.indicator_episode_digits = int(showDescriptor.getIndicatorEpisodeDigits())
changed = True
if int(currentShow.quality or 0) != int(showDescriptor.getQuality()):
currentShow.quality = int(showDescriptor.getQuality())
changed = True
if str(currentShow.notes or '') != str(showDescriptor.getNotes()):
currentShow.notes = str(showDescriptor.getNotes())
changed = True
if changed:
s.commit()

View File

@@ -21,6 +21,8 @@ class ShowDescriptor():
INDEX_EPISODE_DIGITS_KEY = 'index_episode_digits'
INDICATOR_SEASON_DIGITS_KEY = 'indicator_season_digits'
INDICATOR_EPISODE_DIGITS_KEY = 'indicator_episode_digits'
QUALITY_KEY = 'quality'
NOTES_KEY = 'notes'
DEFAULT_INDEX_SEASON_DIGITS = DEFAULT_SHOW_INDEX_SEASON_DIGITS
DEFAULT_INDEX_EPISODE_DIGITS = DEFAULT_SHOW_INDEX_EPISODE_DIGITS
@@ -124,6 +126,20 @@ class ShowDescriptor():
else:
self.__indicatorEpisodeDigits = defaultDigitLengths[ShowDescriptor.INDICATOR_EPISODE_DIGITS_KEY]
if ShowDescriptor.QUALITY_KEY in kwargs.keys():
if type(kwargs[ShowDescriptor.QUALITY_KEY]) is not int:
raise TypeError(f"ShowDescriptor.__init__(): Argument {ShowDescriptor.QUALITY_KEY} is required to be of type int")
self.__quality = kwargs[ShowDescriptor.QUALITY_KEY]
else:
self.__quality = 0
if ShowDescriptor.NOTES_KEY in kwargs.keys():
if type(kwargs[ShowDescriptor.NOTES_KEY]) is not str:
raise TypeError(f"ShowDescriptor.__init__(): Argument {ShowDescriptor.NOTES_KEY} is required to be of type str")
self.__notes = kwargs[ShowDescriptor.NOTES_KEY]
else:
self.__notes = ''
def getId(self):
return self.__showId
@@ -140,6 +156,10 @@ class ShowDescriptor():
return self.__indicatorSeasonDigits
def getIndicatorEpisodeDigits(self):
return self.__indicatorEpisodeDigits
def getQuality(self):
return self.__quality
def getNotes(self):
return self.__notes
def getFilenamePrefix(self):
return f"{self.__showName} ({str(self.__showYear)})"

View File

@@ -1,7 +1,7 @@
import click
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable, Input
from textual.widgets import Header, Footer, Static, Button, DataTable, Input, TextArea
from textual.containers import Grid
from textual.widgets._data_table import CellDoesNotExist
@@ -25,8 +25,8 @@ class ShowDetailsScreen(Screen):
CSS = """
Grid {
grid-size: 5 16;
grid-rows: 2 2 2 2 2 2 2 2 2 2 2 9 2 9 2 2;
grid-size: 5 18;
grid-rows: 2 2 2 2 2 2 6 2 2 2 2 2 2 9 2 9 2 2;
grid-columns: 30 30 30 30 30;
height: 100%;
width: 100%;
@@ -77,6 +77,10 @@ class ShowDetailsScreen(Screen):
height: 100%;
border: solid green;
}
.note_box {
min-height: 6;
}
"""
BINDINGS = [
@@ -150,6 +154,10 @@ class ShowDetailsScreen(Screen):
self.query_one("#index_episode_digits_input", Input).value = str(self.__showDescriptor.getIndexEpisodeDigits())
self.query_one("#indicator_season_digits_input", Input).value = str(self.__showDescriptor.getIndicatorSeasonDigits())
self.query_one("#indicator_episode_digits_input", Input).value = str(self.__showDescriptor.getIndicatorEpisodeDigits())
if self.__showDescriptor.getQuality():
self.query_one("#quality_input", Input).value = str(self.__showDescriptor.getQuality())
if self.__showDescriptor.getNotes():
self.query_one("#notes_textarea", TextArea).text = str(self.__showDescriptor.getNotes())
#raise click.ClickException(f"show_id {showId}")
@@ -211,11 +219,17 @@ class ShowDetailsScreen(Screen):
if row_key is not None:
selected_row_data = self.shiftedSeasonsTable.get_row(row_key)
def parse_int_or_default(value: str, default: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
shiftedSeasonObj['original_season'] = int(selected_row_data[0])
shiftedSeasonObj['first_episode'] = int(selected_row_data[1]) if selected_row_data[1].isnumeric() else -1
shiftedSeasonObj['last_episode'] = int(selected_row_data[2]) if selected_row_data[2].isnumeric() else -1
shiftedSeasonObj['season_offset'] = int(selected_row_data[3]) if selected_row_data[3].isnumeric() else 0
shiftedSeasonObj['episode_offset'] = int(selected_row_data[4]) if selected_row_data[4].isnumeric() else 0
shiftedSeasonObj['first_episode'] = parse_int_or_default(selected_row_data[1], -1)
shiftedSeasonObj['last_episode'] = parse_int_or_default(selected_row_data[2], -1)
shiftedSeasonObj['season_offset'] = parse_int_or_default(selected_row_data[3], 0)
shiftedSeasonObj['episode_offset'] = parse_int_or_default(selected_row_data[4], 0)
if self.__showDescriptor is not None:
@@ -308,7 +322,7 @@ class ShowDetailsScreen(Screen):
self.shiftedSeasonsTable = DataTable(classes="five")
self.column_key_original_season = self.shiftedSeasonsTable.add_column("Original Season", width=30)
self.column_key_original_season = self.shiftedSeasonsTable.add_column("Source Season", width=30)
self.column_key_first_episode = self.shiftedSeasonsTable.add_column("First Episode", width=30)
self.column_key_last_episode = self.shiftedSeasonsTable.add_column("Last Episode", width=30)
self.column_key_season_offset = self.shiftedSeasonsTable.add_column("Season Offset", width=30)
@@ -342,28 +356,36 @@ class ShowDetailsScreen(Screen):
yield Input(type="integer", id="year_input", classes="four")
#5
yield Static(" ", classes="five")
yield Static("Quality")
yield Input(type="integer", id="quality_input", classes="four")
#6
yield Static("Notes")
yield Static(" ", classes="four")
#7
yield TextArea(id="notes_textarea", classes="five note_box")
#8
yield Static("Index Season Digits")
yield Input(type="integer", id="index_season_digits_input", classes="four")
#7
#9
yield Static("Index Episode Digits")
yield Input(type="integer", id="index_episode_digits_input", classes="four")
#8
#10
yield Static("Indicator Season Digits")
yield Input(type="integer", id="indicator_season_digits_input", classes="four")
#9
#11
yield Static("Indicator Edisode Digits")
yield Input(type="integer", id="indicator_episode_digits_input", classes="four")
# 10
# 12
yield Static(" ", classes="five")
# 11
# 13
yield Static("Shifted seasons", classes="two")
if self.__showDescriptor is not None:
@@ -375,18 +397,18 @@ class ShowDetailsScreen(Screen):
yield Static(" ")
yield Static(" ")
# 12
# 14
yield self.shiftedSeasonsTable
# 13
# 15
yield Static("File patterns", classes="five")
# 14
# 16
yield self.patternTable
# 15
# 17
yield Static(" ", classes="five")
# 16
# 18
yield Button("Save", id="save_button")
yield Button("Cancel", id="cancel_button")
@@ -432,6 +454,11 @@ class ShowDetailsScreen(Screen):
kwargs[ShowDescriptor.INDICATOR_EPISODE_DIGITS_KEY] = int(self.query_one("#indicator_episode_digits_input", Input).value)
except ValueError:
pass
try:
kwargs[ShowDescriptor.QUALITY_KEY] = int(self.query_one("#quality_input", Input).value)
except ValueError:
pass
kwargs[ShowDescriptor.NOTES_KEY] = str(self.query_one("#notes_textarea", TextArea).text)
return ShowDescriptor(**kwargs)

View File

@@ -18,6 +18,7 @@ from tests.support.ffx_bundle import (
from ffx.pattern_controller import PatternController
from ffx.show_controller import ShowController
from ffx.show_descriptor import ShowDescriptor
from ffx.shifted_season_controller import ShiftedSeasonController
from ffx.track_codec import TrackCodec
from ffx.track_descriptor import TrackDescriptor
from ffx.track_type import TrackType
@@ -109,6 +110,31 @@ class UnmuxCliTests(unittest.TestCase):
finally:
dispose_controller_context(context)
def add_show_shift(
self,
*,
show_id: int,
original_season: int,
first_episode: int,
last_episode: int,
season_offset: int,
episode_offset: int,
) -> None:
context = build_controller_context(self.database_path)
try:
ShiftedSeasonController(context).addShiftedSeason(
showId=show_id,
shiftedSeasonObj={
"original_season": original_season,
"first_episode": first_episode,
"last_episode": last_episode,
"season_offset": season_offset,
"episode_offset": episode_offset,
},
)
finally:
dispose_controller_context(context)
def test_subtitles_only_without_output_directory_uses_configured_base_plus_label(self):
self.write_config(
{
@@ -223,6 +249,55 @@ class UnmuxCliTests(unittest.TestCase):
output_filenames,
)
def test_unmux_applies_shifted_season_mapping_to_output_filenames(self):
self.seed_matching_show(
r"^unmux_([sS][0-9]+[eE][0-9]+)\.mkv$",
indicator_season_digits=2,
indicator_episode_digits=2,
)
self.add_show_shift(
show_id=1,
original_season=1,
first_episode=1,
last_episode=99,
season_offset=1,
episode_offset=-88,
)
source_filename = "unmux_s01e89.mkv"
output_directory = self.workdir / "unmux-output"
output_directory.mkdir()
source_path = create_source_fixture(
self.workdir,
source_filename,
[
SourceTrackSpec(TrackType.VIDEO, identity="video-0"),
SourceTrackSpec(
TrackType.SUBTITLE,
identity="subtitle-1",
language="eng",
subtitle_lines=("subtitle payload",),
),
],
)
completed = run_ffx_unmux(
self.workdir,
self.home_dir,
self.database_path,
"--label",
"dball",
"--output-directory",
str(output_directory),
"--subtitles-only",
str(source_path),
)
self.assertCompleted(completed)
self.assertIn(
"Unmuxing stream 1 into file dball_S02E01_1_eng",
completed.stderr,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -57,7 +57,7 @@ class UpgradeCommandTests(unittest.TestCase):
self.assertTrue(subprocess_calls[0][1]["capture_output"])
self.assertTrue(subprocess_calls[0][1]["text"])
def test_upgrade_resets_before_checkout_and_pull_when_user_confirms(self):
def test_upgrade_resets_then_fetches_and_checks_out_requested_branch_when_user_confirms(self):
runner = CliRunner()
repo_path = "/tmp/ffx-repo"
pip_path = "/tmp/ffx-venv/bin/pip"
@@ -85,8 +85,8 @@ class UpgradeCommandTests(unittest.TestCase):
[
['git', 'status', '--porcelain', '--untracked-files=no'],
['git', 'reset', '--hard', 'HEAD'],
['git', 'checkout', 'main'],
['git', 'pull'],
['git', 'fetch', 'origin', 'main'],
['git', 'checkout', '-B', 'main', 'FETCH_HEAD'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'],
],
@@ -95,6 +95,39 @@ class UpgradeCommandTests(unittest.TestCase):
for args, kwargs in subprocess_calls[1:]:
self.assertEqual(repo_path, kwargs["cwd"], args)
def test_upgrade_pulls_current_branch_when_no_branch_is_requested(self):
runner = CliRunner()
repo_path = "/tmp/ffx-repo"
pip_path = "/tmp/ffx-venv/bin/pip"
subprocess_calls = []
def fake_run(args, **kwargs):
subprocess_calls.append((args, kwargs))
if args == ['git', 'status', '--porcelain', '--untracked-files=no']:
return self.make_completed(args, stdout="")
return self.make_completed(args)
with (
patch.object(cli, "getBundleRepoPath", return_value=repo_path),
patch.object(cli, "getBundlePipPath", return_value=pip_path),
patch.object(cli.os.path, "isdir", return_value=True),
patch.object(cli.os.path, "isfile", return_value=True),
patch.object(cli.subprocess, "run", side_effect=fake_run),
):
result = runner.invoke(cli.ffx, ["upgrade"])
self.assertEqual(0, result.exit_code, result.output)
self.assertEqual(
[
['git', 'status', '--porcelain', '--untracked-files=no'],
['git', 'pull'],
[pip_path, 'install', '--upgrade', 'pip', 'setuptools', 'wheel'],
[pip_path, 'install', '--editable', '.'],
],
[call[0] for call in subprocess_calls],
)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,11 +1,14 @@
from __future__ import annotations
from pathlib import Path
import sqlite3
import sys
import tempfile
import unittest
from unittest.mock import patch
import click
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
@@ -15,8 +18,18 @@ if str(SRC_ROOT) not in sys.path:
from ffx.constants import DATABASE_VERSION # noqa: E402
from ffx.database import DATABASE_VERSION_KEY, databaseContext, getDatabaseVersion # noqa: E402
from ffx.model.shifted_season import ShiftedSeason # noqa: E402
from ffx.model.property import Property # noqa: E402
from ffx.model.show import Show # noqa: E402
from ffx.model.show import Base # noqa: E402
from ffx.show_controller import ShowController # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
from ffx.shifted_season_controller import ShiftedSeasonController # noqa: E402
class StaticConfig:
def getData(self):
return {}
class DatabaseContextTests(unittest.TestCase):
@@ -27,6 +40,115 @@ class DatabaseContextTests(unittest.TestCase):
def tearDown(self):
self.tempdir.cleanup()
def create_demo_show_with_shift(self):
database_context = databaseContext(str(self.database_path))
context = {
"database": database_context,
"config": StaticConfig(),
"logger": object(),
}
try:
ShowController(context).updateShow(
ShowDescriptor(id=1, name="Demo", year=2000)
)
shifted_season_id = ShiftedSeasonController(context).addShiftedSeason(
showId=1,
shiftedSeasonObj={
"original_season": 1,
"first_episode": 1,
"last_episode": 10,
"season_offset": 1,
"episode_offset": -10,
},
)
finally:
database_context["engine"].dispose()
return shifted_season_id
def rewrite_shows_table_without_show_fields(self, cursor):
cursor.execute("ALTER TABLE shows RENAME TO shows_current")
cursor.execute(
"""
CREATE TABLE shows (
id INTEGER PRIMARY KEY,
name VARCHAR,
year INTEGER,
index_season_digits INTEGER,
index_episode_digits INTEGER,
indicator_season_digits INTEGER,
indicator_episode_digits INTEGER
)
"""
)
cursor.execute(
"""
INSERT INTO shows (
id,
name,
year,
index_season_digits,
index_episode_digits,
indicator_season_digits,
indicator_episode_digits
)
SELECT
id,
name,
year,
index_season_digits,
index_episode_digits,
indicator_season_digits,
indicator_episode_digits
FROM shows_current
"""
)
cursor.execute("DROP TABLE shows_current")
def rewrite_shifted_seasons_table_without_pattern_owner(self, cursor):
cursor.execute("DROP INDEX IF EXISTS ix_shifted_seasons_show_id")
cursor.execute("DROP INDEX IF EXISTS ix_shifted_seasons_pattern_id")
cursor.execute(
"ALTER TABLE shifted_seasons RENAME TO shifted_seasons_current"
)
cursor.execute(
"""
CREATE TABLE shifted_seasons (
id INTEGER PRIMARY KEY,
show_id INTEGER,
original_season INTEGER,
first_episode INTEGER DEFAULT -1,
last_episode INTEGER DEFAULT -1,
season_offset INTEGER DEFAULT 0,
episode_offset INTEGER DEFAULT 0,
FOREIGN KEY(show_id) REFERENCES shows(id) ON DELETE CASCADE
)
"""
)
cursor.execute(
"""
INSERT INTO shifted_seasons (
id,
show_id,
original_season,
first_episode,
last_episode,
season_offset,
episode_offset
)
SELECT
id,
show_id,
original_season,
first_episode,
last_episode,
season_offset,
episode_offset
FROM shifted_seasons_current
"""
)
cursor.execute("DROP TABLE shifted_seasons_current")
def test_database_context_bootstraps_new_database_with_current_version(self):
with patch("ffx.database.Base.metadata.create_all", wraps=Base.metadata.create_all) as mocked_create_all:
context = databaseContext(str(self.database_path))
@@ -78,6 +200,127 @@ class DatabaseContextTests(unittest.TestCase):
mocked_create_all.assert_not_called()
def test_database_context_migrates_v2_shifted_seasons_schema_to_v3(self):
shifted_season_id = self.create_demo_show_with_shift()
connection = sqlite3.connect(self.database_path)
try:
cursor = connection.cursor()
cursor.execute("PRAGMA foreign_keys=OFF")
self.rewrite_shifted_seasons_table_without_pattern_owner(cursor)
self.rewrite_shows_table_without_show_fields(cursor)
cursor.execute(
"UPDATE properties SET value = '2' WHERE key = ?",
(DATABASE_VERSION_KEY,),
)
connection.commit()
finally:
connection.close()
with patch("ffx.database.click.confirm", return_value=True) as mocked_confirm, patch(
"ffx.database.click.echo"
) as mocked_echo:
reopened_context = databaseContext(str(self.database_path))
try:
self.assertEqual(DATABASE_VERSION, getDatabaseVersion(reopened_context))
mocked_confirm.assert_called_once()
backup_path = Path(f"{self.database_path}.v2-to-v{DATABASE_VERSION}.bak")
self.assertTrue(backup_path.exists())
Session = reopened_context["session"]
session = Session()
try:
migrated_shifted_season = (
session.query(ShiftedSeason)
.filter(ShiftedSeason.id == shifted_season_id)
.first()
)
self.assertIsNotNone(migrated_shifted_season)
self.assertEqual(1, migrated_shifted_season.getShowId())
self.assertIsNone(migrated_shifted_season.getPatternId())
self.assertEqual(1, migrated_shifted_season.getOriginalSeason())
self.assertEqual(1, migrated_shifted_season.getFirstEpisode())
self.assertEqual(10, migrated_shifted_season.getLastEpisode())
migrated_show = session.query(Show).filter(Show.id == 1).first()
self.assertIsNotNone(migrated_show)
self.assertEqual(0, int(migrated_show.quality or 0))
self.assertEqual('', str(migrated_show.notes or ''))
finally:
session.close()
finally:
reopened_context["engine"].dispose()
echoedLines = [call.args[0] for call in mocked_echo.call_args_list]
self.assertIn("Database migration required.", echoedLines)
self.assertIn("Current version: 2", echoedLines)
self.assertIn(f"Target version: {DATABASE_VERSION}", echoedLines)
self.assertIn(
" 2 -> 3: ffx.model.migration.step_2_3 [present]",
echoedLines,
)
def test_database_context_aborts_migration_when_confirmation_is_declined(self):
context = databaseContext(str(self.database_path))
try:
Session = context["session"]
session = Session()
try:
version_row = (
session.query(Property)
.filter(Property.key == DATABASE_VERSION_KEY)
.first()
)
version_row.value = "2"
session.commit()
finally:
session.close()
finally:
context["engine"].dispose()
with patch("ffx.database.click.confirm", return_value=False), patch(
"ffx.database.click.echo"
):
with self.assertRaises(click.ClickException) as raisedContext:
databaseContext(str(self.database_path))
self.assertEqual("Database migration aborted by user.", str(raisedContext.exception))
self.assertFalse(Path(f"{self.database_path}.v2-to-v{DATABASE_VERSION}.bak").exists())
def test_database_context_repairs_current_show_schema_without_version_bump(self):
self.create_demo_show_with_shift()
connection = sqlite3.connect(self.database_path)
try:
cursor = connection.cursor()
cursor.execute("PRAGMA foreign_keys=OFF")
self.rewrite_shows_table_without_show_fields(cursor)
connection.commit()
finally:
connection.close()
with patch("ffx.database.click.confirm") as mocked_confirm, patch(
"ffx.database.click.echo"
) as mocked_echo:
reopened_context = databaseContext(str(self.database_path))
try:
self.assertEqual(DATABASE_VERSION, getDatabaseVersion(reopened_context))
Session = reopened_context["session"]
session = Session()
try:
repaired_show = session.query(Show).filter(Show.id == 1).first()
self.assertIsNotNone(repaired_show)
self.assertEqual(0, int(repaired_show.quality or 0))
self.assertEqual('', str(repaired_show.notes or ''))
finally:
session.close()
finally:
reopened_context["engine"].dispose()
mocked_confirm.assert_not_called()
mocked_echo.assert_not_called()
if __name__ == "__main__":
unittest.main()

View File

@@ -4,6 +4,7 @@ from pathlib import Path
import sys
import unittest
from unittest.mock import patch
from types import SimpleNamespace
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
@@ -15,6 +16,7 @@ if str(SRC_ROOT) not in sys.path:
from ffx.ffx_controller import FfxController # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
from ffx.track_codec import TrackCodec # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_type import TrackType # noqa: E402
@@ -134,6 +136,62 @@ class FfxControllerTests(unittest.TestCase):
self.assertIn("ENCODING_QUALITY=29", commands[0])
self.assertIn("ENCODING_PRESET=7", commands[0])
def test_run_job_uses_show_quality_when_pattern_quality_is_unset(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
show_descriptor = ShowDescriptor(id=1, name="Show", year=2024, quality=23)
pattern = SimpleNamespace(quality=0)
with (
patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
),
patch.object(context["logger"], "info") as mocked_info,
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[],
currentPattern=pattern,
currentShowDescriptor=show_descriptor,
)
self.assertEqual(1, len(commands))
self.assertIn("ENCODING_QUALITY=23", commands[0])
mocked_info.assert_any_call("Setting quality 23 from show")
def test_run_job_prefers_pattern_quality_over_show_quality(self):
context = self.make_context(VideoEncoder.H264)
target_descriptor, source_descriptor = self.make_media_descriptors()
controller = FfxController(context, target_descriptor, source_descriptor)
commands = []
show_descriptor = ShowDescriptor(id=1, name="Show", year=2024, quality=23)
pattern = SimpleNamespace(quality=19)
with (
patch.object(
controller,
"executeCommandSequence",
side_effect=lambda command: commands.append(command) or ("", "", 0),
),
patch.object(context["logger"], "info") as mocked_info,
):
controller.runJob(
"input.mkv",
"output.mkv",
chainIteration=[],
currentPattern=pattern,
currentShowDescriptor=show_descriptor,
)
self.assertEqual(1, len(commands))
self.assertIn("ENCODING_QUALITY=19", commands[0])
mocked_info.assert_any_call("Setting quality 19 from pattern")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
from pathlib import Path
import sys
import tempfile
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.track_descriptor import TrackDescriptor # noqa: E402
from ffx.track_disposition import TrackDisposition # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class MediaDescriptorImportSubtitlesTests(unittest.TestCase):
def make_descriptor(self) -> MediaDescriptor:
return MediaDescriptor(
context={"logger": get_ffx_logger()},
track_descriptors=[
TrackDescriptor(
index=3,
source_index=3,
sub_index=0,
track_type=TrackType.SUBTITLE,
tags={"language": "eng", "title": "DB Subtitle"},
disposition_set={TrackDisposition.DEFAULT},
)
],
)
def test_import_subtitles_preserves_target_dispositions_when_requested(self):
descriptor = self.make_descriptor()
with tempfile.TemporaryDirectory() as tmpdir:
sidecar_path = Path(tmpdir) / "dball_S01E01_3_deu_FOR.vtt"
sidecar_path.write_text("WEBVTT\n\n", encoding="utf-8")
descriptor.importSubtitles(
tmpdir,
"dball",
season=1,
episode=1,
preserve_dispositions=True,
)
track = descriptor.getSubtitleTracks()[0]
self.assertEqual(str(sidecar_path), track.getExternalSourceFilePath())
self.assertEqual("deu", track.getTags()["language"])
self.assertEqual({TrackDisposition.DEFAULT}, track.getDispositionSet())
def test_import_subtitles_uses_sidecar_dispositions_by_default(self):
descriptor = self.make_descriptor()
with tempfile.TemporaryDirectory() as tmpdir:
sidecar_path = Path(tmpdir) / "dball_S01E01_3_deu_FOR.vtt"
sidecar_path.write_text("WEBVTT\n\n", encoding="utf-8")
descriptor.importSubtitles(
tmpdir,
"dball",
season=1,
episode=1,
)
track = descriptor.getSubtitleTracks()[0]
self.assertEqual(str(sidecar_path), track.getExternalSourceFilePath())
self.assertEqual("deu", track.getTags()["language"])
self.assertEqual({TrackDisposition.FORCED}, track.getDispositionSet())
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,47 @@
from __future__ import annotations
from pathlib import Path
import sys
import unittest
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.model.migration import ( # noqa: E402
DatabaseVersionException,
getMigrationPlan,
loadMigrationStep,
migrateDatabase,
)
class MigrationTests(unittest.TestCase):
def test_get_migration_plan_lists_known_step_with_module_presence(self):
migrationPlan = getMigrationPlan(2, 3)
self.assertEqual(1, len(migrationPlan))
self.assertEqual(2, migrationPlan[0].versionFrom)
self.assertEqual(3, migrationPlan[0].versionTo)
self.assertEqual("ffx.model.migration.step_2_3", migrationPlan[0].moduleName)
self.assertTrue(migrationPlan[0].modulePresent)
def test_load_migration_step_returns_known_step(self):
migrationStep = loadMigrationStep(2, 3)
self.assertTrue(callable(migrationStep))
def test_migrate_database_raises_when_step_module_is_missing(self):
updatedVersions = []
with self.assertRaises(DatabaseVersionException):
migrateDatabase({}, 1, 2, lambda context, version: updatedVersions.append(version))
self.assertEqual([], updatedVersions)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,208 @@
from __future__ import annotations
import logging
from pathlib import Path
import sys
import tempfile
import unittest
from unittest.mock import patch
SRC_ROOT = Path(__file__).resolve().parents[2] / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from ffx.database import databaseContext # noqa: E402
from ffx.model.pattern import Pattern # noqa: E402
from ffx.model.track import Track # noqa: E402
from ffx.show_controller import ShowController # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
from ffx.shifted_season_controller import ShiftedSeasonController # noqa: E402
from ffx.track_type import TrackType # noqa: E402
class StaticConfig:
def __init__(self, data: dict | None = None):
self._data = data or {}
def getData(self):
return self._data
def make_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
logger.handlers = []
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(logging.NullHandler())
return logger
def make_context(database_path: Path) -> dict:
return {
"logger": make_logger(f"ffx-test-shifted-{database_path.stem}"),
"config": StaticConfig(),
"database": databaseContext(str(database_path)),
}
class ShiftedSeasonControllerTests(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.TemporaryDirectory()
self.database_path = Path(self.tempdir.name) / "shifted-season-test.db"
self.context = make_context(self.database_path)
self.show_controller = ShowController(self.context)
self.shifted_season_controller = ShiftedSeasonController(self.context)
def tearDown(self):
self.context["database"]["engine"].dispose()
self.tempdir.cleanup()
def add_show(self, show_id: int, name: str = "Demo Show"):
self.show_controller.updateShow(
ShowDescriptor(id=show_id, name=name, year=2000 + show_id)
)
def add_pattern(self, show_id: int, expression: str) -> int:
self.add_show(show_id)
Session = self.context["database"]["session"]
session = Session()
try:
pattern = Pattern(show_id=show_id, pattern=expression)
session.add(pattern)
session.flush()
session.add(
Track(
pattern_id=pattern.getId(),
track_type=TrackType.VIDEO.index(),
codec_name="h264",
index=0,
source_index=0,
disposition_flags=0,
audio_layout=0,
)
)
session.commit()
return pattern.getId()
finally:
session.close()
def test_shift_season_uses_show_mapping_when_no_pattern_mapping_exists(self):
pattern_id = self.add_pattern(1, r"^demo_(s[0-9]+e[0-9]+)\.mkv$")
self.shifted_season_controller.addShiftedSeason(
showId=1,
shiftedSeasonObj={
"original_season": 1,
"first_episode": 1,
"last_episode": 10,
"season_offset": 2,
"episode_offset": 5,
},
)
with patch.object(self.context["logger"], "info") as mocked_info:
shifted_season, shifted_episode = self.shifted_season_controller.shiftSeason(
showId=1,
patternId=pattern_id,
season=1,
episode=3,
)
self.assertEqual((3, 8), (shifted_season, shifted_episode))
mocked_info.assert_called_once_with(
"Setting season shift 1/3 -> 3/8 from show"
)
def test_shift_season_prefers_pattern_mapping_over_show_mapping(self):
pattern_id = self.add_pattern(1, r"^demo_(s[0-9]+e[0-9]+)\.mkv$")
self.shifted_season_controller.addShiftedSeason(
showId=1,
shiftedSeasonObj={
"original_season": 1,
"first_episode": 1,
"last_episode": 10,
"season_offset": 2,
"episode_offset": 5,
},
)
self.shifted_season_controller.addShiftedSeason(
patternId=pattern_id,
shiftedSeasonObj={
"original_season": 1,
"first_episode": 1,
"last_episode": 10,
"season_offset": 1,
"episode_offset": -2,
},
)
with patch.object(self.context["logger"], "info") as mocked_info:
shifted_season, shifted_episode = self.shifted_season_controller.shiftSeason(
showId=1,
patternId=pattern_id,
season=1,
episode=3,
)
self.assertEqual((2, 1), (shifted_season, shifted_episode))
mocked_info.assert_called_once_with(
"Setting season shift 1/3 -> 2/1 from pattern"
)
def test_shift_season_pattern_zero_offsets_override_show_mapping_to_identity(self):
pattern_id = self.add_pattern(1, r"^demo_(s[0-9]+e[0-9]+)\.mkv$")
self.shifted_season_controller.addShiftedSeason(
showId=1,
shiftedSeasonObj={
"original_season": 1,
"first_episode": 1,
"last_episode": 10,
"season_offset": 2,
"episode_offset": 5,
},
)
self.shifted_season_controller.addShiftedSeason(
patternId=pattern_id,
shiftedSeasonObj={
"original_season": 1,
"first_episode": 1,
"last_episode": 10,
"season_offset": 0,
"episode_offset": 0,
},
)
with patch.object(self.context["logger"], "info") as mocked_info:
shifted_season, shifted_episode = self.shifted_season_controller.shiftSeason(
showId=1,
patternId=pattern_id,
season=1,
episode=3,
)
self.assertEqual((1, 3), (shifted_season, shifted_episode))
mocked_info.assert_called_once_with(
"Setting season shift 1/3 -> 1/3 from pattern"
)
def test_shift_season_falls_back_to_identity_when_no_rule_matches(self):
pattern_id = self.add_pattern(1, r"^demo_(s[0-9]+e[0-9]+)\.mkv$")
with patch.object(self.context["logger"], "info") as mocked_info:
shifted_season, shifted_episode = self.shifted_season_controller.shiftSeason(
showId=1,
patternId=pattern_id,
season=4,
episode=20,
)
self.assertEqual((4, 20), (shifted_season, shifted_episode))
mocked_info.assert_called_once_with(
"Setting season shift 4/20 -> 4/20 from default"
)
if __name__ == "__main__":
unittest.main()

View File

@@ -56,6 +56,8 @@ class ShowDescriptorDefaultTests(unittest.TestCase):
self.assertEqual(3, descriptor.getIndexEpisodeDigits())
self.assertEqual(3, descriptor.getIndicatorSeasonDigits())
self.assertEqual(4, descriptor.getIndicatorEpisodeDigits())
self.assertEqual(0, descriptor.getQuality())
self.assertEqual("", descriptor.getNotes())
def test_show_descriptor_without_context_uses_shared_constants(self):
descriptor = ShowDescriptor(id=1, name="Default Show", year=2024)
@@ -70,6 +72,18 @@ class ShowDescriptorDefaultTests(unittest.TestCase):
DEFAULT_SHOW_INDICATOR_EPISODE_DIGITS,
descriptor.getIndicatorEpisodeDigits(),
)
self.assertEqual(0, descriptor.getQuality())
self.assertEqual("", descriptor.getNotes())
def test_show_descriptor_preserves_explicit_quality(self):
descriptor = ShowDescriptor(id=1, name="Quality Show", year=2024, quality=23)
self.assertEqual(23, descriptor.getQuality())
def test_show_descriptor_preserves_explicit_notes(self):
descriptor = ShowDescriptor(id=1, name="Notes Show", year=2024, notes="show notes")
self.assertEqual("show notes", descriptor.getNotes())
def test_episode_basename_uses_configured_digit_defaults_when_omitted(self):
basename = getEpisodeFileBasename(