Scenario 1 MWE

click-textual
Maveno 11 months ago
parent b9185b5b07
commit 696b2b56d3

@ -491,13 +491,19 @@ def convert(ctx,
extra = ['ffx'] if sourceFilenameExtension == FfxController.DEFAULT_FILE_EXTENSION else []
click.echo(f"label={label if label else 'Falsy'}")
click.echo(f"sourceFileBasename={sourceFileBasename}")
targetFilename = (sourceFileBasename if context['use_tmdb']
else mediaFileProperties.assembleTargetFileBasename(label if label else sourceFileBasename,
else mediaFileProperties.assembleTargetFileBasename(label,
q if len(q_list) > 1 else -1,
extraTokens = extra))
targetPath = os.path.join(output_directory if output_directory else sourceDirectory, targetFilename)
# media_S01E02_S01E02
click.echo(f"targetPath={targetPath}")
fc.runJob(sourcePath,
targetPath,
context['video_encoder'],

@ -213,9 +213,10 @@ class FileProperties():
# targetFilenameExtension = FfxController.DEFAULT_FILE_EXTENSION if extension is None else str(extension)
if not label:
targetFilenameTokens = [self.__sourceFileBasename]
else:
click.echo(f"assembleTargetFileBasename(): label={label} is {'truthy' if label else 'falsy'}")
if label:
targetFilenameTokens = [label]
if fileIndex > -1:
@ -225,6 +226,10 @@ class FileProperties():
elif self.__episode > -1:
targetFilenameTokens += [f"E{self.__episode:0{episode_digits}d}"]
else:
targetFilenameTokens = [self.__sourceFileBasename]
if quality != -1:
targetFilenameTokens += [f"q{quality}"]
@ -235,6 +240,7 @@ class FileProperties():
targetFilename = '_'.join(targetFilenameTokens)
self.__logger.debug(f"assembleTargetFileBasename(): Target filename: {targetFilename}")
#self.__logger.debug(f"assembleTargetFileBasename(): Target filename: {targetFilename}")
click.echo(f"assembleTargetFileBasename(): Target filename: {targetFilename}")
return targetFilename

@ -4,6 +4,8 @@ class BasenameCombinator():
IDENTIFIER = 'basename'
BASENAME = 'media'
def __init__(self, context = None):
self._context = context
self._logger = context['logger']

@ -8,9 +8,13 @@ from ffx.media_descriptor import MediaDescriptor
from .basename_combinator import BasenameCombinator
from .indicator_combinator import IndicatorCombinator
from .label_combinator import LabelCombinator
from .title_combinator import TitleCombinator
from .site_combinator import SiteCombinator
class BasenameCombinator0(BasenameCombinator):
"""base_indicator"""
VARIANT = 'B0'
@ -23,62 +27,95 @@ class BasenameCombinator0(BasenameCombinator):
def getVariant(self):
return BasenameCombinator0.VARIANT
def getPayload(self):
return {}
def getPayload(self, indicator = '', label = ''):
basename = BasenameCombinator.BASENAME
expectedBasename = label if label else BasenameCombinator.BASENAME
if indicator:
basename += f"_{indicator}"
expectedBasename += f"_{indicator}"
return {'basename': basename,
'label': label,
'expectedBasename': expectedBasename}
# testParameterBasename = yieldObj['payload']['basename']
# testParameterExtension = 'mkv'
# testParameterFilename = f"{testParameterBasename}.{testParameterExtension}"
#
# #TODO: alles in payload verpacken
# testParameterLabel = yieldObj['payload']['label']
# testExpectedExtension = 'webm'
# testExpectedFilename = f"{testParameterBasename}.{testExpectedExtension}"
def assertFunc(self, indicator = '', label = ''):
def f(testObj: dict = {}):
if not 'filenames' in testObj.keys():
raise KeyError("testObj does not contain key 'filenames'")
fNames = testObj['filenames']
assert len(fNames) == 1, "More than one result file was created"
resultFilename = fNames[0]
fTokens = resultFilename.split('.')
resultBasename = '.'.join(fTokens[:-1])
resultExtension = fTokens[-1]
if not indicator and not label:
assert resultBasename == BasenameCombinator.BASENAME, f"Result basename is not {BasenameCombinator.BASENAME}"
if not indicator and label:
assert resultBasename == label, f"Result basename is not {label}"
if indicator and not label:
assert resultBasename == f"{BasenameCombinator.BASENAME}_{indicator}", f"Result basename is not {BasenameCombinator.BASENAME}_{indicator}"
if indicator and label:
assert resultBasename == f"{label}_{indicator}", f"Result basename is not {label}_{indicator}"
return f
def assertFunc(self, testObj = {}):
pass
def shouldFail(self):
return False
def getYield(self):
for MTC in MediaTagCombinator.getAllClassReferences():
for DC2_A in DispositionCombinator2.getAllClassReferences():
for TC2_A in TrackTagCombinator2.getAllClassReferences():
for DC3_S in DispositionCombinator3.getAllClassReferences():
for TC3_S in TrackTagCombinator3.getAllClassReferences():
for J in JellyfinCombinator.getAllClassReferences():
ic = IndicatorCombinator(self._context)
j = J(self._context)
self._context['use_jellyfin'] = j.getPayload()
for L in LabelCombinator.getAllClassReferences():
for i in ic.getYield():
# for S in SiteCombinator.getAllClassReferences():
# for T in TitleCombinator.getAllClassReferences():
#
dc2a = DC2_A(self._context)
tc2a = TC2_A(self._context)
dc3s = DC3_S(self._context)
tc3s = TC3_S(self._context)
mtc = MTC(self._context)
l = L(self._context)
indicator = i['indicator']
indicatorVariant = i['variant']
yieldObj = {}
yieldObj['identifier'] = self.getIdentifier()
yieldObj['variants'] = [self.getVariant(),
f"A:{dc2a.getVariant()}",
f"A:{tc2a.getVariant()}",
f"S:{dc3s.getVariant()}",
f"S:{tc3s.getVariant()}",
mtc.getVariant(),
j.getVariant()]
l.getVariant(),
indicatorVariant]
yieldObj['payload'] = self.getPayload(dc2a.getPayload(), tc2a.getPayload(), dc3s.getPayload(), tc3s.getPayload())
yieldObj['payload'] = self.getPayload(indicator = indicator,
label = l.getPayload())
yieldObj['assertSelectors'] = ['M', 'AD', 'AT', 'SD', 'ST', 'MT', 'J']
yieldObj['assertSelectors'] = ['B', 'L', 'I']
yieldObj['assertFuncs'] = [self.assertFunc,
dc2a.assertFunc,
tc2a.assertFunc,
dc3s.assertFunc,
tc3s.assertFunc,
mtc.assertFunc,
j.assertFunc]
yieldObj['assertFuncs'] = [self.assertFunc(indicator, l.getPayload()), l.assertFunc, ic.assertFunc]
yieldObj['shouldFail'] = (self.shouldFail()
| dc2a.shouldFail()
| tc2a.shouldFail()
| dc3s.shouldFail()
| tc3s.shouldFail()
| mtc.shouldFail()
| j.shouldFail())
| l.shouldFail()
| ic.shouldFail())
yield yieldObj

@ -8,9 +8,13 @@ from ffx.media_descriptor import MediaDescriptor
from .basename_combinator import BasenameCombinator
from .indicator_combinator import IndicatorCombinator
from .label_combinator import LabelCombinator
from .title_combinator import TitleCombinator
from .site_combinator import SiteCombinator
class BasenameCombinator1(BasenameCombinator):
"""show_indicator_group"""
VARIANT = 'B1'
@ -27,64 +31,40 @@ class BasenameCombinator1(BasenameCombinator):
return BasenameCombinator1.VARIANT
def getPayload(self):
return {'From': 'Encoders'}
return ''
def assertFunc(self, mediaDescriptor: MediaDescriptor):
mediaTags = mediaDescriptor.getTags()
assert 'From' in mediaTags.keys(), "'From' not in media tag keys"
assert mediaTags.keys()['From'] == 'Encoders', "Media tag value not 'Encoders' for key 'To'"
pass
def shouldFail(self):
return False
def getYield(self):
for MTC in MediaTagCombinator.getAllClassReferences():
for DC2_A in DispositionCombinator2.getAllClassReferences():
for TC2_A in TrackTagCombinator2.getAllClassReferences():
for DC3_S in DispositionCombinator3.getAllClassReferences():
for TC3_S in TrackTagCombinator3.getAllClassReferences():
for J in JellyfinCombinator.getAllClassReferences():
def getYield(self):
j = J(self._context)
self._context['use_jellyfin'] = j.getPayload()
for L in LabelCombinator.getAllClassReferences():
# for I in IndicatorCombinator.getAllClassReferences():
# for S in SiteCombinator.getAllClassReferences():
# for T in TitleCombinator.getAllClassReferences():
#
dc2a = DC2_A(self._context)
tc2a = TC2_A(self._context)
dc3s = DC3_S(self._context)
tc3s = TC3_S(self._context)
mtc = MTC(self._context)
l = L(self._context)
yieldObj = {}
yieldObj['identifier'] = self.getIdentifier()
yieldObj['variants'] = [self.getVariant(),
f"A:{dc2a.getVariant()}",
f"A:{tc2a.getVariant()}",
f"S:{dc3s.getVariant()}",
f"S:{tc3s.getVariant()}",
mtc.getVariant(),
j.getVariant()]
l.getVariant()]
yieldObj['payload'] = self.getPayload(dc2a.getPayload(), tc2a.getPayload(), dc3s.getPayload(), tc3s.getPayload())
yieldObj['payload'] = self.getPayload()
yieldObj['assertSelectors'] = ['M', 'AD', 'AT', 'SD', 'ST', 'MT', 'J']
yieldObj['assertSelectors'] = ['B', 'L']
yieldObj['assertFuncs'] = [self.assertFunc,
dc2a.assertFunc,
tc2a.assertFunc,
dc3s.assertFunc,
tc3s.assertFunc,
mtc.assertFunc,
j.assertFunc]
l.assertFunc]
yieldObj['shouldFail'] = (self.shouldFail()
| dc2a.shouldFail()
| tc2a.shouldFail()
| dc3s.shouldFail()
| tc3s.shouldFail()
| mtc.shouldFail()
| j.shouldFail())
| l.shouldFail())
yield yieldObj

@ -9,8 +9,11 @@ from .basename_combinator import BasenameCombinator
from .indicator_combinator import IndicatorCombinator
from .label_combinator import LabelCombinator
from .title_combinator import TitleCombinator
from .site_combinator import SiteCombinator
class BasenameCombinator2(BasenameCombinator):
"""documentation_site"""
VARIANT = 'B2'
@ -25,16 +28,10 @@ class BasenameCombinator2(BasenameCombinator):
return BasenameCombinator2.VARIANT
def getPayload(self):
return {'To': 'Fanz',
'Yolo': 'Holo'}
return ''
def assertFunc(self, mediaDescriptor: MediaDescriptor):
mediaTags = mediaDescriptor.getTags()
assert 'To' in mediaTags.keys(), "'To' not in media tag keys"
assert mediaTags.keys()['To'] == 'Fanz', "Media tag value not 'Fanz' for key 'To'"
assert 'Yolo' in mediaTags.keys(), "'Yolo' not in media tag keys"
assert mediaTags.keys()['Yolo'] == 'Holo', "Media tag value not 'Holo' for key 'Yolo'"
pass
def shouldFail(self):
return False
@ -42,51 +39,28 @@ class BasenameCombinator2(BasenameCombinator):
def getYield(self):
for L in LabelCombinator.getAllClassReferences():
for I in IndicatorCombinator.getAllClassReferences():
for TC2_A in TrackTagCombinator2.getAllClassReferences():
for DC3_S in DispositionCombinator3.getAllClassReferences():
for TC3_S in TrackTagCombinator3.getAllClassReferences():
for J in JellyfinCombinator.getAllClassReferences():
# for I in IndicatorCombinator.getAllClassReferences():
# for S in SiteCombinator.getAllClassReferences():
# for T in TitleCombinator.getAllClassReferences():
#
j = J(self._context)
self._context['use_jellyfin'] = j.getPayload()
dc2a = DC2_A(self._context)
tc2a = TC2_A(self._context)
dc3s = DC3_S(self._context)
tc3s = TC3_S(self._context)
mtc = MTC(self._context)
l = L(self._context)
yieldObj = {}
yieldObj['identifier'] = self.getIdentifier()
yieldObj['variants'] = [self.getVariant(),
f"A:{dc2a.getVariant()}",
f"A:{tc2a.getVariant()}",
f"S:{dc3s.getVariant()}",
f"S:{tc3s.getVariant()}",
mtc.getVariant(),
j.getVariant()]
l.getVariant()]
yieldObj['payload'] = self.getPayload(dc2a.getPayload(), tc2a.getPayload(), dc3s.getPayload(), tc3s.getPayload())
yieldObj['payload'] = {'label': l.getPayload()}
yieldObj['assertSelectors'] = ['M', 'AD', 'AT', 'SD', 'ST', 'MT', 'J']
yieldObj['assertSelectors'] = ['B', 'L']
yieldObj['assertFuncs'] = [self.assertFunc,
dc2a.assertFunc,
tc2a.assertFunc,
dc3s.assertFunc,
tc3s.assertFunc,
mtc.assertFunc,
j.assertFunc]
l.assertFunc]
yieldObj['shouldFail'] = (self.shouldFail()
| dc2a.shouldFail()
| tc2a.shouldFail()
| dc3s.shouldFail()
| tc3s.shouldFail()
| mtc.shouldFail()
| j.shouldFail())
| l.shouldFail())
yield yieldObj

@ -13,6 +13,18 @@ class IndicatorCombinator():
def getIdentifier(self):
return IndicatorCombinator.IDENTIFIER
def getPayload(self, season: int = -1, episode: int = -1):
if season == -1 and episode == -1:
return {
'variant': 'S00E00',
'indicator': ''
}
else:
return {
'variant': f"S{season+1:02d}E{episode+1:02d}",
'indicator': f"S{season+1:02d}E{episode+1:02d}"
}
def assertFunc(self, testObj = {}):
pass
@ -21,6 +33,7 @@ class IndicatorCombinator():
def getYield(self):
yield self.getPayload()
for season in range(IndicatorCombinator.MAX_SEASON):
for episode in range(IndicatorCombinator.MAX_EPISODE):
yield f"S{season+1:02d}E{episode+1:02d}"
yield self.getPayload(season, episode)

@ -5,7 +5,7 @@ class LabelCombinator():
IDENTIFIER = 'label'
PREFIX = 'label_combinator_'
LABEL = 'media'
LABEL = 'ffx'
def __init__(self, context = None):
self._context = context

@ -98,10 +98,16 @@ class Scenario():
def clearTestDirectory(self):
testFiles = glob.glob("f{self._testDirectory}/*")
testFiles = glob.glob(f"{self._testDirectory}/*")
for f in testFiles:
os.remove(f)
def getFilePathsInTestDirectory(self):
return [f for f in glob.glob(f"{self._testDirectory}/*")]
def getFilenamesInTestDirectory(self):
return [os.path.basename(f) for f in self.getFilePathsInTestDirectory()]
@staticmethod
def list():

@ -1,4 +1,4 @@
import os, sys, click
import os, sys, click, glob
from .scenario import Scenario
@ -13,7 +13,9 @@ from ffx.track_descriptor import TrackDescriptor
from ffx.track_type import TrackType
from ffx.track_disposition import TrackDisposition
from ffx.test.media_combinator import MediaCombinator
from ffx.test.media_combinator_0 import MediaCombinator0
from ffx.test.basename_combinator import BasenameCombinator
class Scenario1(Scenario):
@ -21,6 +23,9 @@ class Scenario1(Scenario):
Converting to VaA, vp9/opus/opus
No tmdb, default parameters"""
TESTFILE_EXTENSION = 'mkv'
EXPECTED_EXTENSION = 'webm'
def __init__(self, context):
context['use_jellyfin'] = True
@ -30,6 +35,9 @@ class Scenario1(Scenario):
super().__init__(context)
def getScenario(self):
return self.__class__.__name__[8:]
def job(self, yieldObj: dict):
@ -42,17 +50,20 @@ class Scenario1(Scenario):
variantIdentifier = '-'.join(variantList)
variantLabel = f"{self.__class__.__name__} Variant {variantIdentifier}"
sourceMediaDescriptor: MediaDescriptor = yieldObj['payload']
mc0 = MediaCombinator0(context = testContext)
sourceMediaDescriptor: MediaDescriptor = mc0.getPayload()
assertSelectorList: list = yieldObj['assertSelectors']
assertFuncList = yieldObj['assertFuncs']
shouldFail = yieldObj['shouldFail']
try:
jellyfinSelectorIndex = assertSelectorList.index('J')
jellyfinVariant = variantList[jellyfinSelectorIndex]
testContext['use_jellyfin'] = jellyfinVariant == 'J1'
except ValueError:
jellyfinSelectorIndex = -1
variantPayload = yieldObj['payload']
variantBasename = variantPayload['basename']
variantFilenameLabel = variantPayload['label']
expectedBasename = variantPayload['expectedBasename']
variantFilename = f"{variantBasename}.{Scenario1.TESTFILE_EXTENSION}"
expectedFilename = f"{expectedBasename}.{Scenario1.EXPECTED_EXTENSION}"
if self._context['test_variant'] and variantIdentifier != self._context['test_variant']:
@ -61,22 +72,48 @@ class Scenario1(Scenario):
self._logger.debug(f"Running Job: {variantLabel}")
# Phase 1: Setup source files
if not variantBasename:
raise ValueError(f"{variantLabel}: Testfile basename is falsy")
self.clearTestDirectory()
mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory, logger=self._logger, length = 2)
self._logger.debug(f"Creating test file: {variantFilename}")
mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor,
baseName=variantBasename,
directory=self._testDirectory,
logger=self._logger,
length = 2)
# # Phase 2: Prepare database
#
# Phase 3: Run ffx
commandSequence = [sys.executable,
self._ffxExecutablePath,
'convert',
self._ffxExecutablePath]
# if self._context['verbosity']:
# commandSequence += ['--verbose',
# str(self._context['verbosity'])]
commandSequence += ['convert',
mediaFilePath,
'--no-prompt']
if variantFilenameLabel:
commandSequence += ['--label', variantFilenameLabel]
if not testContext['use_jellyfin']:
commandSequence += ['--no-jellyfin']
commandSequence += ['--no-pattern']
commandSequence += ['--no-tmdb']
self._logger.debug(f"{variantLabel}: Test sequence: {commandSequence}")
out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
@ -86,8 +123,14 @@ class Scenario1(Scenario):
if rc:
self._logger.debug(f"{variantLabel}: Process returned ERROR {rc} ({err})")
# Phase 4: Evaluate results
# Phase 4: Find result files
resultFilenames = [rf for rf in self.getFilenamesInTestDirectory() if rf != 'ffmpeg2pass-0.log' and rf != variantFilename]
self._logger.debug(f"{variantLabel}: Result filenames: {resultFilenames}")
# Phase 5: Evaluate results
try:
jobFailed = bool(rc)
@ -98,18 +141,18 @@ class Scenario1(Scenario):
if not jobFailed:
resultFile = os.path.join(self._testDirectory, 'media.webm')
assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
expectedResultFilePath = os.path.join(self._testDirectory, f"{expectedFilename}")
resultFileProperties = FileProperties(testContext, resultFile)
resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
assert os.path.isfile(expectedResultFilePath), f"Result file {expectedFilename} in path '{self._testDirectory}' wasn't created"
if testContext['use_jellyfin']:
sourceMediaDescriptor.applyJellyfinOrder()
resultMediaDescriptor.applySourceIndices(sourceMediaDescriptor)
resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
# resultFileProperties = FileProperties(testContext, resultFile)
# resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
#
# if testContext['use_jellyfin']:
# sourceMediaDescriptor.applyJellyfinOrder()
# resultMediaDescriptor.applySourceIndices(sourceMediaDescriptor)
#
# resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
for assertIndex in range(len(assertSelectorList)):
@ -117,22 +160,15 @@ class Scenario1(Scenario):
assertFunc = assertFuncList[assertIndex]
assertVariant = variantList[assertIndex]
if assertSelector == 'M':
if assertSelector == 'B':
#TODO: per file find
testObj = {'filenames': resultFilenames}
assertFunc(testObj=testObj)
if assertSelector == 'L':
assertFunc()
for variantIndex in range(len(assertVariant)):
assert assertVariant[variantIndex].lower() == resultMediaTracks[variantIndex].getType().indicator(), f"Stream #{variantIndex} is not of type {resultMediaTracks[variantIndex].getType().label()}"
elif assertSelector == 'AD' or assertSelector == 'AT':
assertFunc(resultMediaDescriptor.getAudioTracks())
elif assertSelector == 'SD' or assertSelector == 'ST':
assertFunc(resultMediaDescriptor.getSubtitleTracks())
elif type(assertSelector) is str:
if assertSelector == 'J':
if assertSelector == 'I':
assertFunc()
self._reportLogger.info(f"{variantLabel}: Test passed")
except AssertionError as ae:
@ -141,9 +177,8 @@ class Scenario1(Scenario):
def run(self):
MC_list = MediaCombinator.getAllClassReferences()
for MC in MC_list:
self._logger.debug(f"MC={MC.__name__}")
mc = MC(context = self._context)
for y in mc.getYield():
for BC in BasenameCombinator.getAllClassReferences():
self._logger.debug(f"BC={BC.__name__}")
bc = BC(context = self._context)
for y in bc.getYield():
self.job(y)

@ -30,122 +30,122 @@ class Scenario2(Scenario):
super().__init__(context)
def getScenario(self):
return self.__class__.__name__[8:]
# def job(self, yieldObj: dict):
#
# testContext = self._context.copy()
#
# identifier = yieldObj['identifier']
# variantList = yieldObj['variants']
#
# variantIdentifier = '-'.join(variantList)
# variantLabel = f"{self.__class__.__name__} Variant {variantIdentifier}"
#
# sourceMediaDescriptor: MediaDescriptor = yieldObj['payload']
# assertSelectorList: list = yieldObj['assertSelectors']
# assertFuncList = yieldObj['assertFuncs']
# shouldFail = yieldObj['shouldFail']
#
# try:
# jellyfinSelectorIndex = assertSelectorList.index('J')
# jellyfinVariant = variantList[jellyfinSelectorIndex]
# testContext['use_jellyfin'] = jellyfinVariant == 'J1'
# except ValueError:
# jellyfinSelectorIndex = -1
#
#
# if self._context['test_variant'] and variantIdentifier != self._context['test_variant']:
# return
#
#
# self._logger.debug(f"Running Job: {variantLabel}")
#
# # Phase 1: Setup source files
# self.clearTestDirectory()
# mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory, logger=self._logger, length = 2)
#
# # # Phase 2: Prepare database
# #
# # Phase 3: Run ffx
# commandSequence = [sys.executable,
# self._ffxExecutablePath,
# 'convert',
# mediaFilePath,
# '--no-prompt']
#
# if not testContext['use_jellyfin']:
# commandSequence += ['--no-jellyfin']
#
# self._logger.debug(f"{variantLabel}: Test sequence: {commandSequence}")
#
# out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
#
# if out:
# self._logger.debug(f"{variantLabel}: Process output: {out}")
# if rc:
# self._logger.debug(f"{variantLabel}: Process returned ERROR {rc} ({err})")
#
# # Phase 4: Evaluate results
#
# try:
#
# jobFailed = bool(rc)
#
# self._logger.debug(f"{variantLabel}: Should fail: {shouldFail} / actually failed: {jobFailed}")
#
# assert jobFailed == shouldFail, f"Process {'failed' if jobFailed else 'did not fail'}"
#
# if not jobFailed:
#
# resultFile = os.path.join(self._testDirectory, 'media.webm')
#
# assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
#
# resultFileProperties = FileProperties(testContext, resultFile)
# resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
#
# if testContext['use_jellyfin']:
# sourceMediaDescriptor.applyJellyfinOrder()
# resultMediaDescriptor.applySourceIndices(sourceMediaDescriptor)
#
# resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
#
# for assertIndex in range(len(assertSelectorList)):
#
# assertSelector = assertSelectorList[assertIndex]
# assertFunc = assertFuncList[assertIndex]
# assertVariant = variantList[assertIndex]
#
# if assertSelector == 'M':
# assertFunc()
# for variantIndex in range(len(assertVariant)):
# assert assertVariant[variantIndex].lower() == resultMediaTracks[variantIndex].getType().indicator(), f"Stream #{variantIndex} is not of type {resultMediaTracks[variantIndex].getType().label()}"
#
# elif assertSelector == 'AD' or assertSelector == 'AT':
# assertFunc(resultMediaDescriptor.getAudioTracks())
#
# elif assertSelector == 'SD' or assertSelector == 'ST':
# assertFunc(resultMediaDescriptor.getSubtitleTracks())
#
# elif type(assertSelector) is str:
# if assertSelector == 'J':
# assertFunc()
#
#
# self._reportLogger.info(f"{variantLabel}: Test passed")
#
# except AssertionError as ae:
def job(self, yieldObj: dict):
testContext = self._context.copy()
identifier = yieldObj['identifier']
variantList = yieldObj['variants']
variantIdentifier = '-'.join(variantList)
variantLabel = f"{self.__class__.__name__} Variant {variantIdentifier}"
sourceMediaDescriptor: MediaDescriptor = yieldObj['payload']
assertSelectorList: list = yieldObj['assertSelectors']
assertFuncList = yieldObj['assertFuncs']
shouldFail = yieldObj['shouldFail']
try:
jellyfinSelectorIndex = assertSelectorList.index('J')
jellyfinVariant = variantList[jellyfinSelectorIndex]
testContext['use_jellyfin'] = jellyfinVariant == 'J1'
except ValueError:
jellyfinSelectorIndex = -1
if self._context['test_variant'] and variantIdentifier != self._context['test_variant']:
return
self._logger.debug(f"Running Job: {variantLabel}")
# Phase 1: Setup source files
self.clearTestDirectory()
mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory, logger=self._logger, length = 2)
# # Phase 2: Prepare database
#
# self._reportLogger.error(f"{variantLabel}: Test FAILED ({ae})")
# Phase 3: Run ffx
commandSequence = [sys.executable,
self._ffxExecutablePath,
'convert',
mediaFilePath,
'--no-prompt']
if not testContext['use_jellyfin']:
commandSequence += ['--no-jellyfin']
self._logger.debug(f"{variantLabel}: Test sequence: {commandSequence}")
out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
if out:
self._logger.debug(f"{variantLabel}: Process output: {out}")
if rc:
self._logger.debug(f"{variantLabel}: Process returned ERROR {rc} ({err})")
# Phase 4: Evaluate results
try:
jobFailed = bool(rc)
self._logger.debug(f"{variantLabel}: Should fail: {shouldFail} / actually failed: {jobFailed}")
assert jobFailed == shouldFail, f"Process {'failed' if jobFailed else 'did not fail'}"
if not jobFailed:
resultFile = os.path.join(self._testDirectory, 'media.webm')
assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
resultFileProperties = FileProperties(testContext, resultFile)
resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
if testContext['use_jellyfin']:
sourceMediaDescriptor.applyJellyfinOrder()
resultMediaDescriptor.applySourceIndices(sourceMediaDescriptor)
resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
for assertIndex in range(len(assertSelectorList)):
assertSelector = assertSelectorList[assertIndex]
assertFunc = assertFuncList[assertIndex]
assertVariant = variantList[assertIndex]
if assertSelector == 'M':
assertFunc()
for variantIndex in range(len(assertVariant)):
assert assertVariant[variantIndex].lower() == resultMediaTracks[variantIndex].getType().indicator(), f"Stream #{variantIndex} is not of type {resultMediaTracks[variantIndex].getType().label()}"
elif assertSelector == 'AD' or assertSelector == 'AT':
assertFunc(resultMediaDescriptor.getAudioTracks())
elif assertSelector == 'SD' or assertSelector == 'ST':
assertFunc(resultMediaDescriptor.getSubtitleTracks())
elif type(assertSelector) is str:
if assertSelector == 'J':
assertFunc()
self._reportLogger.info(f"{variantLabel}: Test passed")
except AssertionError as ae:
self._reportLogger.error(f"{variantLabel}: Test FAILED ({ae})")
def run(self):
pass
# MC_list = MediaCombinator.getAllClassReferences()
# for MC in MC_list:
# self._logger.debug(f"MC={MC.__name__}")
# mc = MC(context = self._context)
# for y in mc.getYield():
# self.job(y)
MC_list = MediaCombinator.getAllClassReferences()
for MC in MC_list:
self._logger.debug(f"MC={MC.__name__}")
mc = MC(context = self._context)
for y in mc.getYield():
self.job(y)

@ -5,6 +5,9 @@ class Scenario3(Scenario):
def __init__(self, context):
super().__init__(context)
def getScenario(self):
return self.__class__.__name__[8:]
def run(self):
pass
# self._testDirectory

@ -74,21 +74,25 @@ def ffx(ctx, verbose, dry_run):
# Another subcommand
@ffx.command()
@click.pass_context
@click.option('--scenario', type=str, default='', help='Only run tests from this scenario')
@click.option('--variant', type=str, default='', help='Only run this test variant')
def run(ctx, variant):
def run(ctx, scenario, variant):
"""Run ffx test sequences"""
ctx.obj['logger'].info('Starting FFX test runs')
ctx.obj['test_variant'] = variant
for scenarioIdentifier in Scenario.list():
for si in Scenario.list():
scenario = Scenario.getClassReference(scenarioIdentifier)(ctx.obj)
scen = Scenario.getClassReference(si)(ctx.obj)
ctx.obj['logger'].debug(f"Running scenario {scenarioIdentifier}")
if scenario and scenario != scen.getScenario():
continue
scenario.run()
ctx.obj['logger'].debug(f"Running scenario {si}")
scen.run()
@ffx.command()

Loading…
Cancel
Save