import os, sys, click from .scenario import Scenario from .helper import createMediaTestFile from ffx.process import executeProcess from ffx.database import databaseContext from .helper import createEmptyDirectory from ffx.helper import getEpisodeFileBasename from ffx.file_properties import FileProperties from ffx.media_descriptor import MediaDescriptor from ffx.track_descriptor import TrackDescriptor from ffx.track_type import TrackType from ffx.track_disposition import TrackDisposition from .media_combinator import MediaCombinator from .indicator_combinator import IndicatorCombinator from ffx.show_descriptor import ShowDescriptor from ffx.tmdb_controller import TmdbController from ffx.tmdb_controller import TMDB_API_KEY_NOT_PRESENT_EXCEPTION class Scenario4(Scenario): TEST_SHOW_IDENTIFIER = 83095 TEST_SHOW_NAME = 'The Rising of the Shield Hero' TEST_SHOW_YEAR = 2019 TEST_FILE_LABEL = 'rotsh' TEST_FILE_EXTENSION = 'mkv' TEST_PATTERN = f"{TEST_FILE_LABEL}_{FileProperties.SE_INDICATOR_PATTERN}.{TEST_FILE_EXTENSION}" EXPECTED_FILE_EXTENSION = 'webm' def __init__(self, context): super().__init__(context) self.__tmdbApiKey = os.environ.get('TMDB_API_KEY', None) if self.__tmdbApiKey is None: raise TMDB_API_KEY_NOT_PRESENT_EXCEPTION self.__ic = IndicatorCombinator(context = context) self.__tc = TmdbController() kwargs = {} kwargs[ShowDescriptor.ID_KEY] = Scenario4.TEST_SHOW_IDENTIFIER kwargs[ShowDescriptor.NAME_KEY] = Scenario4.TEST_SHOW_NAME kwargs[ShowDescriptor.YEAR_KEY] = Scenario4.TEST_SHOW_YEAR self.__testShowDescriptor = ShowDescriptor(**kwargs) def getScenario(self): return self.__class__.__name__[8:] def prepareTestDatabase(self, sourceMediaDescriptor: MediaDescriptor): if not self._context['database'] is None: self._context['database']['engine'].dispose() if os.path.isfile(self._testDbFilePath): os.unlink(self._testDbFilePath) self._context['database'] = None self._logger.debug(f"Creating test db with path {self._testDbFilePath}") self._context['database'] = databaseContext(databasePath=self._testDbFilePath) self._logger.debug(f"Adding test show '{self.__testShowDescriptor.getFilenamePrefix()}' to test db") if not self._sc.updateShow(self.__testShowDescriptor): raise click.ClickException('Could not create test show in db') testPatternDescriptor = { 'show_id': Scenario4.TEST_SHOW_IDENTIFIER, 'pattern': Scenario4.TEST_PATTERN } patternId = self._pc.addPattern(testPatternDescriptor) if patternId: self._mc.setPatternMediaDescriptor(sourceMediaDescriptor, patternId) def job(self, yieldObj: dict): testContext = self._context.copy() if 'preset' not in yieldObj.keys(): raise KeyError('yieldObj did not contain presets') targetYieldObj = yieldObj['target'] presetYieldObj = yieldObj['preset'] identifier = targetYieldObj['identifier'] variantList = targetYieldObj['variants'] variantIdentifier = '-'.join(variantList) variantLabel = f"{self.__class__.__name__} Variant {variantIdentifier}" sourceMediaDescriptor: MediaDescriptor = targetYieldObj['payload'] presetMediaDescriptor: MediaDescriptor = presetYieldObj['payload'] assertSelectorList: list = presetYieldObj['assertSelectors'] assertFuncList = presetYieldObj['assertFuncs'] shouldFail = presetYieldObj['shouldFail'] if self._context['test_variant'] and not variantIdentifier.startswith(self._context['test_variant']): return if (self._context['test_limit'] and (self._context['test_passed_counter'] + self._context['test_failed_counter']) >= self._context['test_limit']): return self._logger.debug(f"Running Job: {variantLabel}") for l in presetMediaDescriptor.getConfiguration(label = 'presetMediaDescriptor'): self._logger.debug(l) for l in sourceMediaDescriptor.getConfiguration(label = 'sourceMediaDescriptor'): self._logger.debug(l) # Phase 1: Setup source files self.clearTestDirectory() testFileList = [] for indicatorObj in [y for y in self.__ic.getYield() if y['indicator']]: indicator = indicatorObj['indicator'] testFileObj = {} testFileObj['season'] = indicatorObj['season'] testFileObj['episode'] = indicatorObj['episode'] testFileObj['basename'] = f"{Scenario4.TEST_FILE_LABEL}_{indicator}" testFileObj['path'] = createMediaTestFile(mediaDescriptor = presetMediaDescriptor, directory = self._testDirectory, baseName = testFileObj['basename'], logger=self._logger, length = 2) testFileObj['filename'] = f"{testFileObj['basename']}.{Scenario4.TEST_FILE_EXTENSION}" testFileList.append(testFileObj) # Phase 2: Prepare database self.createEmptyTestDatabase() self.prepareTestDatabase(sourceMediaDescriptor) # Phase 3: Run ffx commandSequence = [sys.executable, '-m', self._ffxModuleName] if self._context['verbosity']: commandSequence += ['--verbose', str(self._context['verbosity'])] commandSequence += ['--database-file', self._testDbFilePath, 'convert'] commandSequence += [tfo['filename'] for tfo in testFileList] commandSequence += ['--no-prompt', '--no-signature'] out, err, rc = executeProcess(commandSequence, directory = self._testDirectory, context = self._context) if out and self._context['verbosity'] >= 9: self._logger.debug(f"{variantLabel}: Process output: {out}") if rc: self._logger.debug(f"{variantLabel}: Process returned ERROR {rc} ({err})") # Phase 4: Evaluate results resultFilenames = [rf for rf in self.getFilenamesInTestDirectory() if rf.endswith(f".{Scenario4.EXPECTED_FILE_EXTENSION}")] self._logger.debug(f"{variantLabel}: Result filenames: {resultFilenames}") try: jobFailed = bool(rc) self._logger.debug(f"{variantLabel}: Should fail: {shouldFail} / actually failed: {jobFailed}") assert (jobFailed == shouldFail ), f"Process {'failed' if jobFailed else 'did not fail'}" if not jobFailed: for tfo in testFileList: tmdbEpisodeResult = self.__tc.queryEpisode(Scenario4.TEST_SHOW_IDENTIFIER, tfo['season'], tfo['episode']) expectedFileBasename = getEpisodeFileBasename(self.__testShowDescriptor.getFilenamePrefix(), tmdbEpisodeResult['name'], tfo['season'], tfo['episode'], context=testContext) expectedFilename = f"{expectedFileBasename}.{Scenario4.EXPECTED_FILE_EXTENSION}" expectedFilePath = os.path.join(self._testDirectory, expectedFilename) assert (os.path.isfile(expectedFilePath) ), f"Result file '{expectedFilename}' in path '{self._testDirectory}' wasn't created" rfp = FileProperties(testContext, expectedFilePath) self._logger.debug(f"{variantLabel}: Result file properties: {rfp.getFilename()} season={rfp.getSeason()} episode={rfp.getEpisode()}") rmd = rfp.getMediaDescriptor() # rmt = rmd.getAllTrackDescriptors() rmt = rmd.getTrackDescriptors() for l in rmd.getConfiguration(label = 'resultMediaDescriptor'): self._logger.debug(l) # num tracks differ rmd.applySourceIndices(sourceMediaDescriptor) for assertIndex in range(len(assertSelectorList)): assertSelector = assertSelectorList[assertIndex] assertFunc = assertFuncList[assertIndex] assertVariant = variantList[assertIndex] if assertSelector == 'M': assertFunc() for variantIndex in range(len(assertVariant)): assert (assertVariant[variantIndex].lower() == rmt[variantIndex].getType().indicator() ), f"Stream #{variantIndex} is not of type {rmt[variantIndex].getType().label()}" if assertSelector == 'AD' or assertSelector == 'AT': assertFunc({'tracks': rmd.getAudioTracks()}) elif assertSelector == 'SD' or assertSelector == 'ST': assertFunc({'tracks': rmd.getSubtitleTracks()}) elif type(assertSelector) is str: if assertSelector == 'J': assertFunc() self._context['test_passed_counter'] += 1 self._reportLogger.info(f"\n{variantLabel}: Test passed\n") except AssertionError as ae: self._context['test_failed_counter'] += 1 self._reportLogger.error(f"\n{variantLabel}: Test FAILED ({ae})\n") def run(self): MC_list = [MediaCombinator.getClassReference(6)] for MC in MC_list: self._logger.debug(f"MC={MC.__name__}") mc = MC(context = self._context, createPresets = True) for y in mc.getYield(): self.job(y)