This commit is contained in:
Javanaut
2026-04-14 00:26:16 +02:00
parent e3c18f22d4
commit 6c5b518e4d
7 changed files with 316 additions and 28 deletions

1
.gitignore vendored
View File

@@ -20,5 +20,6 @@ venv/
*.mkv
*.webm
*.mp4
ffmpeg2pass-0.log
*.sup

View File

@@ -8,6 +8,7 @@ from textual.widgets import Button, Footer, Header, Input, Static
from textual.widgets._data_table import CellDoesNotExist
from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.show_descriptor import ShowDescriptor
from ffx.track_descriptor import TrackDescriptor
@@ -207,6 +208,30 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
def action_back(self):
go_back_or_exit(self)
def getDisplayedMediaDescriptor(self):
if self._currentPattern is not None and self._targetMediaDescriptor is not None:
return self._targetMediaDescriptor
return self._sourceMediaDescriptor
def getTrackEditSourceDescriptor(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor()
if (
selectedTrackDescriptor is None
or self._currentPattern is None
or self._targetMediaDescriptor is None
):
return selectedTrackDescriptor
for sourceTrackDescriptor in self._sourceMediaDescriptor.getTrackDescriptors():
if (
sourceTrackDescriptor.getSourceIndex()
== selectedTrackDescriptor.getSourceIndex()
and sourceTrackDescriptor.getType() == selectedTrackDescriptor.getType()
):
return sourceTrackDescriptor
return None
def _build_shows_table(self):
from textual.widgets import DataTable
@@ -478,8 +503,6 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
self.updateDifferences()
return updated
self.reloadProperties(reset_draft=True)
tagDifferences = self._mediaChangeSetObj.get(MediaDescriptorChangeSet.TAGS_KEY, {})
for addedTagKey in tagDifferences.get(DIFF_ADDED_KEY, {}).keys():
self._tac.deleteMediaTagByKey(self._currentPattern.getId(), addedTagKey)
@@ -566,9 +589,6 @@ class InspectDetailsScreen(MediaWorkflowScreenBase):
)
def handle_edit_pattern(self, screenResult):
if not screenResult:
return
self.reloadProperties(reset_draft=True)
if self._currentPattern is not None:
self.query_one("#pattern_input", Input).value = self._currentPattern.getPattern()

View File

@@ -1,7 +1,7 @@
import os
from time import monotonic
from textual import work
from textual import events, work
from textual.containers import Grid
from textual.worker import Worker, WorkerState
from textual.widgets import Button, Footer, Header, Static
@@ -183,6 +183,13 @@ class MediaEditScreen(MediaWorkflowScreenBase):
self.updateToggleButtons()
self._applyChangesWorker = None
def on_screen_resume(self, _event: events.ScreenResume) -> None:
if not hasattr(self, "tracksTable"):
return
self.refreshAfterDraftChange()
self.updateToggleButtons()
def _update_grid_layout(self) -> None:
leftColumnWidth = max(
localized_column_width(t("File"), self.GRID_COLUMN_LABEL_MIN),
@@ -353,30 +360,35 @@ class MediaEditScreen(MediaWorkflowScreenBase):
if trackDescriptor is None:
return
updatedTracks = []
nextSourceMediaDescriptor = self._sourceMediaDescriptor.clone(context=self.context)
updatedTracks = nextSourceMediaDescriptor.getTrackDescriptors()
replacementTrack = trackDescriptor.clone(context=self.context)
replaced = False
for currentTrack in self._sourceMediaDescriptor.getTrackDescriptors():
if (
currentTrack.getIndex() == trackDescriptor.getIndex()
and currentTrack.getSubIndex() == trackDescriptor.getSubIndex()
):
updatedTracks.append(trackDescriptor)
for trackIndex, currentTrack in enumerate(updatedTracks):
sameSourceTrack = (
currentTrack.getSourceIndex() == replacementTrack.getSourceIndex()
and currentTrack.getType() == replacementTrack.getType()
)
sameVisibleTrack = (
currentTrack.getIndex() == replacementTrack.getIndex()
and currentTrack.getSubIndex() == replacementTrack.getSubIndex()
)
if sameSourceTrack or sameVisibleTrack:
updatedTracks[trackIndex] = replacementTrack
replaced = True
else:
updatedTracks.append(currentTrack)
break
if not replaced:
self.setMessage(t("Unable to update selected stream."))
return
self._sourceMediaDescriptor = self._sourceMediaDescriptor.clone(context=self.context)
self._sourceMediaDescriptor.getTrackDescriptors().clear()
self._sourceMediaDescriptor.getTrackDescriptors().extend(updatedTracks)
self._sourceMediaDescriptor = nextSourceMediaDescriptor
self.setMessage(
t(
"Updated stream #{index} ({track_type}).",
index=trackDescriptor.getIndex(),
track_type=t(trackDescriptor.getType().label()),
index=replacementTrack.getIndex(),
track_type=t(replacementTrack.getType().label()),
)
)
self.refreshAfterDraftChange()

View File

@@ -10,6 +10,7 @@ from ffx.audio_layout import AudioLayout
from ffx.file_properties import FileProperties
from ffx.helper import DIFF_ADDED_KEY, DIFF_CHANGED_KEY, DIFF_REMOVED_KEY
from ffx.iso_language import IsoLanguage
from ffx.media_descriptor import MediaDescriptor
from ffx.media_descriptor_change_set import MediaDescriptorChangeSet
from ffx.track_descriptor import TrackDescriptor
from ffx.track_disposition import TrackDisposition
@@ -171,10 +172,17 @@ class MediaWorkflowScreenBase(Screen):
def hasPendingChanges(self) -> bool:
return bool(self._mediaChangeSetObj)
def getDisplayedMediaDescriptor(self) -> MediaDescriptor | None:
return self._sourceMediaDescriptor
def getTrackEditSourceDescriptor(self) -> TrackDescriptor | None:
return self.getSelectedTrackDescriptor()
def updateMediaTags(self):
displayedMediaDescriptor = self.getDisplayedMediaDescriptor()
self._sourceMediaTagRowData = populate_tag_table(
self.mediaTagsTable,
self._sourceMediaDescriptor.getTags(),
displayedMediaDescriptor.getTags() if displayedMediaDescriptor is not None else {},
ignore_keys=self._ignoreGlobalKeys,
remove_keys=self._removeGlobalKeys,
)
@@ -184,7 +192,12 @@ class MediaWorkflowScreenBase(Screen):
self._configure_tracks_table_columns()
self._trackRowData = {}
trackDescriptorList = self._sourceMediaDescriptor.getTrackDescriptors()
displayedMediaDescriptor = self.getDisplayedMediaDescriptor()
trackDescriptorList = (
displayedMediaDescriptor.getTrackDescriptors()
if displayedMediaDescriptor is not None
else []
)
typeCounter = {}
applyNormalization = bool(getattr(self, "_applyNormalization", False))
@@ -366,7 +379,7 @@ class MediaWorkflowScreenBase(Screen):
return None
def setSelectedTrackDefault(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor()
selectedTrackDescriptor = self.getTrackEditSourceDescriptor()
if selectedTrackDescriptor is None:
return False
@@ -377,7 +390,7 @@ class MediaWorkflowScreenBase(Screen):
return True
def setSelectedTrackForced(self):
selectedTrackDescriptor = self.getSelectedTrackDescriptor()
selectedTrackDescriptor = self.getTrackEditSourceDescriptor()
if selectedTrackDescriptor is None:
return False

View File

@@ -103,8 +103,11 @@ def apply_metadata_edits(
*,
notify=None,
) -> dict[str, object]:
temporaryOutputPath = create_temporary_output_path(source_path)
editContext = build_metadata_edit_context(context)
commandSequence = build_metadata_edit_command(
editContext,
source_path,
@@ -112,17 +115,21 @@ def apply_metadata_edits(
baseline_descriptor,
draft_descriptor,
)
ffmpegSeconds = 0.0
replaceSeconds = 0.0
try:
if editContext.get("dry_run", False):
notify_ffmpeg_invocation(
editContext,
commandSequence,
notify=notify,
dry_run=True,
)
return {
"applied": False,
"dry_run": True,
@@ -136,15 +143,18 @@ def apply_metadata_edits(
}
notify_ffmpeg_invocation(editContext, commandSequence, notify=notify)
ffmpegStart = monotonic()
_out, err, rc = executeProcess(commandSequence, context=editContext)
ffmpegSeconds = monotonic() - ffmpegStart
if rc:
raise click.ClickException(f"ffmpeg edit failed: rc={rc} error={err}")
replaceStart = monotonic()
os.replace(temporaryOutputPath, source_path)
replaceSeconds = monotonic() - replaceStart
return {
"applied": True,
"dry_run": False,
@@ -156,6 +166,7 @@ def apply_metadata_edits(
"write_seconds": ffmpegSeconds + replaceSeconds,
},
}
except Exception:
if os.path.exists(temporaryOutputPath):
os.remove(temporaryOutputPath)

View File

@@ -1,6 +1,7 @@
import click, re
from typing import List
from textual import events
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, DataTable, TextArea
from textual.containers import Grid
@@ -342,6 +343,16 @@ class PatternDetailsScreen(Screen):
self.updateTracks()
self.updateShiftedSeasons()
def on_screen_resume(self, _event: events.ScreenResume) -> None:
if not hasattr(self, "tracksTable") or not hasattr(self, "tagsTable"):
return
self.updateTags()
self.updateTracks()
if self.__pattern is not None and hasattr(self, "shiftedSeasonsTable"):
self.updateShiftedSeasons()
def compose(self):

View File

@@ -14,10 +14,12 @@ if str(SRC_ROOT) not in sys.path:
from ffx.audio_layout import AudioLayout # noqa: E402
from ffx.helper import DIFF_ADDED_KEY # noqa: E402
from ffx.iso_language import IsoLanguage # noqa: E402
from ffx.logging_utils import get_ffx_logger # noqa: E402
from ffx.inspect_details_screen import InspectDetailsScreen # noqa: E402
from ffx.i18n import set_current_language # noqa: E402
from ffx.media_descriptor import MediaDescriptor # noqa: E402
from ffx.media_edit_screen import MediaEditScreen # noqa: E402
from ffx.pattern_details_screen import PatternDetailsScreen # noqa: E402
from ffx.show_descriptor import ShowDescriptor # noqa: E402
@@ -89,12 +91,16 @@ class FakeTagTable:
class FakeMediaDescriptor:
def __init__(self, track_descriptors):
def __init__(self, track_descriptors, tags=None):
self._track_descriptors = list(track_descriptors)
self._tags = dict(tags or {})
def getTrackDescriptors(self):
return list(self._track_descriptors)
def getTags(self):
return dict(self._tags)
class FakeValueWidget:
def __init__(self, value):
@@ -459,6 +465,99 @@ class TagTableScreenStateTests(unittest.TestCase):
calls,
)
def test_media_edit_screen_handle_edit_track_updates_draft_descriptor(self):
original_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger"},
)
context = {"logger": get_ffx_logger()}
updated_track = original_track.clone(context=context)
updated_track.getTags()["language"] = "eng"
screen = object.__new__(MediaEditScreen)
screen.context = context
screen._sourceMediaDescriptor = MediaDescriptor(
context=context,
track_descriptors=[original_track],
)
calls = []
screen.setMessage = lambda _message: calls.append("setMessage")
screen.refreshAfterDraftChange = lambda: calls.append("refreshAfterDraftChange")
screen.handle_edit_track(updated_track)
self.assertEqual(
"eng",
screen._sourceMediaDescriptor.getTrackDescriptors()[0].getTags()["language"],
)
self.assertEqual(
["setMessage", "refreshAfterDraftChange"],
calls,
)
def test_media_edit_screen_screen_resume_refreshes_draft_tables(self):
screen = object.__new__(MediaEditScreen)
screen.tracksTable = FakeTagTable()
calls = []
screen.refreshAfterDraftChange = lambda: calls.append("refreshAfterDraftChange")
screen.updateToggleButtons = lambda: calls.append("updateToggleButtons")
screen.on_screen_resume(None)
self.assertEqual(
["refreshAfterDraftChange", "updateToggleButtons"],
calls,
)
def test_pattern_details_screen_screen_resume_refreshes_tables(self):
screen = object.__new__(PatternDetailsScreen)
screen.tracksTable = FakeTagTable()
screen.tagsTable = FakeTagTable()
screen.shiftedSeasonsTable = FakeTagTable()
screen._PatternDetailsScreen__pattern = object()
calls = []
screen.updateTags = lambda: calls.append("updateTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateShiftedSeasons = lambda: calls.append("updateShiftedSeasons")
screen.on_screen_resume(None)
self.assertEqual(
["updateTags", "updateTracks", "updateShiftedSeasons"],
calls,
)
def test_inspect_details_screen_handle_edit_pattern_refreshes_even_without_result(self):
screen = object.__new__(InspectDetailsScreen)
calls = []
screen.reloadProperties = lambda reset_draft=True: calls.append(
("reloadProperties", reset_draft)
)
screen._currentPattern = None
screen.updateMediaTags = lambda: calls.append("updateMediaTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.handle_edit_pattern(None)
self.assertEqual(
[
("reloadProperties", True),
"updateMediaTags",
"updateTracks",
"updateDifferences",
],
calls,
)
def test_pattern_details_screen_reads_selected_shifted_season_from_row_mapping(self):
screen = object.__new__(PatternDetailsScreen)
screen.shiftedSeasonsTable = FakeTagTable()
@@ -565,6 +664,127 @@ class TagTableScreenStateTests(unittest.TestCase):
self.assertNotIn(placeholder_key, screen._showRowData)
self.assertEqual(0, screen.getRowIndexFromShowId(8))
def test_inspect_details_screen_update_tracks_shows_target_pattern_tracks(self):
source_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger", "title": "German Full"},
)
target_track = TrackDescriptor(
index=1,
source_index=1,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "eng", "title": "English Full"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([source_track])
screen._targetMediaDescriptor = FakeMediaDescriptor([target_track])
screen._currentPattern = object()
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
self.assertIn("English Full", screen.tracksTable.rows["row-0"])
self.assertIs(target_track, screen.getSelectedTrackDescriptor())
def test_inspect_details_screen_maps_target_selection_back_to_source_track(self):
source_track = TrackDescriptor(
index=3,
source_index=7,
sub_index=1,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "ger"},
)
target_track = TrackDescriptor(
index=1,
source_index=7,
sub_index=0,
track_type=TrackType.SUBTITLE,
codec_name=TrackCodec.UNKNOWN,
tags={"language": "eng"},
)
screen = object.__new__(InspectDetailsScreen)
screen.tracksTable = FakeTagTable()
screen._sourceMediaDescriptor = FakeMediaDescriptor([source_track])
screen._targetMediaDescriptor = FakeMediaDescriptor([target_track])
screen._currentPattern = object()
screen._trackRowData = {}
screen._applyNormalization = False
screen.updateTracks()
self.assertIs(source_track, screen.getTrackEditSourceDescriptor())
def test_inspect_details_screen_action_update_pattern_uses_existing_change_set_before_reload(self):
class _FakePattern:
def getPattern(self):
return r"demo_(s[0-9]+e[0-9]+)\.mkv"
def getId(self):
return 9
class _FakeTagController:
def __init__(self, calls):
self._calls = calls
def deleteMediaTagByKey(self, pattern_id, key):
self._calls.append(("deleteMediaTagByKey", pattern_id, key))
calls = []
screen = object.__new__(InspectDetailsScreen)
screen._currentPattern = _FakePattern()
screen._mediaChangeSetObj = {
"tags": {
DIFF_ADDED_KEY: {"TITLE": "Demo"},
}
}
screen._tac = _FakeTagController(calls)
screen._tc = type(
"_FakeTrackController",
(),
{
"addTrack": staticmethod(lambda *_args, **_kwargs: None),
"deleteTrack": staticmethod(lambda *_args, **_kwargs: None),
"setDispositionState": staticmethod(lambda *_args, **_kwargs: None),
},
)()
screen._sourceMediaDescriptor = FakeMediaDescriptor([], tags={})
screen._targetMediaDescriptor = FakeMediaDescriptor([])
screen.getPatternObjFromInput = lambda: {
"show_id": 1,
"pattern": r"demo_(s[0-9]+e[0-9]+)\.mkv",
}
screen.reloadProperties = lambda reset_draft=True: calls.append(
("reloadProperties", reset_draft)
)
screen.updateMediaTags = lambda: calls.append("updateMediaTags")
screen.updateTracks = lambda: calls.append("updateTracks")
screen.updateDifferences = lambda: calls.append("updateDifferences")
screen.action_update_pattern()
self.assertEqual(
[
("deleteMediaTagByKey", 9, "TITLE"),
("reloadProperties", True),
"updateMediaTags",
"updateTracks",
"updateDifferences",
],
calls,
)
if __name__ == "__main__":
unittest.main()