click-textual
Maveno 11 months ago
parent 3f0efab49b
commit 95d858b2c6

@ -0,0 +1,13 @@
class Combinator():
def __init__(self, SubCombinations: dict):
self._SubCombinators = SubCombinations
def getPayload(self):
pass
def assertFunc(self, testObj):
pass
def getYield(yieldObj: dict):
pass

@ -0,0 +1,76 @@
import os, sys, importlib, glob, inspect, itertools
class DispositionCombinator2():
def __init__(self, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
def getIdentifier(self):
return self._identifier
def getVariant(self):
return self._variant
@staticmethod
def list():
basePath = os.path.dirname(__file__)
return [os.path.basename(p)[25:-3]
for p
in glob.glob(f"{ basePath }/disposition_combinator_2_*.py", recursive = True)
if p != __file__]
@staticmethod
def getClassReference(identifier):
importlib.import_module(f"ffx.test.disposition_combinator_2_{ identifier }")
for name, obj in inspect.getmembers(sys.modules[f"ffx.test.disposition_combinator_2_{ identifier }"]):
#HINT: Excluding DispositionCombination as it seems to be included by import (?)
if inspect.isclass(obj) and name != 'DispositionCombinator2' and name.startswith('DispositionCombinator2'):
return obj
@staticmethod
def getAllClassReferences():
return [DispositionCombinator2.getClassReference(i) for i in DispositionCombinator2.list()]
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
yieldObj[self.getIdentifier()] = self.getPayload()
#assertFuncList = yieldObj.pop('asserts', [])
#variants = yieldObj.pop('variants', set())
sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#y = []
for p in itertools.product(*[v.getYield() for k,v in sc_list]):
z = {}
for i in range(len(p)):
z[sc_list[i][0]] = p[i]
#y.append(z)
for k,v in z.items():
yieldObj[k] = v
assertFunc = getattr(v, "assertFunc", None)
if callable(assertFunc):
yieldObj['asserts'].append(assertFunc)
shouldFailFunc = getattr(v, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yieldObj['shouldFail'] = True
yieldObj['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yieldObj

@ -0,0 +1,28 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_2 import DispositionCombinator2
class DispositionCombinator20(DispositionCombinator2):
IDENTIFIER = 'disposition2'
VARIANT = 'D00'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator20.IDENTIFIER
self._variant = DispositionCombinator20.VARIANT
def getPayload(self):
return (set(), # 0
set()) # 1
def assertFunc(self, trackDescriptors):
assert not trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
def shouldFail(self):
return False

@ -0,0 +1,32 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_2 import DispositionCombinator2
class DispositionCombinator21(DispositionCombinator2):
IDENTIFIER = 'disposition2'
VARIANT = 'D10'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator21.IDENTIFIER
self._variant = DispositionCombinator21.VARIANT
def getPayload(self):
return (set([TrackDisposition.DEFAULT]), # 0
set()) # 1
def assertFunc(self, trackDescriptors):
if self._context['use_jellyfin']:
assert not trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has not set default disposition"
else:
assert trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has not set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
def shouldFail(self):
return False

@ -0,0 +1,28 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_2 import DispositionCombinator2
class DispositionCombinator22(DispositionCombinator2):
IDENTIFIER = 'disposition2'
VARIANT = 'D01'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator22.IDENTIFIER
self._variant = DispositionCombinator22.VARIANT
def getPayload(self):
return (set(), # 0
set([TrackDisposition.DEFAULT])) # 1
def assertFunc(self, trackDescriptors):
assert trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has not set default disposition"
def shouldFail(self):
return False

@ -0,0 +1,27 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_2 import DispositionCombinator2
class DispositionCombinator23(DispositionCombinator2):
IDENTIFIER = 'disposition2'
VARIANT = 'D11'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator23.IDENTIFIER
self._variant = DispositionCombinator23.VARIANT
def getPayload(self):
return (set([TrackDisposition.DEFAULT]), # 0
set([TrackDisposition.DEFAULT])) # 1
# def assertFunc(self, trackDescriptors):
# pass
def shouldFail(self):
return True

@ -0,0 +1,76 @@
import os, sys, importlib, glob, inspect, itertools
class DispositionCombinator3():
def __init__(self, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
def getIdentifier(self):
return self._identifier
def getVariant(self):
return self._variant
@staticmethod
def list():
basePath = os.path.dirname(__file__)
return [os.path.basename(p)[25:-3]
for p
in glob.glob(f"{ basePath }/disposition_combinator_3_*.py", recursive = True)
if p != __file__]
@staticmethod
def getClassReference(identifier):
importlib.import_module(f"ffx.test.disposition_combinator_3_{ identifier }")
for name, obj in inspect.getmembers(sys.modules[f"ffx.test.disposition_combinator_3_{ identifier }"]):
#HINT: Excluding DispositionCombination as it seems to be included by import (?)
if inspect.isclass(obj) and name != 'DispositionCombinator3' and name.startswith('DispositionCombinator3'):
return obj
@staticmethod
def getAllClassReferences():
return [DispositionCombinator3.getClassReference(i) for i in DispositionCombinator3.list()]
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
yieldObj[self.getIdentifier()] = self.getPayload()
#assertFuncList = yieldObj.pop('asserts', [])
#variants = yieldObj.pop('variants', set())
sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#y = []
for p in itertools.product(*[v.getYield() for k,v in sc_list]):
z = {}
for i in range(len(p)):
z[sc_list[i][0]] = p[i]
#y.append(z)
for k,v in z.items():
yieldObj[k] = v
assertFunc = getattr(v, "assertFunc", None)
if callable(assertFunc):
yieldObj['asserts'].append(assertFunc)
shouldFailFunc = getattr(v, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yieldObj['shouldFail'] = True
yieldObj['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yieldObj

@ -0,0 +1,30 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_3 import DispositionCombinator3
class DispositionCombinator30(DispositionCombinator3):
IDENTIFIER = 'disposition3'
VARIANT = 'D000'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator30.IDENTIFIER
self._variant = DispositionCombinator30.VARIANT
def getPayload(self):
return (set(), # 0
set(), # 1
set()) # 2
def assertFunc(self, trackDescriptors):
assert not trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
assert not trackDescriptors[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #2 has set default disposition"
def shouldFail(self):
return False

@ -0,0 +1,35 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_3 import DispositionCombinator3
class DispositionCombinator31(DispositionCombinator3):
IDENTIFIER = 'disposition3'
VARIANT = 'D100'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator31.IDENTIFIER
self._variant = DispositionCombinator31.VARIANT
def getPayload(self):
return (set([TrackDisposition.DEFAULT]), # 0
set(), # 1
set()) # 2
def assertFunc(self, trackDescriptors):
if self._context['use_jellyfin']:
assert not trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
assert trackDescriptors[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #2 has not set default disposition"
else:
assert trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has not set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
assert not trackDescriptors[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #2 has set default disposition"
def shouldFail(self):
return False

@ -0,0 +1,35 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_3 import DispositionCombinator3
class DispositionCombinator32(DispositionCombinator3):
IDENTIFIER = 'disposition3'
VARIANT = 'D010'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator32.IDENTIFIER
self._variant = DispositionCombinator32.VARIANT
def getPayload(self):
return (set(), # 0
set([TrackDisposition.DEFAULT]), # 1
set()) # 2
def assertFunc(self, trackDescriptors):
if self._context['use_jellyfin']:
assert not trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
assert trackDescriptors[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #2 has not set default disposition"
else:
assert not trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has not set default disposition"
assert not trackDescriptors[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #2 has set default disposition"
def shouldFail(self):
return False

@ -0,0 +1,31 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_3 import DispositionCombinator3
class DispositionCombinator33(DispositionCombinator3):
IDENTIFIER = 'disposition3'
VARIANT = 'D001'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator33.IDENTIFIER
self._variant = DispositionCombinator33.VARIANT
def getPayload(self):
return (set(), # 0
set(), # 1
set([TrackDisposition.DEFAULT])) # 2
def assertFunc(self, trackDescriptors):
if self._context['use_jellyfin']:
assert not trackDescriptors[0].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #0 has set default disposition"
assert not trackDescriptors[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
assert trackDescriptors[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #2 has not set default disposition"
def shouldFail(self):
return False

@ -0,0 +1,27 @@
import os, sys, importlib, glob, inspect
from ffx.track_disposition import TrackDisposition
from .disposition_combinator_3 import DispositionCombinator3
class DispositionCombinator34(DispositionCombinator3):
IDENTIFIER = 'disposition3'
VARIANT = 'D101'
def __init__(self, context):
super().__init__(context)
self._identifier = DispositionCombinator34.IDENTIFIER
self._variant = DispositionCombinator34.VARIANT
def getPayload(self):
return (set([TrackDisposition.DEFAULT]), # 0
set(), # 1
set([TrackDisposition.DEFAULT])) # 2
# def assertFunc(self, trackDescriptors):
# pass
def shouldFail(self):
return True

@ -13,7 +13,7 @@ from ffx.track_descriptor import TrackDescriptor
from ffx.track_type import TrackType
from ffx.track_disposition import TrackDisposition
from ffx.test.track_combination import TrackCombination
from ffx.test.track_combinator import TrackCombinator
class Scenario1(Scenario):
@ -24,178 +24,193 @@ class Scenario1(Scenario):
def __init__(self, context):
super().__init__(context)
def run(self):
# TrackCombinations -> asserts
# * Dispositions[2] -> asserts[2]
# * Tags[2] -> asserts[2]
# * MediaTags -> asserts
for tci in TrackCombination.list():
# combinationContext = self._context.copy()
self._context['use_jellyfin'] = True
self._context['use_tmdb'] = False
self._context['use_pattern'] = False
trackCombination = TrackCombination.getClassReference(tci)(self._context)
subcombinations = trackCombination.getSubcombinations()
if not subcombinations is None:
for trackDescriptors, evaluateFunc, shouldFail, combinationIdentifier in subcombinations:
kwargs = {}
kwargs[MediaDescriptor.CONTEXT_KEY] = self._context
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = trackDescriptors
self._logger.debug(f"src track dispositions {[d.getDispositionSet() for d in trackDescriptors]}")
sourceMediaDescriptor = MediaDescriptor(**kwargs)
sourceMediaDescriptor.reindexSubIndices()
# sourceMediaDescriptor.reindexIndices()
# Phase 1: Setup source files
# TODO:
# src track dispositions [set(), set(), {<TrackDisposition.DEFAULT: {'name': 'default', 'index': 0}>}, set(), set()]
# vs
# Stream #0:4: Subtitle: ass (ssa) (default)
mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory)
# Phase 2: Prepare database
# Phase 3: Run ffx
commandSequence = [sys.executable,
self._ffxExecutablePath,
'convert',
mediaFilePath,
'--no-prompt']
if not self._context['use_jellyfin']:
commandSequence += ['--no-jellyfin']
self._logger.debug(f"Scenario1.run(): sub={trackCombination.getIdentifier()},{combinationIdentifier} test sequence: {commandSequence}")
out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
self._logger.debug(f"Scenario1.run(): sub={trackCombination.getIdentifier()},{combinationIdentifier} process returned: {rc}")
def run(self):
TC_list = TrackCombinator.getAllClassReferences()
for TC in TC_list:
self._logger.debug(f"TC={TC.__name__}")
tc = TC(context = self._context)
for y in tc.getYield():
print(y)
# def run(self):
#
# # TrackCombinations -> asserts
# # * Dispositions[2] -> asserts[2]
# # * Tags[2] -> asserts[2]
# # * MediaTags -> asserts
#
# for tci in TrackCombination.list():
# # combinationContext = self._context.copy()
# self._context['use_jellyfin'] = True
# self._context['use_tmdb'] = False
# self._context['use_pattern'] = False
# trackCombination = TrackCombination.getClassReference(tci)(self._context)
#
# subcombinations = trackCombination.getSubcombinations()
# if not subcombinations is None:
# for trackDescriptors, evaluateFunc, shouldFail, combinationIdentifier in subcombinations:
#
# kwargs = {}
# kwargs[MediaDescriptor.CONTEXT_KEY] = self._context
# kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = trackDescriptors
# self._logger.debug(f"src track dispositions {[d.getDispositionSet() for d in trackDescriptors]}")
# sourceMediaDescriptor = MediaDescriptor(**kwargs)
# sourceMediaDescriptor.reindexSubIndices()
# # sourceMediaDescriptor.reindexIndices()
#
# # Phase 1: Setup source files
# # TODO:
# # src track dispositions [set(), set(), {<TrackDisposition.DEFAULT: {'name': 'default', 'index': 0}>}, set(), set()]
# # vs
# # Stream #0:4: Subtitle: ass (ssa) (default)
# mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory)
#
# # Phase 2: Prepare database
#
# # Phase 3: Run ffx
# commandSequence = [sys.executable,
# self._ffxExecutablePath,
# 'convert',
# mediaFilePath,
# '--no-prompt']
#
# if not self._context['use_jellyfin']:
# commandSequence += ['--no-jellyfin']
#
# self._logger.debug(f"Scenario1.run(): sub={trackCombination.getIdentifier()},{combinationIdentifier} test sequence: {commandSequence}")
#
# out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
#
# self._logger.debug(f"Scenario1.run(): sub={trackCombination.getIdentifier()},{combinationIdentifier} process returned: {rc}")
#
# # if out:
# # self._logger.debug(f"Scenario1.run(): process output: {out}")
# # if rc:
# # self._logger.error(f"Scenario1.run(): process resultet in error {rc}: {err}")
#
# # Phase 4: Evaluate results
#
# try:
#
# if rc:
# # Stuck with prompt: More than one default subtitle stream detected! Please select stream:
# # TODO: Catch prompt is add option to fail at prompt
# assert shouldFail, "ffmpeg run didn't fail" if shouldFail else "ffmpeg run failed"
# self._reportLogger.info(f"Scenario 1 sub={trackCombination.getIdentifier()},{combinationIdentifier} test passed")
# else:
#
# resultFile = os.path.join(self._testDirectory, 'media.webm')
#
# assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
#
# resultFileProperties = FileProperties(self._context, resultFile)
# resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
#
# resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
#
# assert len(resultMediaTracks) == len(trackDescriptors), f"Result file contains unexpected number of streams"
#
# # assert resultMediaTracks[0].getType() == TrackType.VIDEO, f"Stream #0 is not of type video"
#
# # assert resultMediaTracks[1].getType() == TrackType.AUDIO, f"Stream #1 is not of type audio"
#
# # assert resultMediaTracks[2].getType() == TrackType.AUDIO, f"Stream #2 is not of type audio"
#
# self._logger.debug(f"tgt track dispositions {[d.getDispositionSet() for d in resultMediaDescriptor.getSubtitleTracks()]}")
#
# # assert dispositions
# evaluateFunc(resultMediaTracks)
#
# self._reportLogger.info(f"Scenario 1 sub={trackCombination.getIdentifier()},{combinationIdentifier} test passed")
#
# except AssertionError as ae:
#
# self._reportLogger.error(f"Scenario 1 sub={trackCombination.getIdentifier()},{combinationIdentifier} test failed ({ae})")
#
#
#
#
# def job(self):
#
# self._logger.info(f"Running {self.__class__.__name__}")
#
# kwargs = {}
# kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
# kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
# kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = set([TrackDisposition.DEFAULT])
# trackDescriptor1 = TrackDescriptor(**kwargs)
#
# kwargs = {}
# kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
# kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
# kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = set([TrackDisposition.DEFAULT])
# trackDescriptor2 = TrackDescriptor(**kwargs)
#
# kwargs = {}
# kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
# kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
# trackDescriptor3 = TrackDescriptor(**kwargs)
#
# kwargs = {}
# kwargs[MediaDescriptor.CONTEXT_KEY] = self._context
# kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = [trackDescriptor1,
# trackDescriptor2,
# trackDescriptor3]
# sourceMediaDescriptor = MediaDescriptor(**kwargs)
# sourceMediaDescriptor.reindexSubIndices()
#
# # Phase 1: Setup source files
# mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory)
#
# # Phase 2: Prepare database
#
# # Phase 3: Run ffx
# commandSequence = [sys.executable,
# self._ffxExecutablePath,
# 'convert',
# mediaFilePath]
#
# self._logger.debug(f"Scenario1.run(): test sequence: {commandSequence}")
#
# out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
#
# if out:
# self._logger.debug(f"Scenario1.run(): process output: {out}")
# if rc:
# self._logger.error(f"Scenario1.run(): process resultet in error {rc}: {err}")
# Phase 4: Evaluate results
try:
if rc:
# Stuck with prompt: More than one default subtitle stream detected! Please select stream:
# TODO: Catch prompt is add option to fail at prompt
assert shouldFail, "ffmpeg run didn't fail" if shouldFail else "ffmpeg run failed"
self._reportLogger.info(f"Scenario 1 sub={trackCombination.getIdentifier()},{combinationIdentifier} test passed")
else:
resultFile = os.path.join(self._testDirectory, 'media.webm')
assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
resultFileProperties = FileProperties(self._context, resultFile)
resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
assert len(resultMediaTracks) == len(trackDescriptors), f"Result file contains unexpected number of streams"
#
# # Phase 4: Evaluate results
#
# try:
#
# resultFile = os.path.join(self._testDirectory, 'media.webm')
#
# assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
#
# resultFileProperties = FileProperties(self._context, resultFile)
# resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
#
# resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
#
# assert len(resultMediaTracks) == 3, f"Result file contains unexpected number of streams"
#
# assert resultMediaTracks[0].getType() == TrackType.VIDEO, f"Stream #0 is not of type video"
#
# assert resultMediaTracks[1].getType() == TrackType.AUDIO, f"Stream #1 is not of type audio"
# assert not resultMediaTracks[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
#
# assert resultMediaTracks[2].getType() == TrackType.AUDIO, f"Stream #2 is not of type audio"
self._logger.debug(f"tgt track dispositions {[d.getDispositionSet() for d in resultMediaDescriptor.getSubtitleTracks()]}")
# assert dispositions
evaluateFunc(resultMediaTracks)
self._reportLogger.info(f"Scenario 1 sub={trackCombination.getIdentifier()},{combinationIdentifier} test passed")
except AssertionError as ae:
self._reportLogger.error(f"Scenario 1 sub={trackCombination.getIdentifier()},{combinationIdentifier} test failed ({ae})")
def job(self):
self._logger.info(f"Running {self.__class__.__name__}")
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = set([TrackDisposition.DEFAULT])
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = set([TrackDisposition.DEFAULT])
trackDescriptor2 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
trackDescriptor3 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[MediaDescriptor.CONTEXT_KEY] = self._context
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = [trackDescriptor1,
trackDescriptor2,
trackDescriptor3]
sourceMediaDescriptor = MediaDescriptor(**kwargs)
sourceMediaDescriptor.reindexSubIndices()
# Phase 1: Setup source files
mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory)
# Phase 2: Prepare database
# Phase 3: Run ffx
commandSequence = [sys.executable,
self._ffxExecutablePath,
'convert',
mediaFilePath]
self._logger.debug(f"Scenario1.run(): test sequence: {commandSequence}")
out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
if out:
self._logger.debug(f"Scenario1.run(): process output: {out}")
if rc:
self._logger.error(f"Scenario1.run(): process resultet in error {rc}: {err}")
# Phase 4: Evaluate results
try:
resultFile = os.path.join(self._testDirectory, 'media.webm')
assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
resultFileProperties = FileProperties(self._context, resultFile)
resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
assert len(resultMediaTracks) == 3, f"Result file contains unexpected number of streams"
assert resultMediaTracks[0].getType() == TrackType.VIDEO, f"Stream #0 is not of type video"
assert resultMediaTracks[1].getType() == TrackType.AUDIO, f"Stream #1 is not of type audio"
assert not resultMediaTracks[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
assert resultMediaTracks[2].getType() == TrackType.AUDIO, f"Stream #2 is not of type audio"
assert resultMediaTracks[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has not set default disposition"
self._reportLogger.info('Scenario 1 test passed')
except AssertionError as ae:
self._reportLogger.error(f"Scenario 1 test failed ({ae})")
# assert resultMediaTracks[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has not set default disposition"
#
# self._reportLogger.info('Scenario 1 test passed')
#
# except AssertionError as ae:
#
# self._reportLogger.error(f"Scenario 1 test failed ({ae})")

@ -0,0 +1,33 @@
import os, sys, importlib, glob, inspect, itertools
class TrackCombinator():
def __init__(self, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
def getIdentifier(self):
return self._identifier
def getVariant(self):
return self._variant
@staticmethod
def list():
basePath = os.path.dirname(__file__)
return [os.path.basename(p)[17:-3]
for p
in glob.glob(f"{ basePath }/track_combinator_*.py", recursive = True)
if p != __file__]
@staticmethod
def getClassReference(identifier):
importlib.import_module(f"ffx.test.track_combinator_{ identifier }")
for name, obj in inspect.getmembers(sys.modules[f"ffx.test.track_combinator_{ identifier }"]):
#HINT: Excluding TrackCombinator as it seems to be included by import (?)
if inspect.isclass(obj) and name != 'TrackCombinator' and name.startswith('TrackCombinator'):
return obj
@staticmethod
def getAllClassReferences():
return [TrackCombinator.getClassReference(i) for i in TrackCombinator.list()]

@ -0,0 +1,87 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
class TrackCombinator0(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VA'
def __init__(self, SubCombinators: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinators
self._identifier = TrackCombinator0.IDENTIFIER
self._variant = TrackCombinator0.VARIANT
def getPayload(self):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
trackDescriptor1 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1]
# def assertFunc(self, testObj):
# pass
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
yo = yieldObj.copy()
yo[self.getIdentifier()] = self.getPayload()
yo['variants'].add(self.getVariant())
# #assertFuncList = yo.pop('asserts', [])
# #variants = yo.pop('variants', set())
#
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yo[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yo['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
# yo['variants'].add(v.getVariant())
#
# #HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo

@ -0,0 +1,91 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
class TrackCombinator1(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VAS'
def __init__(self, SubCombinations: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinations
self._identifier = TrackCombinator1.IDENTIFIER
self._variant = TrackCombinator1.VARIANT
def getPayload(self):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
trackDescriptor2 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1,
trackDescriptor2]
# def assertFunc(self, testObj):
# pass
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
yo = yieldObj.copy()
yo[self.getIdentifier()] = self.getPayload()
yo['variants'].add(self.getVariant())
# #assertFuncList = yo.pop('asserts', [])
# #variants = yo.pop('variants', set())
#
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yo[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yo['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
# yo['variants'].add(v.getVariant())
#
# #HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo

@ -0,0 +1,115 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
from .disposition_combinator_2 import DispositionCombinator2
class TrackCombinator2(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VASS'
def __init__(self, SubCombinators: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinators
self._identifier = TrackCombinator2.IDENTIFIER
self._variant = TrackCombinator2.VARIANT
def getPayload(self, subtitleDispositionTuple = (set(), set())):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[0]
trackDescriptor2 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[1]
trackDescriptor3 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1,
trackDescriptor2,
trackDescriptor3]
# def assertFunc(self, testObj):
# pass
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
for DC in DispositionCombinator2.getAllClassReferences():
yo = yieldObj.copy()
yo['variants'].add(self.getVariant())
dc = DC(context = self._context)
yo[self.getIdentifier()] = self.getPayload(dc.getPayload())
assertFunc = getattr(dc, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
shouldFailFunc = getattr(dc, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yo['shouldFail'] = True
yo['variants'].add(dc.getVariant())
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yo[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yo['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
# yo['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo

@ -0,0 +1,123 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
from .disposition_combinator_3 import DispositionCombinator3
class TrackCombinator3(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VASSS'
def __init__(self, SubCombinations: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinations
self._identifier = TrackCombinator3.IDENTIFIER
self._variant = TrackCombinator3.VARIANT
def getPayload(self, subtitleDispositionTuple = (set(), set(), set())):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[0]
trackDescriptor2 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[1]
trackDescriptor3 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[2]
trackDescriptor4 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1,
trackDescriptor2,
trackDescriptor3,
trackDescriptor4]
# def assertFunc(self, testObj):
# pass
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
for DC in DispositionCombinator3.getAllClassReferences():
yo = yieldObj.copy()
yo['variants'].add(self.getVariant())
dc = DC(context = self._context)
yo[self.getIdentifier()] = self.getPayload(dc.getPayload())
assertFunc = getattr(dc, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
shouldFailFunc = getattr(dc, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yo['shouldFail'] = True
yo['variants'].add(dc.getVariant())
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yo[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yo['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
# yo['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo

@ -0,0 +1,108 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
from .disposition_combinator_2 import DispositionCombinator2
class TrackCombinator4(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VAA'
def __init__(self, SubCombinations: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinations
self._identifier = TrackCombinator4.IDENTIFIER
self._variant = TrackCombinator4.VARIANT
def getPayload(self, audioDispositionTuple = (set(), set())):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[0]
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[1]
trackDescriptor2 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1,
trackDescriptor2]
# def assertFunc(self, testObj):
# pass
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
for DC in DispositionCombinator2.getAllClassReferences():
yo = yieldObj.copy()
yo['variants'].add(self.getVariant())
dc = DC(context = self._context)
yo[self.getIdentifier()] = self.getPayload(dc.getPayload())
assertFunc = getattr(dc, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
shouldFailFunc = getattr(dc, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yo['shouldFail'] = True
yo['variants'].add(dc.getVariant())
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yo[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yo['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
# yo['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo

@ -0,0 +1,114 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
from .disposition_combinator_2 import DispositionCombinator2
class TrackCombinator5(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VAAS'
def __init__(self, SubCombinations: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinations
self._identifier = TrackCombinator5.IDENTIFIER
self._variant = TrackCombinator5.VARIANT
def getPayload(self, audioDispositionTuple = (set(), set())):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[0]
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[1]
trackDescriptor2 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
trackDescriptor3 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1,
trackDescriptor2,
trackDescriptor3]
# def assertFunc(self, testObj):
# pass
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
for DC in DispositionCombinator2.getAllClassReferences():
yo = yieldObj.copy()
yo['variants'].add(self.getVariant())
dc = DC(context = self._context)
yo[self.getIdentifier()] = self.getPayload(dc.getPayload())
assertFunc = getattr(dc, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
shouldFailFunc = getattr(dc, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yo['shouldFail'] = True
yo['variants'].add(dc.getVariant())
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yo[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yo['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
# yo['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo

@ -0,0 +1,138 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
from .disposition_combinator_2 import DispositionCombinator2
class TrackCombinator6(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VAASS'
def __init__(self, SubCombinations: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinations
self._identifier = TrackCombinator6.IDENTIFIER
self._variant = TrackCombinator6.VARIANT
def getPayload(self, audioDispositionTuple = (set(), set()), subtitleDispositionTuple = (set(), set())):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[0]
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[1]
trackDescriptor2 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[0]
trackDescriptor3 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[1]
trackDescriptor4 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1,
trackDescriptor2,
trackDescriptor3,
trackDescriptor4]
# def assertFunc(self, testObj):
# pass
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
for DC_A in DispositionCombinator2.getAllClassReferences():
for DC_S in DispositionCombinator2.getAllClassReferences():
yo = yieldObj.copy()
yo['variants'].add(self.getVariant())
dc_a = DC_A(context = self._context)
dc_s = DC_S(context = self._context)
yo[self.getIdentifier()] = self.getPayload(dc_a.getPayload(), dc_s.getPayload())
assertFunc = getattr(dc_a, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
shouldFailFunc = getattr(dc_a, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yo['shouldFail'] = True
yo['variants'].add(dc_a.getVariant())
assertFunc = getattr(dc_s, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
shouldFailFunc = getattr(dc_s, "shouldFail", None)
if callable(shouldFailFunc):
if shouldFailFunc():
yo['shouldFail'] = True
yo['variants'].add(dc_s.getVariant())
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yieldObj[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yieldObj['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yieldObj['shouldFail'] = True
#
# yieldObj['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo

@ -0,0 +1,155 @@
import os, sys, importlib, glob, inspect, itertools
from ffx.track_type import TrackType
from ffx.track_descriptor import TrackDescriptor
from .track_combinator import TrackCombinator
from .disposition_combinator_2 import DispositionCombinator2
from .disposition_combinator_3 import DispositionCombinator3
class TrackCombinator7(TrackCombinator):
IDENTIFIER = 'tracks'
VARIANT = 'VAASSS'
def __init__(self, SubCombinations: dict = {}, context = None):
self._context = context
self._logger = context['logger']
self._reportLogger = context['report_logger']
self._SubCombinators = SubCombinations
self._identifier = TrackCombinator7.IDENTIFIER
self._variant = TrackCombinator7.VARIANT
def getPayload(self, audioDispositionTuple = (set(), set()), subtitleDispositionTuple = (set(), set(), set())):
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
trackDescriptor0 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[0]
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = audioDispositionTuple[1]
trackDescriptor2 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[0]
trackDescriptor3 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[1]
trackDescriptor4 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.CONTEXT_KEY] = self._context
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.SUBTITLE
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = subtitleDispositionTuple[2]
trackDescriptor5 = TrackDescriptor(**kwargs)
return [trackDescriptor0,
trackDescriptor1,
trackDescriptor2,
trackDescriptor3,
trackDescriptor4,
trackDescriptor5]
# def assertFunc(self, testObj):
# pass
#1 yieldObj leer, kommt aus Schleife in Scenario1
def getYield(self, yieldObj: dict = {}):
if not 'asserts' in yieldObj.keys():
yieldObj['asserts'] = []
if not 'variants' in yieldObj.keys():
yieldObj['variants'] = set()
if not 'shouldFail' in yieldObj.keys():
yieldObj['shouldFail'] = False
yieldObj['variants'].add(self.getVariant())
for DC_A in DispositionCombinator2.getAllClassReferences():
for DC_S in DispositionCombinator3.getAllClassReferences():
yo = {}
yo['asserts'] = yieldObj['asserts']
yo['variants'] = yieldObj['variants']
yo['shouldFail'] = yieldObj['shouldFail']
self._logger.debug(f"Iterating DC_A={DC_A.__name__} DC_S={DC_S.__name__}")
self._logger.debug(f"><))))^> yieldObj id={id(yieldObj)}")
self._logger.debug(f"><))))^> yo id={id(yo)}")
dc_a = DC_A(context = self._context)
dc_s = DC_S(context = self._context)
yo[self.getIdentifier()] = self.getPayload(dc_a.getPayload(), dc_s.getPayload())
assertFunc = getattr(dc_a, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
# shouldFailFunc = getattr(dc_a, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
yo['variants'].add(f"{dc_a.getVariant()}A")
#
assertFunc = getattr(dc_s, "assertFunc", None)
if callable(assertFunc):
yo['asserts'].append(assertFunc)
# shouldFailFunc = getattr(dc_s, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
yo['variants'].add(f"{dc_s.getVariant()}S")
# sc_list = [(k,v) for k,v in self._SubCombinators.items()]
#
# #y = []
# for p in itertools.product(*[v.getYield() for k,v in sc_list]):
# z = {}
# for i in range(len(p)):
# z[sc_list[i][0]] = p[i]
#
#
# #y.append(z)
# for k,v in z.items():
# yo[k] = v
#
# assertFunc = getattr(v, "assertFunc", None)
# if callable(assertFunc):
# yo['asserts'].append(assertFunc)
#
# shouldFailFunc = getattr(v, "shouldFail", None)
# if callable(shouldFailFunc):
# if shouldFailFunc():
# yo['shouldFail'] = True
#
# yo['variants'].add(v.getVariant())
#HINT: y should now contain a list of dicts with labels as keys and the SubCombinator classes as keys
yield yo
# yield yo | yieldObj

@ -0,0 +1,32 @@
import itertools
class c1():
def getYield():
for i in range(3):
yield i
class c2():
def getYield():
for i in range(5):
yield i
class c3():
def getYield():
for i in range(7):
yield i
sc = {}
sc['y1'] = c1
sc['y2'] = c2
sc['y3'] = c3
sc_list = [(k,v) for k,v in sc.items()]
y = []
for p in itertools.product(*[v.getYield() for k,v in sc_list]):
z = {}
for i in range(len(p)):
z[sc_list[i][0]] = p[i]
y.append(z)
print(y)
Loading…
Cancel
Save