Adding tests; scenario 1 MWE

click-textual
Maveno 11 months ago
parent 772c1d8f90
commit 3647b25b4c

@ -326,12 +326,14 @@ def convert(ctx,
click.echo(f"Pattern matching: {'No' if currentPattern is None else 'Yes'}")
fileBasename = ''
# fileBasename = ''
if currentPattern is None:
# Case no pattern matching
# fileBasename = currentShowDescriptor.getFilenamePrefix()
# Check for multiple default or forced dispositions if not set by user input or database requirements
#
# Query user for the correct sub indices, then configure flags in track descriptors associated with media descriptor accordingly.
@ -367,13 +369,17 @@ def convert(ctx,
mediaFileProperties.getSeason(),
mediaFileProperties.getEpisode())
if context['use_jellyfin']:
# Reorder subtracks in types with default the last, then make subindices flat again
sourceMediaDescriptor.applyJellyfinOrder()
fc = FfxController(context, sourceMediaDescriptor)
dispositionTokens = fc.generateDispositionTokens()
click.echo(f"Disposition Tokens: {dispositionTokens}")
# dispositionTokens = fc.generateDispositionTokens()
# click.echo(f"Disposition Tokens: {dispositionTokens}")
audioTokens = fc.generateAudioEncodingTokens()
click.echo(f"Audio Tokens: {audioTokens}")
# audioTokens = fc.generateAudioEncodingTokens()
# click.echo(f"Audio Tokens: {audioTokens}")
else:
@ -389,7 +395,7 @@ def convert(ctx,
tmdbEpisodeResult = tc.queryEpisode(currentShowDescriptor.getId(), mediaFileProperties.getSeason(), mediaFileProperties.getEpisode())
if tmdbEpisodeResult:
fileBasename = tc.getEpisodeFileBasename(currentShowDescriptor.getFilenamePrefix(),
sourceFileBasename = tc.getEpisodeFileBasename(currentShowDescriptor.getFilenamePrefix(),
tmdbEpisodeResult['name'],
mediaFileProperties.getSeason(),
mediaFileProperties.getEpisode(),
@ -398,9 +404,7 @@ def convert(ctx,
currentShowDescriptor.getIndicatorSeasonDigits(),
currentShowDescriptor.getIndicatorEpisodeDigits())
else:
fileBasename = currentShowDescriptor.getFilenamePrefix()
click.echo(f"fileBasename={fileBasename}")
sourceFileBasename = currentShowDescriptor.getFilenamePrefix()
if context['import_subtitles']:
targetMediaDescriptor.importSubtitles(context['subtitle_directory'],
@ -422,18 +426,20 @@ def convert(ctx,
fc = FfxController(context, targetMediaDescriptor, sourceMediaDescriptor)
mappingTokens = fc.generateMetadataTokens()
click.echo(f"Metadata Tokens: {mappingTokens}")
# mappingTokens = fc.generateMetadataTokens()
# click.echo(f"Metadata Tokens: {mappingTokens}")
dispositionTokens = fc.generateDispositionTokens()
click.echo(f"Disposition Tokens: {dispositionTokens}")
# dispositionTokens = fc.generateDispositionTokens()
# click.echo(f"Disposition Tokens: {dispositionTokens}")
audioTokens = fc.generateAudioEncodingTokens()
click.echo(f"Audio Tokens: {audioTokens}")
# audioTokens = fc.generateAudioEncodingTokens()
# click.echo(f"Audio Tokens: {audioTokens}")
click.echo(f"Season={mediaFileProperties.getSeason()} Episode={mediaFileProperties.getEpisode()}")
click.echo(f"fileBasename={sourceFileBasename}")
for q in q_list:
click.echo(f"\nRunning job {jobIndex} file={sourcePath} q={q}")
@ -441,8 +447,8 @@ def convert(ctx,
extra = ['ffx'] if sourceFilenameExtension == FfxController.DEFAULT_FILE_EXTENSION else []
targetFilename = (fileBasename if context['use_tmdb']
else mediaFileProperties.assembleTargetFileBasename(label if label else fileBasename,
targetFilename = (sourceFileBasename if context['use_tmdb']
else mediaFileProperties.assembleTargetFileBasename(label if label else sourceFileBasename,
q if len(q_list) > 1 else -1,
extraTokens = extra))

@ -1,3 +1,5 @@
import click
DIFF_ADDED_KEY = 'added'
DIFF_REMOVED_KEY = 'removed'
DIFF_CHANGED_KEY = 'changed'
@ -27,6 +29,15 @@ def dictDiff(a : dict, b : dict):
return diffResult
def dictCache(element: dict, cache: list = []):
for index in range(len(cache)):
diff = dictDiff(cache[index], element)
click.echo(f"dictCache() element={element} index={index} cached={cache[index]} diff={diff}")
if not diff:
return index, cache
cache.append(element)
return -1, cache
def setDiff(a : set, b : set) -> set:
@ -42,6 +53,10 @@ def setDiff(a : set, b : set) -> set:
return diffResult
def filterFilename(fileName: str) -> str:
"""This filter replaces charactes from TMDB responses with characters
less problemating when using in filenames or removes them"""
fileName = str(fileName).replace(':', ';')
return fileName

@ -134,16 +134,18 @@ class MediaDescriptor:
if defaultVideoTracks:
videoTracks.append(videoTracks.pop(videoTracks.index(defaultVideoTracks[0])))
self.sortSubIndices(videoTracks)
#self.sortSubIndices(videoTracks)
if defaultAudioTracks:
audioTracks.append(audioTracks.pop(audioTracks.index(defaultAudioTracks[0])))
self.sortSubIndices(audioTracks)
#self.sortSubIndices(audioTracks)
if defaultSubtitleTracks:
subtitleTracks.append(subtitleTracks.pop(subtitleTracks.index(defaultSubtitleTracks[0])))
self.sortSubIndices(subtitleTracks)
#self.sortSubIndices(subtitleTracks)
self.__trackDescriptors = videoTracks + audioTracks + subtitleTracks
self.sortIndices(self.__trackDescriptors)
#self.sortIndices(self.__trackDescriptors)
self.reindexSubIndices()
self.reindexIndices()
@classmethod
@ -193,6 +195,15 @@ class MediaDescriptor:
subIndex += 1
return descriptors
def reindexSubIndices(self):
subIndexCounter = {}
for td in self.__trackDescriptors:
trackType = td.getType()
if trackType not in subIndexCounter.keys():
subIndexCounter[trackType] = 0
td.setSubIndex(subIndexCounter[trackType])
subIndexCounter[trackType] += 1
def sortIndices(
self, descriptors: List[TrackDescriptor]
) -> List[TrackDescriptor]:
@ -202,6 +213,10 @@ class MediaDescriptor:
index += 1
return descriptors
def reindexIndices(self):
for trackIndex in range(len(self.__trackDescriptors)):
self.__trackDescriptors[trackIndex].setIndex(trackIndex)
def getAllTrackDescriptors(self) -> List[TrackDescriptor]:
return self.getVideoTracks() + self.getAudioTracks() + self.getSubtitleTracks()
@ -306,6 +321,7 @@ class MediaDescriptor:
return compareResult
def getImportFileTokens(self, use_sub_index: bool = True):
# reorderedTrackDescriptors = self.getReorderedTrackDescriptors()
@ -326,19 +342,20 @@ class MediaDescriptor:
def getInputMappingTokens(self, use_sub_index: bool = True, only_video: bool = False):
"""?: Tracks must be reordered for source index order"""
# reorderedTrackDescriptors = self.getReorderedTrackDescriptors()
inputMappingTokens = []
filePointer = 1
#for rtd in reorderedTrackDescriptors:
for td in self.__trackDescriptors:
for rtd in sorted(self.__trackDescriptors.copy(), key=lambda d: d.getSourceIndex()):
trackType = td.getType()
trackType = rtd.getType()
if (trackType == TrackType.VIDEO or not only_video):
importedFilePath = td.getExternalSourceFilePath()
importedFilePath = rtd.getExternalSourceFilePath()
if use_sub_index:
@ -352,18 +369,19 @@ class MediaDescriptor:
else:
if td.getCodec() != TrackDescriptor.CODEC_PGS:
if rtd.getCodec() != TrackDescriptor.CODEC_PGS:
inputMappingTokens += [
"-map",
f"0:{trackType.indicator()}:{td.getSubIndex()}",
f"0:{trackType.indicator()}:{rtd.getSubIndex()}",
]
else:
if td.getCodec() != TrackDescriptor.CODEC_PGS:
inputMappingTokens += ["-map", f"0:{td.getIndex()}"]
if rtd.getCodec() != TrackDescriptor.CODEC_PGS:
inputMappingTokens += ["-map", f"0:{rtd.getIndex()}"]
return inputMappingTokens
def searchSubtitleFiles(self, searchDirectory, prefix):
sesl_match = re.compile(MediaDescriptor.SEASON_EPISODE_STREAM_LANGUAGE_MATCH)

@ -1,9 +1,9 @@
import subprocess
from typing import List
def executeProcess(commandSequence: List[str]):
def executeProcess(commandSequence: List[str], directory: str = None):
# process = subprocess.Popen([t.encode('utf-8') for t in commandSequence], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
process = subprocess.Popen(commandSequence, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', cwd = directory)
output, error = process.communicate()
# return output.decode('utf-8'), error.decode('utf-8'), process.returncode
return output, error, process.returncode

@ -1,7 +1,19 @@
import os, math, tempfile
import os, math, tempfile, click
from ffx.ffx_controller import FfxController
from ffx.process import executeProcess
from ffx.media_descriptor import MediaDescriptor
from ffx.track_type import TrackType
from ffx.helper import dictCache
SHORT_SUBTITLE_SEQUENCE = [{'start': 1, 'end': 2, 'text': 'yolo'},
{'start': 3, 'end': 4, 'text': 'zolo'},
{'start': 5, 'end': 6, 'text': 'golo'}]
def getTimeString(hours: float = 0.0,
minutes: float = 0.0,
@ -123,7 +135,8 @@ def createVttFile(entries: dict):
return tmpFileName
def createMediaFile(directory: str = '',
def createMediaTestFile(mediaDescriptor: MediaDescriptor,
directory: str = '',
baseName: str = 'media',
format: str = '',
extension: str = 'mkv',
@ -132,37 +145,100 @@ def createMediaFile(directory: str = '',
rate: int = 25,
length: int = 10):
# subtitleContent = []
# subtitleContent.append({'start': 1, 'end': 2, 'text': 'yolo'})
# subtitleContent.append({'start': 3, 'end': 4, 'text': 'zolo'})
# subtitleContent.append({'start': 5, 'end': 6, 'text': 'golo'})
# subtitleFilePath = createVttFile(subtitleContent)
# subtitleFilePath = createVttFile(SHORT_SUBTITLE_SEQUENCE)
# commandTokens = FfxController.COMMAND_TOKENS
commandTokens = ['ffmpeg', '-y']
commandTokens += ['-f',
generatorCache = []
generatorTokens = []
mappingTokens = []
importTokens = []
metadataTokens = []
for mediaTagKey, mediaTagValue in mediaDescriptor.getTags().items():
metadataTokens += ['-metadata:g', f"{mediaTagKey}={mediaTagValue}"]
subIndexCounter = {}
for trackDescriptor in mediaDescriptor.getAllTrackDescriptors():
trackType = trackDescriptor.getType()
if trackType == TrackType.VIDEO:
cacheIndex, generatorCache = dictCache({'type': TrackType.VIDEO}, generatorCache)
click.echo(f"createMediaTestFile() cache index={cacheIndex} size={len(generatorCache)}")
if cacheIndex == -1:
generatorTokens += ['-f',
'lavfi',
'-i',
f"color=size={sizeX}x{sizeY}:rate={rate}:color=black"]
commandTokens += ['-f',
sourceIndex = len(generatorCache) - 1 if cacheIndex == -1 else cacheIndex
mappingTokens += ['-map', f"{sourceIndex}:v:0"]
if not trackType in subIndexCounter.keys():
subIndexCounter[trackType] = 0
for mediaTagKey, mediaTagValue in trackDescriptor.getTags().items():
metadataTokens += [f"-metadata:s:{trackType.indicator()}:{subIndexCounter[trackType]}",
f"{mediaTagKey}={mediaTagValue}"]
subIndexCounter[trackType] += 1
if trackType == TrackType.AUDIO:
audioLayout = 'stereo'
cacheIndex, generatorCache = dictCache({'type': TrackType.AUDIO, 'layout': audioLayout}, generatorCache)
click.echo(f"createMediaTestFile() cache index={cacheIndex} size={len(generatorCache)}")
click.echo(f"generartorCache index={cacheIndex} len={len(generatorCache)}")
if cacheIndex == -1:
generatorTokens += ['-f',
'lavfi',
'-i',
'anullsrc=channel_layout=stereo:sample_rate=44100']
f"anullsrc=channel_layout={audioLayout}:sample_rate=44100"]
sourceIndex = len(generatorCache) - 1 if cacheIndex == -1 else cacheIndex
mappingTokens += ['-map', f"{sourceIndex}:a:0"]
if not trackType in subIndexCounter.keys():
subIndexCounter[trackType] = 0
for mediaTagKey, mediaTagValue in trackDescriptor.getTags().items():
metadataTokens += [f"-metadata:s:{trackType.indicator()}:{subIndexCounter[trackType]}",
f"{mediaTagKey}={mediaTagValue}"]
subIndexCounter[trackType] += 1
if trackType == TrackType.SUBTITLE:
cacheIndex, generatorCache = dictCache({'type': TrackType.SUBTITLE}, generatorCache)
click.echo(f"createMediaTestFile() cache index={cacheIndex} size={len(generatorCache)}")
if cacheIndex == -1:
importTokens = ['-i', createVttFile(SHORT_SUBTITLE_SEQUENCE)]
sourceIndex = len(generatorCache) - 1 if cacheIndex == -1 else cacheIndex
mappingTokens += ['-map', f"{sourceIndex}:s:0"]
if not trackType in subIndexCounter.keys():
subIndexCounter[trackType] = 0
for mediaTagKey, mediaTagValue in trackDescriptor.getTags().items():
metadataTokens += [f"-metadata:s:{trackType.indicator()}:{subIndexCounter[trackType]}",
f"{mediaTagKey}={mediaTagValue}"]
subIndexCounter[trackType] += 1
# '-i',
# subtitleFilePath,
#HINT: No context required for creating disposition tokens
fc = FfxController({}, mediaDescriptor)
commandTokens += ['-map', '0:v:0']
#'-map', '0:v:0',
commandTokens += ['-map', '1:a:0']
commandTokens += ['-map', '1:a:0']
commandTokens += (generatorTokens
+ importTokens
+ mappingTokens
+ metadataTokens
+ fc.generateDispositionTokens())
# '-map', '2:s:0',
# '-c:s', 'webvtt',
commandTokens += ['-t', str(length)]

@ -5,22 +5,27 @@ from ffx.test.helper import createEmptyDirectory
class Scenario():
def __init__(self):
def __init__(self, context = None):
self._context = context
self._testDirectory = createEmptyDirectory()
self._ffxExecutablePath = os.path.join(
os.path.dirname(
os.path.dirname(
os.path.dirname(__file__))),
'ffx.py')
@staticmethod
def list():
basePath = os.path.dirname(__file__)
return [int(os.path.basename(p)[8:-3])
return [os.path.basename(p)[9:-3]
for p
in glob.glob(f"{ basePath }/scenario*.py", recursive = True)
in glob.glob(f"{ basePath }/scenario_*.py", recursive = True)
if p != __file__]
@staticmethod
def getClassReference(identifier):
importlib.import_module(f"ffx.test.scenario{ identifier }")
for name, obj in inspect.getmembers(sys.modules[f"ffx.test.scenario{ identifier }"]):
if inspect.isclass(obj) and name == f"Scenario{identifier}":
importlib.import_module(f"ffx.test.scenario_{ identifier }")
for name, obj in inspect.getmembers(sys.modules[f"ffx.test.scenario_{ identifier }"]):
#HINT: Excluding Scenario as it seems to be included by import (?)
if inspect.isclass(obj) and name != 'Scenario' and name.startswith('Scenario'):
return obj

@ -1,31 +0,0 @@
import os, sys, click
from .scenario import Scenario
from ffx.test.helper import createMediaFile
from ffx.process import executeProcess
class Scenario1(Scenario):
"""Creating file VAa, h264/aac/aac
Converting to VaA, vp9/opus/opus
No tmdb, default parameters"""
def __init__(self):
super().__init__()
def i_am(self):
return 1
def run(self):
click.echo(f"Running scenario 1")
mediaFilePath = createMediaFile(directory=self._testDirectory)
commandSequence = [sys.executable, os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'ffx.py'), '--help']
click.echo(f"Scenarion 1 test sequence: {commandSequence}")
out, err, rc = executeProcess(commandSequence)
click.echo(out)

@ -0,0 +1,92 @@
import os, sys, click
from .scenario import Scenario
from ffx.test.helper import createMediaTestFile
from ffx.process import executeProcess
from ffx.file_properties import FileProperties
from ffx.media_descriptor import MediaDescriptor
from ffx.track_descriptor import TrackDescriptor
from ffx.track_type import TrackType
from ffx.track_disposition import TrackDisposition
class Scenario1(Scenario):
"""Creating file VAa, h264/aac/aac
Converting to VaA, vp9/opus/opus
No tmdb, default parameters"""
def __init__(self, context):
super().__init__(context)
def run(self):
click.echo(f"Running scenario 1")
kwargs = {}
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.VIDEO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = set([TrackDisposition.DEFAULT])
trackDescriptor1 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
kwargs[TrackDescriptor.DISPOSITION_SET_KEY] = set([TrackDisposition.DEFAULT])
trackDescriptor2 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[TrackDescriptor.TRACK_TYPE_KEY] = TrackType.AUDIO
trackDescriptor3 = TrackDescriptor(**kwargs)
kwargs = {}
kwargs[MediaDescriptor.TRACK_DESCRIPTOR_LIST_KEY] = [trackDescriptor1,
trackDescriptor2,
trackDescriptor3]
sourceMediaDescriptor = MediaDescriptor(**kwargs)
sourceMediaDescriptor.reindexSubIndices()
# Phase 1: Setup source files
mediaFilePath = createMediaTestFile(mediaDescriptor=sourceMediaDescriptor, directory=self._testDirectory)
# Phase 2: Prepare database
# Phase 3: Run ffx
commandSequence = [sys.executable,
self._ffxExecutablePath,
'convert',
mediaFilePath]
click.echo(f"Scenarion 1 test sequence: {commandSequence}")
out, err, rc = executeProcess(commandSequence, directory = self._testDirectory)
click.echo(f"process output: {out}")
# Phase 4: Evaluate results
try:
resultFile = os.path.join(self._testDirectory, 'media.webm')
assert os.path.isfile(resultFile), f"Result file 'media.webm' in path '{self._testDirectory}' wasn't created"
resultFileProperties = FileProperties(self._context, resultFile)
resultMediaDescriptor = resultFileProperties.getMediaDescriptor()
resultMediaTracks = resultMediaDescriptor.getAllTrackDescriptors()
assert len(resultMediaTracks) == 3, f"Result file contains unexpected number of streams"
assert resultMediaTracks[0].getType() == TrackType.VIDEO, f"Stream #0 is not of type video"
assert resultMediaTracks[1].getType() == TrackType.AUDIO, f"Stream #1 is not of type audio"
assert not resultMediaTracks[1].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has set default disposition"
assert resultMediaTracks[2].getType() == TrackType.AUDIO, f"Stream #2 is not of type audio"
assert resultMediaTracks[2].getDispositionFlag(TrackDisposition.DEFAULT), f"Stream #1 has not set default disposition"
except AssertionError as ae:
click.echo(f"Scenario 1 test failed ({ae})")

@ -2,11 +2,11 @@ from .scenario import Scenario
class Scenario2(Scenario):
def __init__(self):
super().__init__()
def __init__(self, context):
super().__init__(context)
def run(self):
pass
# self._testDirectory
#createMediaFile()
#createMediaTestFile()

@ -2,11 +2,11 @@ from .scenario import Scenario
class Scenario3(Scenario):
def __init__(self):
super().__init__()
def __init__(self, context):
super().__init__(context)
def run(self):
pass
# self._testDirectory
#createMediaFile()
#createMediaTestFile()

@ -10,7 +10,7 @@ class TmdbController():
try:
self.__tmdbApiKey = os.environ['TMDB_API_KEY']
except KeyError:
click.ClickException('TMDB api key is not available, please set environment variable TMDB_API_KEY')
raise click.ClickException('TMDB api key is not available, please set environment variable TMDB_API_KEY')
self.tmdbLanguage = TmdbController.DEFAULT_LANGUAGE
@ -53,7 +53,7 @@ class TmdbController():
urlParams = f"?language={self.tmdbLanguage}&api_key={self.__tmdbApiKey}"
tmdbUrl = f"https://api.themoviedb.org/3/tv/{showId}W{urlParams}"
tmdbUrl = f"https://api.themoviedb.org/3/tv/{showId}{urlParams}"
#TODO Check for result
try:
@ -153,3 +153,6 @@ class TmdbController():
filenameTokens += ['E{num:{fill}{width}}'.format(num=episode, fill='0', width=indicatorEpisodeDigits)]
return ''.join(filenameTokens)
def importShow(self, showId: int):
pass

@ -19,7 +19,7 @@ from ffx.track_disposition import TrackDisposition
from ffx.process import executeProcess
from ffx.test.helper import createMediaFile
from ffx.test.helper import createMediaTestFile
from ffx.test.scenario import Scenario
@ -63,18 +63,32 @@ def help():
# Another subcommand
@ffx.command()
def run():
@click.pass_context
def run(ctx):
"""Run ffx test sequences"""
for scenarioIndex in Scenario().list():
for scenarioIdentifier in Scenario().list():
scenario = Scenario().getClassReference(scenarioIndex)()
scenario = Scenario.getClassReference(scenarioIdentifier)(ctx.obj)
click.echo(f"Running scenario {scenarioIndex}")
click.echo(f"Running scenario {scenarioIdentifier}")
scenario.run()
@ffx.command()
@click.pass_context
@click.argument('paths', nargs=-1)
def dupe(ctx, paths):
existingSourcePaths = [p for p in paths if os.path.isfile(p) and p.split('.')[-1] in FfxController.INPUT_FILE_EXTENSIONS]
for sourcePath in existingSourcePaths:
sourceFileProperties = FileProperties(ctx.obj, sourcePath)
sourceMediaDescriptor = sourceFileProperties.getMediaDescriptor()
createMediaTestFile(sourceMediaDescriptor, baseName='dupe')
if __name__ == '__main__':
ffx()

Loading…
Cancel
Save