Source code for stabilize.tasks.python

"""
PythonTask for executing Python code in isolated subprocess.

This module provides a production-ready PythonTask with:
- Inline script execution
- Script file execution
- Module + function execution
- INPUT/RESULT variable convention for data passing
- Full subprocess isolation with timeout support
"""

from __future__ import annotations

import base64
import json
import logging
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any

from stabilize.tasks.interface import Task
from stabilize.tasks.result import TaskResult

_VALID_IDENTIFIER_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_.]*$")

if TYPE_CHECKING:
    from stabilize.models.stage import StageExecution

logger = logging.getLogger(__name__)


[docs] class PythonTask(Task): """ Execute Python code in isolated subprocess. Supports three execution modes: 1. Inline script: Pass Python code as a string 2. Script file: Load and execute a Python file 3. Module + function: Import a module and call a function All modes run in a subprocess for isolation and hard timeout enforcement. Context Parameters: # Script execution (choose one): script (str): Inline Python code to execute script_file (str): Path to Python script file module (str): Python module path (e.g., "myapp.tasks.validate") function (str): Function name to call (requires module) # Inputs: inputs (dict): Input variables, available as INPUT in script (optional) args (list): Command line arguments (optional) # Execution: python_path (str): Python interpreter path (default: current interpreter) timeout (int): Execution timeout in seconds (default: 60) cwd (str): Working directory (optional) env (dict): Environment variables to add (optional) continue_on_failure (bool): Return failed_continue on error (default: False) Outputs: stdout (str): Standard output from script stderr (str): Standard error from script exit_code (int): Process exit code result (any): Value of RESULT variable if set in script Notes: - Scripts access input data via the INPUT dict - Scripts set return value via the RESULT variable - RESULT must be JSON-serializable - Module mode: imports module.function and calls with INPUT as argument - Upstream stage outputs are automatically available in INPUT Examples: # Inline script context = { "script": ''' result = sum(INPUT["numbers"]) RESULT = {"sum": result, "count": len(INPUT["numbers"])} ''', "inputs": {"numbers": [1, 2, 3, 4, 5]} } # Script file context = { "script_file": "/path/to/script.py", "inputs": {"config": {"debug": True}} } # Module + function context = { "module": "myapp.validators", "function": "validate_input", "inputs": {"data": {"name": "test"}} } """ # Wrapper template for script mode - handles INPUT/RESULT SCRIPT_WRAPPER = """\ import json import sys import base64 INPUT = json.loads(base64.b64decode('{inputs_json_b64}').decode()) # User script {script} # Output result if RESULT variable was set if 'RESULT' in dir(): print("__PYTHONTASK_RESULT_START__") print(json.dumps(RESULT)) print("__PYTHONTASK_RESULT_END__") """ # Wrapper template for module+function mode MODULE_WRAPPER = """\ import json import sys import base64 INPUT = json.loads(base64.b64decode('{inputs_json_b64}').decode()) from {module} import {function} RESULT = {function}(INPUT) # Output result print("__PYTHONTASK_RESULT_START__") print(json.dumps(RESULT)) print("__PYTHONTASK_RESULT_END__") """ RESULT_START_MARKER = "__PYTHONTASK_RESULT_START__" RESULT_END_MARKER = "__PYTHONTASK_RESULT_END__"
[docs] def execute(self, stage: StageExecution) -> TaskResult: """Execute Python code based on context parameters.""" # Get execution mode parameters script = stage.context.get("script") script_file = stage.context.get("script_file") module = stage.context.get("module") function = stage.context.get("function") # Validate execution mode mode_count = sum(bool(x) for x in [script, script_file, module]) if mode_count == 0: return TaskResult.terminal(error="One of 'script', 'script_file', or 'module' must be specified") if mode_count > 1: return TaskResult.terminal(error="Only one of 'script', 'script_file', or 'module' can be specified") if module and not function: return TaskResult.terminal(error="'function' is required when using 'module' mode") # Get execution parameters args = stage.context.get("args", []) python_path = stage.context.get("python_path", sys.executable) timeout = stage.context.get("timeout", 60) cwd = stage.context.get("cwd") env_vars = stage.context.get("env", {}) continue_on_failure = stage.context.get("continue_on_failure", False) # Build inputs: merge ancestor outputs + stage context + explicit inputs # This allows upstream stage outputs to flow to downstream Python tasks # Exclude internal parameters from being passed as inputs internal_keys = { "script", "script_file", "module", "function", "args", "python_path", "timeout", "cwd", "env", "continue_on_failure", "inputs", } # Start with ancestor outputs (upstream stage data) ancestor_outputs: dict[str, Any] = {} try: for ancestor in reversed(stage.ancestors()): if ancestor.outputs: ancestor_outputs.update(ancestor.outputs) except (AttributeError, ValueError): # Stage might not be attached to an execution yet pass # Filter out internal keys from context base_inputs = {k: v for k, v in stage.context.items() if k not in internal_keys} # Merge: ancestor outputs -> stage context -> explicit inputs explicit_inputs = stage.context.get("inputs", {}) inputs = {**ancestor_outputs, **base_inputs, **explicit_inputs} # Generate wrapped script try: wrapped_script = self._generate_script( script=script, script_file=script_file, module=module, function=function, inputs=inputs, ) except Exception as e: return TaskResult.terminal(error=f"Failed to generate script: {e}") # Write to temp file tmp_path = None try: with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as tmp: tmp.write(wrapped_script) tmp.flush() tmp_path = tmp.name # Build command cmd = [python_path, tmp_path] + list(args) # Build environment env = os.environ.copy() env.update(env_vars) logger.debug("PythonTask executing: %s", " ".join(cmd)) # Execute result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, cwd=cwd, env=env, ) max_output_size = stage.context.get("max_output_size", 10 * 1024 * 1024) stdout = result.stdout stderr = result.stderr exit_code = result.returncode truncated = False if len(stdout) > max_output_size: stdout = stdout[:max_output_size] truncated = True if len(stderr) > max_output_size: stderr = stderr[:max_output_size] truncated = True # Extract RESULT if present script_result = self._extract_result(stdout) # Clean stdout (remove result markers) clean_stdout = self._clean_stdout(stdout) outputs: dict[str, Any] = { "stdout": clean_stdout, "stderr": stderr, "exit_code": exit_code, "truncated": truncated, } if script_result is not None: outputs["result"] = script_result if exit_code == 0: logger.debug("PythonTask success, result: %s", str(script_result)[:100]) return TaskResult.success(outputs=outputs) else: error_msg = f"Python script failed with exit code {exit_code}" if stderr: error_msg += f": {stderr[:200]}" if continue_on_failure: return TaskResult.failed_continue(error=error_msg, outputs=outputs) return TaskResult.terminal(error=error_msg, context=outputs) except subprocess.TimeoutExpired: error_msg = f"Python script timed out after {timeout}s" if continue_on_failure: return TaskResult.failed_continue(error=error_msg) return TaskResult.terminal(error=error_msg) except Exception as e: error_msg = f"Python execution failed: {e}" if continue_on_failure: return TaskResult.failed_continue(error=error_msg) return TaskResult.terminal(error=error_msg) finally: # Clean up temp file if tmp_path and os.path.exists(tmp_path): try: os.unlink(tmp_path) except OSError: pass
def _generate_script( self, script: str | None, script_file: str | None, module: str | None, function: str | None, inputs: dict[str, Any], ) -> str: """Generate the wrapped script to execute.""" inputs_json_b64 = base64.b64encode(json.dumps(inputs).encode()).decode() if module and function: if not _VALID_IDENTIFIER_RE.match(module): raise ValueError(f"Invalid module name: {module!r}") if not _VALID_IDENTIFIER_RE.match(function): raise ValueError(f"Invalid function name: {function!r}") return self.MODULE_WRAPPER.format( inputs_json_b64=inputs_json_b64, module=module, function=function, ) if script_file: if ".." in script_file: raise ValueError(f"Path traversal blocked in script_file: {script_file!r}") script_path = Path(script_file) if not script_path.exists(): raise FileNotFoundError(f"Script file not found: {script_file}") script = script_path.read_text() assert script is not None return self.SCRIPT_WRAPPER.format( inputs_json_b64=inputs_json_b64, script=script, ) def _extract_result(self, stdout: str) -> Any: """Extract RESULT value from stdout if present.""" if self.RESULT_START_MARKER not in stdout: return None try: start = stdout.index(self.RESULT_START_MARKER) + len(self.RESULT_START_MARKER) end = stdout.index(self.RESULT_END_MARKER) result_json = stdout[start:end].strip() return json.loads(result_json) except (ValueError, json.JSONDecodeError): return None def _clean_stdout(self, stdout: str) -> str: """Remove result markers from stdout.""" if self.RESULT_START_MARKER not in stdout: return stdout.strip() try: start = stdout.index(self.RESULT_START_MARKER) end = stdout.index(self.RESULT_END_MARKER) + len(self.RESULT_END_MARKER) clean = stdout[:start] + stdout[end:] return clean.strip() except ValueError: return stdout.strip()