Files
ffx/src/ffx/process.py
2026-04-11 16:30:41 +02:00

170 lines
5.3 KiB
Python

import locale
import os
import shlex
import subprocess
from typing import Iterable, List

from .logging_utils import get_ffx_logger
# Synthetic return codes reported by executeProcess() when the command did
# not produce an exit status of its own (timeout / executable not found).
COMMAND_TIMED_OUT_RETURN_CODE = 124
COMMAND_NOT_FOUND_RETURN_CODE = 127
# Inclusive range of niceness values accepted by normalizeNiceness().
MIN_NICENESS = -20
MAX_NICENESS = 19
# A niceness equal to this sentinel means "do not apply a niceness".
DISABLED_NICENESS_SENTINEL = 99
# A CPU limit equal to this sentinel means "do not apply a CPU limit".
DISABLED_CPU_PERCENT_SENTINEL = 0
# Lower bound for every CPU limit; the upper bound is only enforced for
# values given in the percentage form (e.g. "25%") by normalizeCpuPercent().
MIN_CPU_PERCENT = 1
MAX_CPU_PERCENT = 100
def formatCommandSequence(commandSequence: Iterable[str]) -> str:
    """Render a command sequence as one shell-quoted string.

    Every token is passed through ``str()`` first, so non-string tokens
    (numbers, path objects) are accepted. The result is meant for logs and
    error messages.
    """
    stringTokens = map(str, commandSequence)
    return shlex.join(stringTokens)
def normalizeNiceness(niceness) -> int | None:
if niceness is None:
return None
niceness = int(niceness)
if niceness == DISABLED_NICENESS_SENTINEL:
return None
if niceness < MIN_NICENESS or niceness > MAX_NICENESS:
raise ValueError(
f"Niceness must be between {MIN_NICENESS} and {MAX_NICENESS}, "
+ f"or {DISABLED_NICENESS_SENTINEL} to disable."
)
return niceness
def getPresentCpuCount() -> int:
    """Return the number of CPUs to base percentage limits on (at least 1).

    Where ``os.sched_getaffinity`` exists, the size of this process's
    affinity set is used. Otherwise, or when that set is empty, the result
    falls back to ``os.cpu_count()``, and finally to 1 when no positive
    count is reported.
    """
    if hasattr(os, 'sched_getaffinity'):
        usableCpus = os.sched_getaffinity(0)
        if usableCpus:
            return len(usableCpus)

    detected = os.cpu_count()
    if not detected or detected <= 0:
        return 1
    return detected
def normalizeCpuPercent(cpuPercent) -> int | None:
if cpuPercent is None:
return None
cpuPercent = str(cpuPercent).strip()
if cpuPercent.endswith('%'):
percentValue = int(cpuPercent[:-1].strip())
if percentValue == DISABLED_CPU_PERCENT_SENTINEL:
return None
if percentValue < MIN_CPU_PERCENT or percentValue > MAX_CPU_PERCENT:
raise ValueError(
f"CPU percentage must be between {MIN_CPU_PERCENT}% and {MAX_CPU_PERCENT}%, "
+ f"or {DISABLED_CPU_PERCENT_SENTINEL} to disable."
)
return percentValue * getPresentCpuCount()
cpuPercent = int(cpuPercent)
if cpuPercent == DISABLED_CPU_PERCENT_SENTINEL:
return None
if cpuPercent < MIN_CPU_PERCENT:
raise ValueError(
"CPU limit must be a positive absolute value such as 200, "
+ f"a percentage such as 25%, or {DISABLED_CPU_PERCENT_SENTINEL} to disable."
)
return cpuPercent
def getWrappedCommandSequence(commandSequence: List[str], context: dict = None) -> List[str]:
    """
    Prefix a command with ``nice`` and/or ``cpulimit`` according to the
    ``resource_limits`` entry of ``context``.

    niceness: -20 to 19, disabled when unset
    cpu limit: positive absolute cpulimit value, or a machine-wide percentage
    (read from ``cpu_limit``; ``cpu_percent`` is consulted only when the
    ``cpu_limit`` key is absent)

    When both limits are configured, cpulimit wraps a nice-adjusted command:
    cpulimit -l <cpu> -- nice -n <niceness> <command>

    Returns a new list of string tokens; ``commandSequence`` is not modified.
    Raises ValueError when a configured limit is invalid.
    """
    # ``or {}`` also covers a 'resource_limits' key that is present but None
    # (e.g. an empty config section); dict.get's default only applies when
    # the key is missing, which previously led to an AttributeError here.
    resourceLimits = (context or {}).get('resource_limits') or {}

    niceness = normalizeNiceness(resourceLimits.get('niceness'))
    cpu_percent = normalizeCpuPercent(
        resourceLimits.get('cpu_limit', resourceLimits.get('cpu_percent'))
    )

    wrappedCommandSequence = [str(token) for token in commandSequence]
    if niceness is not None:
        wrappedCommandSequence = ['nice', '-n', str(niceness), *wrappedCommandSequence]
    if cpu_percent is not None:
        # cpulimit goes outermost so it supervises the nice-adjusted command.
        wrappedCommandSequence = [
            'cpulimit', '-l', str(cpu_percent), '--', *wrappedCommandSequence
        ]
    return wrappedCommandSequence
def getProcessTimeoutSeconds(context: dict = None, timeoutSeconds: float = None):
if timeoutSeconds is None:
timeoutSeconds = (context or {}).get('resource_limits', {}).get('timeout_seconds')
if timeoutSeconds is None:
return None
timeoutSeconds = float(timeoutSeconds)
return timeoutSeconds if timeoutSeconds > 0 else None
def _coerceCapturedOutput(captured) -> str:
    """Return captured process output as text ('' when nothing was captured)."""
    if not captured:
        return ''
    if isinstance(captured, bytes):
        # Partial output may end mid-character, so never fail on decoding.
        return captured.decode(locale.getpreferredencoding(False), errors='replace')
    return captured


def executeProcess(
    commandSequence: List[str],
    directory: str = None,
    context: dict = None,
    timeoutSeconds: float = None,
) -> tuple[str, str, int]:
    """Run a command with the configured resource limits and capture its output.

    The command is wrapped by getWrappedCommandSequence() and run with the
    timeout resolved by getProcessTimeoutSeconds(). The logger is taken from
    ``context['logger']`` when present, otherwise from get_ffx_logger().

    Args:
        commandSequence: command and arguments to run.
        directory: working directory for the child process (``None`` keeps
            the current one).
        context: optional dict that may carry ``logger`` and
            ``resource_limits``.
        timeoutSeconds: explicit timeout overriding the context value.

    Returns:
        ``(stdout, stderr, returncode)``. Failures to start or finish are
        reported through the tuple rather than raised:

        * executable not found -> ``('', <message>, COMMAND_NOT_FOUND_RETURN_CODE)``
        * timeout -> ``(<partial stdout>, <message + partial stderr>,
          COMMAND_TIMED_OUT_RETURN_CODE)``

    Raises:
        ValueError: propagated from the resource-limit normalization when a
            configured limit is invalid.
    """
    logger = context['logger'] if context is not None and 'logger' in context else get_ffx_logger()
    wrappedCommandSequence = getWrappedCommandSequence(commandSequence, context=context)
    timeoutSeconds = getProcessTimeoutSeconds(context=context, timeoutSeconds=timeoutSeconds)

    logger.debug(
        "executeProcess() cwd=%s timeout=%s command=%s",
        directory or '.',
        timeoutSeconds if timeoutSeconds is not None else 'none',
        formatCommandSequence(wrappedCommandSequence),
    )

    try:
        completed = subprocess.run(
            wrappedCommandSequence,
            capture_output=True,
            text=True,
            cwd=directory,
            timeout=timeoutSeconds,
            check=False,
        )
    except FileNotFoundError as ex:
        # NOTE(review): a non-existent `directory` presumably also surfaces
        # as FileNotFoundError and is reported as "command not found" here;
        # confirm whether that is intended.
        error = (
            "Command not found while running "
            + f"{formatCommandSequence(wrappedCommandSequence)}: {ex.filename or ex}"
        )
        logger.error(error)
        return '', error, COMMAND_NOT_FOUND_RETURN_CODE
    except subprocess.TimeoutExpired as ex:
        # The output attached to TimeoutExpired can be bytes even though
        # text=True was requested, so normalise it to str before returning
        # it or embedding it in the error message.
        stdout = _coerceCapturedOutput(ex.stdout)
        stderr = _coerceCapturedOutput(ex.stderr)
        error = (
            f"Command timed out after {timeoutSeconds} seconds while running "
            + formatCommandSequence(wrappedCommandSequence)
        )
        if stderr:
            error = f"{error}\n{stderr}"
        logger.error(error)
        return stdout, error, COMMAND_TIMED_OUT_RETURN_CODE

    if completed.returncode != 0:
        logger.warning(
            "executeProcess() rc=%s command=%s",
            completed.returncode,
            formatCommandSequence(wrappedCommandSequence),
        )
    return completed.stdout, completed.stderr, completed.returncode