import os, math, tempfile, click from ffx.process import executeProcess from ffx.media_descriptor import MediaDescriptor from ffx.media_descriptor_change_set import MediaDescriptorChangeSet from ffx.track_type import TrackType from ffx.helper import dictCache from ffx.configuration_controller import ConfigurationController SHORT_SUBTITLE_SEQUENCE = [{'start': 1, 'end': 2, 'text': 'yolo'}, {'start': 3, 'end': 4, 'text': 'zolo'}, {'start': 5, 'end': 6, 'text': 'golo'}] def getTimeString(hours: float = 0.0, minutes: float = 0.0, seconds: float = 0.0, millis: float = 0.0, format: str = ''): duration = (hours * 3600.0 + minutes * 60.0 + seconds + millis / 1000.0) hours = math.floor(duration / 3600.0) remaining = duration - 3600.0 * hours minutes = math.floor(remaining / 60.0) remaining = remaining - 60.0 * minutes seconds = math.floor(remaining) remaining = remaining - seconds millis = math.floor(remaining * 1000) if format == 'ass': return f"{hours:01d}:{minutes:02d}:{seconds:02d}.{millis:02d}" # srt & vtt return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}" def createAssFile(entries: dict, directory = None): # [Script Info] # ; Script generated by FFmpeg/Lavc61.3.100 # ScriptType: v4.00+ # PlayResX: 384 # PlayResY: 288 # ScaledBorderAndShadow: yes # YCbCr Matrix: None # # [V4+ Styles] # Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding # Style: Default,Arial,16,&Hffffff,&Hffffff,&H0,&H0,0,0,0,0,100,100,0,0,1,1,0,2,10,10,10,1 # # [Events] # Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text # Dialogue: 0,0:00:01.00,0:00:02.00,Default,,0,0,0,,yolo # Dialogue: 0,0:00:03.00,0:00:04.00,Default,,0,0,0,,zolo # Dialogue: 0,0:00:05.00,0:00:06.00,Default,,0,0,0,,golo tmpFileName = tempfile.mktemp(suffix=".ass", dir = directory) with open(tmpFileName, 'w') as tmpFile: tmpFile.write("[Script Info]\n") tmpFile.write("; Script generated by Ffx\n") tmpFile.write("ScriptType: v4.00+\n") tmpFile.write("PlayResX: 384\n") tmpFile.write("PlayResY: 288\n") tmpFile.write("ScaledBorderAndShadow: yes\n") tmpFile.write("YCbCr Matrix: None\n") tmpFile.write("\n") tmpFile.write("[V4+ Styles]\n") tmpFile.write("Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding\n") tmpFile.write("Style: Default,Arial,16,&Hffffff,&Hffffff,&H0,&H0,0,0,0,0,100,100,0,0,1,1,0,2,10,10,10,1\n") tmpFile.write("\n") tmpFile.write("[Events]\n") tmpFile.write("Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n") for entryIndex in range(len(entries)): tmpFile.write(f"Dialogue: 0,{getTimeString(seconds=entries[entryIndex]['start'], format='ass')},{getTimeString(seconds=entries[entryIndex]['end'], format='ass')},Default,,0,0,0,,{entries[entryIndex]['text']}\n") return tmpFileName def createSrtFile(entries: dict, directory = None): # 1 # 00:00:00,000 --> 00:00:02,500 # Welcome to the Example Subtitle File! # # 2 # 00:00:03,000 --> 00:00:06,000 # This is a demonstration of SRT subtitles. # # 3 # 00:00:07,000 --> 00:00:10,500 # You can use SRT files to add subtitles to your videos. tmpFileName = tempfile.mktemp(suffix=".srt", dir = directory) with open(tmpFileName, 'w') as tmpFile: for entryIndex in range(len(entries)): tmpFile.write(f"{entryIndex}\n") tmpFile.write(f"{getTimeString(seconds=entries[entryIndex]['start'])} --> {getTimeString(seconds=entries[entryIndex]['end'])}\n") tmpFile.write(f"{entries[entryIndex]['text']}\n\n") return tmpFileName def createVttFile(entries: dict, directory = None): # WEBVTT # # 01:20:33.050 --> 01:20:35.050 # Yolo tmpFileName = tempfile.mktemp(suffix=".vtt", dir = directory) with open(tmpFileName, 'w') as tmpFile: tmpFile.write("WEBVTT\n") for entryIndex in range(len(entries)): tmpFile.write("\n") tmpFile.write(f"{getTimeString(seconds=entries[entryIndex]['start'])} --> {getTimeString(seconds=entries[entryIndex]['end'])}\n") tmpFile.write(f"{entries[entryIndex]['text']}\n") return tmpFileName def createMediaTestFile(mediaDescriptor: MediaDescriptor, directory: str = '', baseName: str = 'media', format: str = '', extension: str = 'mkv', sizeX: int = 1280, sizeY: int = 720, rate: int = 25, length: int = 10, logger = None): # subtitleFilePath = createVttFile(SHORT_SUBTITLE_SEQUENCE) commandTokens = ['ffmpeg', '-y'] generatorCache = [] generatorTokens = [] mappingTokens = [] importTokens = [] metadataTokens = [] for mediaTagKey, mediaTagValue in mediaDescriptor.getTags().items(): metadataTokens += ['-metadata:g', f"{mediaTagKey}={mediaTagValue}"] subIndexCounter = {} # for trackDescriptor in mediaDescriptor.getAllTrackDescriptors(): for trackDescriptor in mediaDescriptor.getTrackDescriptors(): trackType = trackDescriptor.getType() if trackType == TrackType.VIDEO: cacheIndex, generatorCache = dictCache({'type': TrackType.VIDEO}, generatorCache) # click.echo(f"createMediaTestFile() cache index={cacheIndex} size={len(generatorCache)}") if cacheIndex == -1: generatorTokens += ['-f', 'lavfi', '-i', f"color=size={sizeX}x{sizeY}:rate={rate}:color=black"] sourceIndex = len(generatorCache) - 1 if cacheIndex == -1 else cacheIndex mappingTokens += ['-map', f"{sourceIndex}:v:0"] if not trackType in subIndexCounter.keys(): subIndexCounter[trackType] = 0 for mediaTagKey, mediaTagValue in trackDescriptor.getTags().items(): metadataTokens += [f"-metadata:s:{trackType.indicator()}:{subIndexCounter[trackType]}", f"{mediaTagKey}={mediaTagValue}"] subIndexCounter[trackType] += 1 if trackType == TrackType.AUDIO: audioLayout = 'stereo' cacheIndex, generatorCache = dictCache({'type': TrackType.AUDIO, 'layout': audioLayout}, generatorCache) # click.echo(f"createMediaTestFile() cache index={cacheIndex} size={len(generatorCache)}") # click.echo(f"generartorCache index={cacheIndex} len={len(generatorCache)}") if cacheIndex == -1: generatorTokens += ['-f', 'lavfi', '-i', f"anullsrc=channel_layout={audioLayout}:sample_rate=44100"] sourceIndex = len(generatorCache) - 1 if cacheIndex == -1 else cacheIndex mappingTokens += ['-map', f"{sourceIndex}:a:0"] if not trackType in subIndexCounter.keys(): subIndexCounter[trackType] = 0 for mediaTagKey, mediaTagValue in trackDescriptor.getTags().items(): metadataTokens += [f"-metadata:s:{trackType.indicator()}:{subIndexCounter[trackType]}", f"{mediaTagKey}={mediaTagValue}"] subIndexCounter[trackType] += 1 if trackType == TrackType.SUBTITLE: cacheIndex, generatorCache = dictCache({'type': TrackType.SUBTITLE}, generatorCache) # click.echo(f"createMediaTestFile() cache index={cacheIndex} size={len(generatorCache)}") if cacheIndex == -1: importTokens = ['-i', createVttFile(SHORT_SUBTITLE_SEQUENCE, directory=directory if directory else None)] sourceIndex = len(generatorCache) - 1 if cacheIndex == -1 else cacheIndex mappingTokens += ['-map', f"{sourceIndex}:s:0"] if not trackType in subIndexCounter.keys(): subIndexCounter[trackType] = 0 for mediaTagKey, mediaTagValue in trackDescriptor.getTags().items(): metadataTokens += [f"-metadata:s:{trackType.indicator()}:{subIndexCounter[trackType]}", f"{mediaTagKey}={mediaTagValue}"] subIndexCounter[trackType] += 1 ffxContext = {'config': ConfigurationController(), 'logger': logger} mdcs = MediaDescriptorChangeSet(ffxContext, mediaDescriptor) commandTokens += (generatorTokens + importTokens + mappingTokens + metadataTokens + mdcs.generateDispositionTokens()) commandTokens += ['-t', str(length)] if format: commandTokens += ['-f', format] fileName = f"{baseName}.{extension}" if directory: outputPath = os.path.join(directory, fileName) else: outputPath = fileName commandTokens += [outputPath] ctx = {'logger': logger} out, err, rc = executeProcess(commandTokens, context = ctx) if not logger is None: if out: logger.debug(f"createMediaTestFile(): Process output: {out}") if rc: logger.debug(f"createMediaTestFile(): Process returned ERROR {rc} ({err})") return outputPath def createEmptyDirectory(): return tempfile.mkdtemp() def createEmptyFile(suffix=None): return tempfile.mkstemp(suffix=suffix)