from __future__ import annotations from pathlib import Path import sys import time import unittest from unittest.mock import patch SRC_ROOT = Path(__file__).resolve().parents[2] / "src" if str(SRC_ROOT) not in sys.path: sys.path.insert(0, str(SRC_ROOT)) from ffx.process import ( # noqa: E402 COMMAND_NOT_FOUND_RETURN_CODE, COMMAND_TIMED_OUT_RETURN_CODE, executeProcess, getWrappedCommandSequence, normalizeCpuPercent, normalizeNiceness, ) class ProcessTests(unittest.TestCase): def test_execute_process_returns_stdout_for_success(self): out, err, rc = executeProcess( [sys.executable, "-c", "print('hello from process')"] ) self.assertEqual(0, rc) self.assertEqual("", err) self.assertEqual("hello from process\n", out) def test_execute_process_maps_missing_command_to_stable_error(self): out, err, rc = executeProcess(["ffx-command-that-does-not-exist"]) self.assertEqual("", out) self.assertEqual(COMMAND_NOT_FOUND_RETURN_CODE, rc) self.assertIn("Command not found while running", err) self.assertIn("ffx-command-that-does-not-exist", err) def test_execute_process_maps_timeout_to_stable_error(self): out, err, rc = executeProcess( [sys.executable, "-c", "import time; time.sleep(0.2)"], timeoutSeconds=0.05, ) self.assertEqual("", out) self.assertEqual(COMMAND_TIMED_OUT_RETURN_CODE, rc) self.assertIn("Command timed out", err) self.assertIn(sys.executable, err) def test_execute_process_can_stop_early_while_streaming_stderr(self): start = time.monotonic() observed_lines = [] out, err, rc = executeProcess( [ sys.executable, "-c", ( "import sys, time; " "sys.stderr.write('fatal warning\\n'); sys.stderr.flush(); " "time.sleep(2); " "sys.stderr.write('late line\\n'); sys.stderr.flush()" ), ], stderrLineHandler=lambda line: observed_lines.append(line) or ("fatal warning" in line), ) elapsed = time.monotonic() - start self.assertLess(elapsed, 1.5) self.assertNotEqual(0, rc) self.assertEqual("", out) self.assertIn("fatal warning", err) self.assertNotIn("late line", err) self.assertEqual(["fatal warning\n"], observed_lines) def test_get_wrapped_command_sequence_leaves_command_unwrapped_when_limits_disabled(self): wrapped = getWrappedCommandSequence( ["ffmpeg", "-i", "input.mkv"], context={"resource_limits": {"niceness": None, "cpu_percent": None}}, ) self.assertEqual(["ffmpeg", "-i", "input.mkv"], wrapped) def test_get_wrapped_command_sequence_wraps_nice_when_configured(self): wrapped = getWrappedCommandSequence( ["ffmpeg", "-i", "input.mkv"], context={"resource_limits": {"niceness": 5, "cpu_percent": None}}, ) self.assertEqual(["nice", "-n", "5", "ffmpeg", "-i", "input.mkv"], wrapped) def test_get_wrapped_command_sequence_wraps_cpulimit_around_nice_when_both_configured(self): wrapped = getWrappedCommandSequence( ["ffmpeg", "-i", "input.mkv"], context={"resource_limits": {"niceness": 5, "cpu_limit": 200}}, ) self.assertEqual( ["cpulimit", "-l", "200", "--", "nice", "-n", "5", "ffmpeg", "-i", "input.mkv"], wrapped, ) def test_normalize_niceness_accepts_disabled_sentinel(self): self.assertIsNone(normalizeNiceness(99)) def test_normalize_cpu_percent_accepts_disabled_sentinel(self): self.assertIsNone(normalizeCpuPercent(0)) def test_normalize_cpu_percent_accepts_absolute_cpulimit_values(self): self.assertEqual(200, normalizeCpuPercent(200)) def test_normalize_cpu_percent_converts_percent_of_present_cores(self): with patch("ffx.process.getPresentCpuCount", return_value=8): self.assertEqual(200, normalizeCpuPercent("25%")) if __name__ == "__main__": unittest.main()