You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
287 lines
10 KiB
Python
287 lines
10 KiB
Python
import os, sys, click
|
|
|
|
from .scenario import Scenario
|
|
|
|
from ffx.test.helper import createMediaTestFile
|
|
from ffx.process import executeProcess
|
|
from ffx.database import databaseContext
|
|
from ffx.test.helper import createEmptyDirectory
|
|
|
|
from ffx.file_properties import FileProperties
|
|
|
|
from ffx.media_descriptor import MediaDescriptor
|
|
from ffx.track_descriptor import TrackDescriptor
|
|
|
|
from ffx.track_type import TrackType
|
|
from ffx.track_disposition import TrackDisposition
|
|
|
|
from ffx.test.media_combinator import MediaCombinator
|
|
from ffx.test.indicator_combinator import IndicatorCombinator
|
|
|
|
from ffx.show_descriptor import ShowDescriptor
|
|
|
|
|
|
from ffx.tmdb_controller import TmdbController
|
|
from ffx.tmdb_controller import TMDB_API_KEY_NOT_PRESENT_EXCEPTION
|
|
|
|
class Scenario4(Scenario):
    """Test scenario 4: convert episode files of a show and check TMDB-based naming.

    For every media variant the scenario
      1. creates one source file per season/episode indicator,
      2. prepares a test database holding the test show and a filename pattern,
      3. runs the ffx ``convert`` command on the source files and
      4. asserts that the expected result files exist (named via
         ``TmdbController.getEpisodeFileBasename``) and that their tracks
         satisfy the assertions supplied by the preset.

    A ``TMDB_API_KEY`` environment variable is required to instantiate it.
    """

    # Show used as fixture; the identifier is the one passed to the TMDB
    # episode query and stored as show id in the test database.
    TEST_SHOW_IDENTIFIER = 83095
    TEST_SHOW_NAME = 'The Rising of the Shield Hero'
    TEST_SHOW_YEAR = 2019

    # Source files are named '<label>_<indicator>.<extension>'
    TEST_FILE_LABEL = 'rotsh'
    TEST_FILE_EXTENSION = 'mkv'

    # Filename pattern registered in the test database for the test show
    TEST_PATTERN = f"{TEST_FILE_LABEL}_{FileProperties.SE_INDICATOR_PATTERN}.{TEST_FILE_EXTENSION}"

    # Extension the converted result files are expected to carry
    EXPECTED_FILE_EXTENSION = 'webm'


    def __init__(self, context):
        """Set up combinator, TMDB controller and the test show descriptor.

        Raises:
            TMDB_API_KEY_NOT_PRESENT_EXCEPTION: if the environment variable
                ``TMDB_API_KEY`` is not set.
        """
        super().__init__(context)

        self.__tmdbApiKey = os.environ.get('TMDB_API_KEY', None)
        if self.__tmdbApiKey is None:
            raise TMDB_API_KEY_NOT_PRESENT_EXCEPTION

        self.__ic = IndicatorCombinator(context=context)
        self.__tc = TmdbController()

        showKwargs = {
            ShowDescriptor.ID_KEY: Scenario4.TEST_SHOW_IDENTIFIER,
            ShowDescriptor.NAME_KEY: Scenario4.TEST_SHOW_NAME,
            ShowDescriptor.YEAR_KEY: Scenario4.TEST_SHOW_YEAR,
        }
        self.__testShowDescriptor = ShowDescriptor(**showKwargs)


    def getScenario(self) -> str:
        """Return the class name without its first 8 characters ('Scenario')."""
        return self.__class__.__name__[8:]


    def prepareTestDatabase(self, sourceMediaDescriptor: MediaDescriptor) -> None:
        """Recreate the test database and register show, pattern and media descriptor.

        Args:
            sourceMediaDescriptor: descriptor that is attached to the test
                pattern via ``setPatternMediaDescriptor``.

        Raises:
            click.ClickException: if the test show could not be stored.
        """
        # Release the engine of a previously opened database before its file is removed
        if self._context['database'] is not None:
            self._context['database']['engine'].dispose()

        if os.path.isfile(self._testDbFilePath):
            os.unlink(self._testDbFilePath)
        # Do not keep a reference to the disposed database should the re-creation below fail
        self._context['database'] = None

        self._logger.debug(f"Creating test db with path {self._testDbFilePath}")
        self._context['database'] = databaseContext(databasePath=self._testDbFilePath)

        self._logger.debug(f"Adding test show '{self.__testShowDescriptor.getFilenamePrefix()}' to test db")
        if not self._sc.updateShow(self.__testShowDescriptor):
            raise click.ClickException('Could not create test show in db')

        testPatternDescriptor = {
            'show_id': Scenario4.TEST_SHOW_IDENTIFIER,
            'pattern': Scenario4.TEST_PATTERN,
        }
        patternId = self._pc.addPattern(testPatternDescriptor)

        if patternId:
            self._mc.setPatternMediaDescriptor(sourceMediaDescriptor, patternId)
        else:
            # Previously ignored silently; without the pattern the media
            # descriptor cannot be attached, so make that visible in the log.
            self._logger.warning(f"Could not add test pattern '{Scenario4.TEST_PATTERN}' to test db")


    def __createSourceFiles(self, presetMediaDescriptor: MediaDescriptor) -> list:
        """Create one media test file per non-empty indicator and return their descriptions."""
        testFileList = []
        for indicatorObj in self.__ic.getYield():

            indicator = indicatorObj['indicator']
            if not indicator:
                continue

            baseName = f"{Scenario4.TEST_FILE_LABEL}_{indicator}"

            testFileObj = {
                'season': indicatorObj['season'],
                'episode': indicatorObj['episode'],
                'basename': baseName,
            }
            testFileObj['path'] = createMediaTestFile(mediaDescriptor=presetMediaDescriptor,
                                                      directory=self._testDirectory,
                                                      baseName=baseName,
                                                      logger=self._logger,
                                                      length=2)
            testFileObj['filename'] = f"{baseName}.{Scenario4.TEST_FILE_EXTENSION}"

            testFileList.append(testFileObj)

        return testFileList


    def __buildCommandSequence(self, testFileList: list) -> list:
        """Assemble the ffx 'convert' command line for the given test files."""
        commandSequence = [sys.executable, self._ffxExecutablePath]

        if self._context['verbosity']:
            commandSequence += ['--verbose', str(self._context['verbosity'])]

        commandSequence += ['--database-file', self._testDbFilePath, 'convert']
        commandSequence += [tfo['filename'] for tfo in testFileList]
        commandSequence += ['--no-prompt', '--no-signature']

        # if not testContext['use_jellyfin']:
        #     commandSequence += ['--no-jellyfin']

        return commandSequence


    def __getExpectedFilename(self, testFileObj: dict) -> str:
        """Return the result filename expected for a test file, using the TMDB episode name."""
        season = testFileObj['season']
        episode = testFileObj['episode']

        tmdbEpisodeResult = self.__tc.queryEpisode(Scenario4.TEST_SHOW_IDENTIFIER, season, episode)

        expectedFileBasename = TmdbController.getEpisodeFileBasename(self.__testShowDescriptor.getFilenamePrefix(),
                                                                     tmdbEpisodeResult['name'],
                                                                     season, episode)

        return f"{expectedFileBasename}.{Scenario4.EXPECTED_FILE_EXTENSION}"


    def __runAssertions(self, resultMediaDescriptor, resultTracks,
                        assertSelectorList: list, assertFuncList, variantList) -> None:
        """Run the preset's assert function of every selector against a result file.

        Raises:
            AssertionError: if a check of this method fails; the assert
                functions are expected to signal failures the same way.
        """
        for assertIndex, assertSelector in enumerate(assertSelectorList):

            assertFunc = assertFuncList[assertIndex]
            assertVariant = variantList[assertIndex]

            if assertSelector == 'M':
                assertFunc()

                # Report a missing stream as test failure instead of crashing with IndexError
                assert (len(resultTracks) >= len(assertVariant)
                        ), f"Result has {len(resultTracks)} streams, expected at least {len(assertVariant)}"

                # Each character of the variant is compared to the type indicator of the stream at that position
                for variantIndex, expectedIndicator in enumerate(assertVariant):
                    actualType = resultTracks[variantIndex].getType()
                    assert (expectedIndicator.lower() == actualType.indicator()
                            ), f"Stream #{variantIndex} is of type {actualType.label()}, expected indicator '{expectedIndicator.lower()}'"

            elif assertSelector == 'AD' or assertSelector == 'AT':
                assertFunc({'tracks': resultMediaDescriptor.getAudioTracks()})

            elif assertSelector == 'SD' or assertSelector == 'ST':
                assertFunc({'tracks': resultMediaDescriptor.getSubtitleTracks()})

            elif assertSelector == 'J':
                assertFunc()


    def job(self, yieldObj: dict) -> None:
        """Run one test variant and update the passed/failed counters in the context.

        Args:
            yieldObj: combinator yield. Read keys: ``'target'`` (with
                ``'variants'`` and ``'payload'``) and ``'preset'`` (with
                ``'payload'``, ``'assertSelectors'``, ``'assertFuncs'`` and
                ``'shouldFail'``).

        Raises:
            KeyError: if ``yieldObj`` has no ``'preset'`` entry.
        """
        testContext = self._context.copy()

        if 'preset' not in yieldObj:
            raise KeyError('yieldObj did not contain presets')

        targetYieldObj = yieldObj['target']
        presetYieldObj = yieldObj['preset']

        variantList = targetYieldObj['variants']

        variantIdentifier = '-'.join(variantList)
        variantLabel = f"{self.__class__.__name__} Variant {variantIdentifier}"

        sourceMediaDescriptor: MediaDescriptor = targetYieldObj['payload']
        presetMediaDescriptor: MediaDescriptor = presetYieldObj['payload']

        assertSelectorList: list = presetYieldObj['assertSelectors']
        assertFuncList = presetYieldObj['assertFuncs']
        shouldFail = presetYieldObj['shouldFail']

        try:
            jellyfinSelectorIndex = assertSelectorList.index('J')
        except ValueError:
            # No 'J' selector in this preset: leave 'use_jellyfin' untouched
            pass
        else:
            testContext['use_jellyfin'] = variantList[jellyfinSelectorIndex] == 'J1'

        # Skip variants excluded by the variant filter or exceeding the test limit
        if self._context['test_variant'] and not variantIdentifier.startswith(self._context['test_variant']):
            return

        if ((self._context['test_passed_counter'] + self._context['test_failed_counter'])
                >= self._context['test_limit']):
            return

        self._logger.debug(f"Running Job: {variantLabel}")

        for line in presetMediaDescriptor.getConfiguration(label='presetMediaDescriptor'):
            self._logger.debug(line)

        for line in sourceMediaDescriptor.getConfiguration(label='sourceMediaDescriptor'):
            self._logger.debug(line)


        # Phase 1: Setup source files

        self.clearTestDirectory()
        testFileList = self.__createSourceFiles(presetMediaDescriptor)


        # Phase 2: Prepare database

        self.createEmptyTestDatabase()
        self.prepareTestDatabase(sourceMediaDescriptor)


        # Phase 3: Run ffx

        commandSequence = self.__buildCommandSequence(testFileList)

        self._logger.debug(f"{variantLabel}: Test sequence: {commandSequence}")

        out, err, rc = executeProcess(commandSequence, directory=self._testDirectory, niceness=self._niceness, cpu_percent=self._cpuPercent)

        if out and self._context['verbosity'] >= 9:
            self._logger.debug(f"{variantLabel}: Process output: {out}")
        if rc:
            self._logger.debug(f"{variantLabel}: Process returned ERROR {rc} ({err})")


        # Phase 4: Evaluate results

        resultSuffix = f".{Scenario4.EXPECTED_FILE_EXTENSION}"
        resultFilenames = [rf for rf in self.getFilenamesInTestDirectory() if rf.endswith(resultSuffix)]

        self._logger.debug(f"{variantLabel}: Result filenames: {resultFilenames}")

        # AssertionError is the failure signal of this harness: any failed
        # assert below counts the variant as failed instead of aborting the run.
        try:

            jobFailed = bool(rc)
            self._logger.debug(f"{variantLabel}: Should fail: {shouldFail} / actually failed: {jobFailed}")

            assert (jobFailed == shouldFail
                    ), f"Process {'failed' if jobFailed else 'did not fail'}"

            if not jobFailed:

                for tfo in testFileList:

                    expectedFilename = self.__getExpectedFilename(tfo)
                    expectedFilePath = os.path.join(self._testDirectory, expectedFilename)

                    assert (os.path.isfile(expectedFilePath)
                            ), f"Result file '{expectedFilename}' in path '{self._testDirectory}' wasn't created"

                    rfp = FileProperties(testContext, expectedFilePath)
                    self._logger.debug(f"{variantLabel}: Result file properties: {rfp.getFilename()} season={rfp.getSeason()} episode={rfp.getEpisode()}")

                    rmd = rfp.getMediaDescriptor()
                    # Track list is fetched before applySourceIndices(), as in the original sequence
                    rmt = rmd.getAllTrackDescriptors()

                    for line in rmd.getConfiguration(label='resultMediaDescriptor'):
                        self._logger.debug(line)

                    # if testContext['use_jellyfin']:
                    #     sourceMediaDescriptor.applyJellyfinOrder()

                    # num tracks differ
                    rmd.applySourceIndices(sourceMediaDescriptor)

                    self.__runAssertions(rmd, rmt, assertSelectorList, assertFuncList, variantList)

            self._context['test_passed_counter'] += 1
            self._reportLogger.info(f"\n{variantLabel}: Test passed\n")

        except AssertionError as ae:

            self._context['test_failed_counter'] += 1
            self._reportLogger.error(f"\n{variantLabel}: Test FAILED ({ae})\n")


    def run(self) -> None:
        """Run job() for every yield of the media combinator class with reference 6."""
        MC_list = [MediaCombinator.getClassReference(6)]
        for MC in MC_list:
            self._logger.debug(f"MC={MC.__name__}")
            mc = MC(context=self._context, createPresets=True)
            for y in mc.getYield():
                self.job(y)
|