Models

Core Models

Workflow model.

A pipeline execution represents a running instance of a pipeline, containing all stages and their runtime state. The execution tracks:
  • Overall status

  • All stages (including synthetic stages)

  • Trigger information

  • Timing data

class stabilize.models.workflow.WorkflowType(value)[source]

Bases: Enum

Type of execution.

  • PIPELINE: A full pipeline execution

  • ORCHESTRATION: An ad-hoc orchestration (single stage)

PIPELINE = 'PIPELINE'
ORCHESTRATION = 'ORCHESTRATION'
class stabilize.models.workflow.Trigger(type='manual', user='anonymous', parameters=<factory>, artifacts=<factory>, payload=<factory>)[source]

Bases: object

Trigger information for a pipeline execution.

Contains details about what triggered the pipeline (manual, webhook, cron, etc.) and any parameters passed to the execution.

Parameters:
  • type (str)

  • user (str)

  • parameters (dict[str, Any])

  • artifacts (list[dict[str, Any]])

  • payload (dict[str, Any])

type: str = 'manual'
user: str = 'anonymous'
parameters: dict[str, Any]
artifacts: list[dict[str, Any]]
payload: dict[str, Any]
to_dict()[source]

Convert trigger to dictionary for storage.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Create trigger from dictionary.

Parameters:

data (dict[str, Any])

Return type:

Trigger
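The to_dict()/from_dict() pair suggests a simple storage round trip. The following is a minimal sketch of that idea using a stand-in dataclass with the documented fields and defaults; TriggerSketch is a hypothetical name, and the fallback to defaults for missing keys is an assumption rather than confirmed library behavior.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class TriggerSketch:
    """Stand-in with the documented Trigger fields; not the library class."""

    type: str = "manual"
    user: str = "anonymous"
    # Mutable defaults come from factories so instances never share state.
    parameters: dict[str, Any] = field(default_factory=dict)
    artifacts: list[dict[str, Any]] = field(default_factory=list)
    payload: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # asdict() recursively copies nested dicts and lists.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "TriggerSketch":
        # Assumption: keys absent from `data` fall back to the defaults,
        # and unknown keys are ignored.
        known = {k: v for k, v in data.items() if k in cls.__dataclass_fields__}
        return cls(**known)


original = TriggerSketch(type="webhook", user="ci", parameters={"branch": "main"})
restored = TriggerSketch.from_dict(original.to_dict())
```

Because every mutable field uses a factory, two default instances never share the same parameters dictionary.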

class stabilize.models.workflow.PausedDetails(paused_by='', pause_time=None, resume_time=None, paused_ms=0)[source]

Bases: object

Details about a paused execution.

Parameters:
  • paused_by (str)

  • pause_time (int | None)

  • resume_time (int | None)

  • paused_ms (int)

paused_by: str = ''
pause_time: int | None = None
resume_time: int | None = None
paused_ms: int = 0
property is_paused: bool

Check if currently paused.

class stabilize.models.workflow.Workflow(id=<factory>, type=WorkflowType.PIPELINE, application='', name='', status=WorkflowStatus.NOT_STARTED, stages=<factory>, context=<factory>, trigger=<factory>, start_time=None, end_time=None, start_time_expiry=None, is_canceled=False, canceled_by=None, cancellation_reason=None, paused=None, pipeline_config_id=None, is_limit_concurrent=False, max_concurrent_executions=0, keep_waiting_pipelines=False, origin='unknown', config_version=None)[source]

Bases: object

Represents a pipeline execution.

This is the top-level container for all execution state. It holds all stages and tracks the overall execution status.

Parameters:
  • id (str)

  • type (WorkflowType)

  • application (str)

  • name (str)

  • status (WorkflowStatus)

  • stages (list[StageExecution])

  • context (dict[str, Any])

  • trigger (Trigger)

  • start_time (int | None)

  • end_time (int | None)

  • start_time_expiry (int | None)

  • is_canceled (bool)

  • canceled_by (str | None)

  • cancellation_reason (str | None)

  • paused (PausedDetails | None)

  • pipeline_config_id (str | None)

  • is_limit_concurrent (bool)

  • max_concurrent_executions (int)

  • keep_waiting_pipelines (bool)

  • origin (str)

  • config_version (str | None)

id

Unique identifier (ULID)

Type:

str

type

PIPELINE or ORCHESTRATION

Type:

stabilize.models.workflow.WorkflowType

application

Application name this pipeline belongs to

Type:

str

name

Pipeline name

Type:

str

status

Current execution status

Type:

stabilize.models.status.WorkflowStatus

stages

All stages in this execution (including synthetic)

Type:

list[stabilize.models.stage.stage.StageExecution]

trigger

Trigger information

Type:

stabilize.models.workflow.Trigger

start_time

Epoch milliseconds when execution started

Type:

int | None

end_time

Epoch milliseconds when execution completed

Type:

int | None

start_time_expiry

If not started by this time, cancel

Type:

int | None

is_canceled

Whether execution has been canceled

Type:

bool

canceled_by

User who canceled the execution

Type:

str | None

cancellation_reason

Reason for cancellation

Type:

str | None

paused

Pause details if execution is paused

Type:

stabilize.models.workflow.PausedDetails | None

pipeline_config_id

ID of the pipeline configuration

Type:

str | None

is_limit_concurrent

Whether to limit concurrent executions

Type:

bool

max_concurrent_executions

Max concurrent executions allowed

Type:

int

keep_waiting_pipelines

Keep queued pipelines on cancel

Type:

bool

origin

Origin of the execution (e.g., “api”, “deck”)

Type:

str

id: str
type: WorkflowType = 'PIPELINE'
application: str = ''
name: str = ''
status: WorkflowStatus = ('NOT_STARTED', False, False)
stages: list[StageExecution]
context: dict[str, Any]
trigger: Trigger
start_time: int | None = None
end_time: int | None = None
start_time_expiry: int | None = None
is_canceled: bool = False
canceled_by: str | None = None
cancellation_reason: str | None = None
paused: PausedDetails | None = None
pipeline_config_id: str | None = None
is_limit_concurrent: bool = False
max_concurrent_executions: int = 0
keep_waiting_pipelines: bool = False
origin: str = 'unknown'
config_version: str | None = None
add_stage(stage)[source]

Add a stage to this execution.

Parameters:

stage (StageExecution)

Return type:

None

remove_stage(stage_id)[source]

Remove a stage from this execution.

Parameters:

stage_id (str)

Return type:

None

cleanup()[source]

Explicitly break circular references.

Return type:

None

stage_by_id(stage_id)[source]

Get a stage by its ID.

Raises:

ValueError – If stage not found

Parameters:

stage_id (str)

Return type:

StageExecution

stage_by_ref_id(ref_id)[source]

Get a stage by its reference ID.

Parameters:

ref_id (str)

Return type:

StageExecution | None

initial_stages()[source]

Get all initial stages (no dependencies, not synthetic).

These are the stages that can start immediately when execution begins.

Return type:

list[StageExecution]
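The filter described for initial_stages() can be pictured as follows. This is a sketch, not the library's code: StageSketch is a hypothetical stand-in, and treating a stage as synthetic when parent_stage_id is set is an assumption based on the attribute description.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StageSketch:
    """Hypothetical stand-in carrying only the fields the filter needs."""

    ref_id: str
    requisite_stage_ref_ids: set[str] = field(default_factory=set)
    # Assumed to be set only on synthetic stages.
    parent_stage_id: Optional[str] = None


def initial_stages(stages: list[StageSketch]) -> list[StageSketch]:
    # Keep stages with no upstream dependencies that are not synthetic.
    return [
        s for s in stages
        if not s.requisite_stage_ref_ids and s.parent_stage_id is None
    ]


stages = [
    StageSketch("build"),
    StageSketch("deploy", {"build"}),
    StageSketch("pre-check", parent_stage_id="id-of-deploy"),
]
initial = [s.ref_id for s in initial_stages(stages)]
```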

top_level_stages()[source]

Get all top-level stages (not synthetic).

Return type:

list[StageExecution]

get_context()[source]

Get aggregated context from all stages.

Returns merged outputs from all stages in topological order. Collections are concatenated; for non-collections, the latest value wins.

Return type:

dict[str, Any]
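The merge rule for get_context() can be illustrated with a small standalone function. This sketch assumes that "collections" means lists and that the caller already supplies stage outputs in topological order; merge_outputs is a hypothetical helper, not part of the documented API.

```python
from typing import Any


def merge_outputs(ordered_outputs: list[dict[str, Any]]) -> dict[str, Any]:
    """Merge per-stage outputs given in topological order."""
    merged: dict[str, Any] = {}
    for outputs in ordered_outputs:
        for key, value in outputs.items():
            existing = merged.get(key)
            if isinstance(existing, list) and isinstance(value, list):
                # Collections are concatenated (a new list, inputs untouched).
                merged[key] = existing + value
            else:
                # For everything else the latest value wins.
                merged[key] = value
    return merged


merged = merge_outputs([
    {"artifacts": ["a.jar"], "version": "1.0"},
    {"artifacts": ["b.jar"], "version": "1.1"},
])
```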

update_status(status)[source]

Update the execution status.

Parameters:

status (WorkflowStatus)

Return type:

None

cancel(user, reason)[source]

Mark this execution as canceled.

Parameters:
  • user (str)

  • reason (str)

Return type:

None

pause(user)[source]

Pause this execution.

Parameters:

user (str)

Return type:

None

resume()[source]

Resume this execution.

Return type:

None

paused_duration_relative_to(instant_ms)[source]

Get paused duration relative to a given instant.

Returns 0 if the execution is not paused or if the pause began before the instant.

Parameters:

instant_ms (int)

Return type:

int

classmethod create(application, name, stages, trigger=None, pipeline_config_id=None, context=None)[source]

Factory method to create a new pipeline execution.

Parameters:
  • application (str) – Application name

  • name (str) – Pipeline name

  • stages (list[StageExecution]) – List of stages

  • trigger (Trigger | None) – Optional trigger info

  • pipeline_config_id (str | None) – Optional config ID

  • context (dict[str, Any] | None) – Optional execution-level context (for jump tracking, etc.)

Returns:

A new Workflow instance

Return type:

Workflow

classmethod create_orchestration(application, name, stages)[source]

Factory method to create an orchestration (ad-hoc execution).

Parameters:
  • application (str) – Application name

  • name (str) – Orchestration name

  • stages (list[StageExecution]) – List of stages

Returns:

A new Workflow with type ORCHESTRATION

Return type:

Workflow

class stabilize.models.stage.JoinType(value)[source]

Bases: Enum

Join semantics for stages with multiple upstream dependencies.

  • AND: Wait for ALL upstreams (default, WCP-3).

  • OR: Wait only for activated branches from a paired OR-split (WCP-7).

  • MULTI_MERGE: Fire once per upstream completion, no sync (WCP-8).

  • DISCRIMINATOR: Fire on first upstream completion, ignore rest (WCP-9).

  • N_OF_M: Fire when N of M upstreams complete (WCP-30).

AND = 'AND'
OR = 'OR'
MULTI_MERGE = 'MULTI_MERGE'
DISCRIMINATOR = 'DISCRIMINATOR'
N_OF_M = 'N_OF_M'
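The five join semantics can be compared side by side with a small decision function. This is an illustrative sketch only: join_fires, its parameters, and the idea of evaluating the join right after each upstream completion are assumptions, not the library's scheduling logic.

```python
from typing import Optional


def join_fires(
    join_type: str,
    completed: int,
    total: int,
    threshold: int = 0,
    activated: Optional[int] = None,
) -> bool:
    """Decide whether a join fires right after the `completed`-th upstream finishes."""
    if join_type == "AND":
        return completed == total
    if join_type == "OR":
        # Only branches activated by the paired OR-split are awaited.
        return completed == (total if activated is None else activated)
    if join_type == "MULTI_MERGE":
        return True  # fires on every completion, no synchronization
    if join_type == "DISCRIMINATOR":
        return completed == 1  # first completion only, the rest are ignored
    if join_type == "N_OF_M":
        return completed == threshold
    raise ValueError(f"unknown join type: {join_type}")
```

For example, with three upstreams a DISCRIMINATOR join fires after the first completion, while an AND join waits for the third.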
class stabilize.models.stage.SplitType(value)[source]

Bases: Enum

Split semantics for stages with multiple downstream branches.

  • AND: Activate ALL downstream stages (default, WCP-2).

  • OR: Evaluate conditions per downstream, activate matching ones (WCP-6).

AND = 'AND'
OR = 'OR'
class stabilize.models.stage.StageExecution(id=<factory>, ref_id='', type='', name='', status=WorkflowStatus.NOT_STARTED, context=<factory>, outputs=<factory>, tasks=<factory>, requisite_stage_ref_ids=<factory>, parent_stage_id=None, synthetic_stage_owner=None, start_time=None, end_time=None, start_time_expiry=None, scheduled_time=None, version=0, cleanup_on_failure=False, finalizer_names=<factory>, join_type=JoinType.AND, join_threshold=0, split_type=SplitType.AND, split_conditions=<factory>, mi_config=None, deferred_choice_group=None, milestone_ref_id=None, milestone_status=None, mutex_key=None, cancel_region=None, _execution=None)[source]

Bases: StageNavigationMixin

Represents a stage execution within a pipeline.

The DAG structure is encoded in requisite_stage_ref_ids:
  • Empty set = initial stage (no dependencies)

  • Single ref_id = sequential dependency

  • Multiple ref_ids = join point (waits for all upstreams under the default join_type, AND)
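To make the encoding concrete, the sketch below builds a small graph as a plain mapping from ref_id to its requisite ref_ids and orders it with the standard library's graphlib. The stage names are hypothetical, and the mapping stands in for real StageExecution objects.

```python
from graphlib import TopologicalSorter

# ref_id -> requisite_stage_ref_ids (hypothetical stage names)
dag = {
    "build": set(),              # empty set: initial stage
    "test": {"build"},           # single ref_id: sequential dependency
    "scan": {"build"},
    "deploy": {"test", "scan"},  # multiple ref_ids: join point
}

# TopologicalSorter takes node -> predecessors, which matches this encoding.
order = list(TopologicalSorter(dag).static_order())

initial = sorted(ref for ref, deps in dag.items() if not deps)
joins = sorted(ref for ref, deps in dag.items() if len(deps) > 1)
```

Any valid ordering places "build" first and "deploy" last; the relative order of "test" and "scan" is not fixed.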

Parameters:
  • id (str)

  • ref_id (str)

  • type (str)

  • name (str)

  • status (WorkflowStatus)

  • context (dict[str, Any])

  • outputs (dict[str, Any])

  • tasks (list[TaskExecution])

  • requisite_stage_ref_ids (set[str])

  • parent_stage_id (str | None)

  • synthetic_stage_owner (SyntheticStageOwner | None)

  • start_time (int | None)

  • end_time (int | None)

  • start_time_expiry (int | None)

  • scheduled_time (int | None)

  • version (int)

  • cleanup_on_failure (bool)

  • finalizer_names (list[str])

  • join_type (JoinType)

  • join_threshold (int)

  • split_type (SplitType)

  • split_conditions (dict[str, str])

  • mi_config (MultiInstanceConfig | None)

  • deferred_choice_group (str | None)

  • milestone_ref_id (str | None)

  • milestone_status (str | None)

  • mutex_key (str | None)

  • cancel_region (str | None)

  • _execution (weakref.ReferenceType[Workflow] | Workflow | None)

id

Unique identifier (ULID)

Type:

str

ref_id

Reference identifier used for DAG relationships

Type:

str

type

Stage type (e.g., “deploy”, “bake”, “wait”)

Type:

str

name

Human-readable stage name

Type:

str

status

Current execution status

Type:

WorkflowStatus

context

Input parameters and runtime state (stage-scoped)

Type:

dict[str, Any]

outputs

Values available to downstream stages (pipeline-scoped)

Type:

dict[str, Any]

tasks

List of tasks to execute in this stage

Type:

list[TaskExecution]

requisite_stage_ref_ids

Set of ref_ids this stage depends on (DAG edges)

Type:

set[str]

parent_stage_id

Parent stage ID for synthetic stages

Type:

str | None

synthetic_stage_owner

STAGE_BEFORE or STAGE_AFTER for synthetic stages

Type:

SyntheticStageOwner | None

start_time

Epoch milliseconds when stage started

Type:

int | None

end_time

Epoch milliseconds when stage completed

Type:

int | None

start_time_expiry

If stage not started by this time, skip it

Type:

int | None

scheduled_time

When stage is scheduled to execute

Type:

int | None

property allow_sibling_stages_to_continue_on_failure: bool

Check if sibling stages can continue on this stage’s failure.

cancel_region: str | None = None
cleanup()[source]

Explicitly break circular references.

Return type:

None

cleanup_on_failure: bool = False
property continue_pipeline_on_failure: bool

Check if pipeline should continue on stage failure.

classmethod create(type, name, ref_id, context=None, requisite_stage_ref_ids=None)[source]

Factory method to create a new stage execution.

Parameters:
  • type (str) – Stage type

  • name (str) – Human-readable name

  • ref_id (str) – Reference ID for DAG relationships

  • context (dict[str, Any] | None) – Initial context/parameters

  • requisite_stage_ref_ids (set[str] | None) – Dependencies (empty = initial stage)

Returns:

A new StageExecution instance

Return type:

StageExecution

classmethod create_synthetic(type, name, parent, owner, context=None)[source]

Factory method to create a synthetic stage.

Parameters:
  • type (str) – Stage type

  • name (str) – Human-readable name

  • parent (StageExecution) – Parent stage

  • owner (SyntheticStageOwner) – STAGE_BEFORE or STAGE_AFTER

  • context (dict[str, Any] | None) – Initial context/parameters

Returns:

A new synthetic StageExecution

Return type:

StageExecution

deferred_choice_group: str | None = None
determine_status()[source]

Determine the stage status based on before-stages, tasks, and after-stages.

Status priority (highest to lowest):
  1. TERMINAL/STOPPED/CANCELED - halt conditions

  2. PAUSED/BUFFERED/SUSPENDED - waiting conditions

  3. RUNNING - in progress

  4. FAILED_CONTINUE - completed with non-fatal failure

  5. SUCCEEDED/SKIPPED - completed successfully

After-stages ARE included in status determination to ensure the stage doesn’t report SUCCEEDED while after-stages are still running.

Return type:

WorkflowStatus
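The priority list can be read as "pick the highest-ranking status among the children". The sketch below shows only that ranking step; it is not the library's determine_status(). The order within each tier is an assumption taken from the order in which the statuses are listed, and statuses outside the list (such as NOT_STARTED) are rejected because the page does not say how they are handled.

```python
# Documented priority, highest first. The order inside each tier is an
# assumption based on the listing order.
PRIORITY = [
    "TERMINAL", "STOPPED", "CANCELED",   # halt conditions
    "PAUSED", "BUFFERED", "SUSPENDED",   # waiting conditions
    "RUNNING",                           # in progress
    "FAILED_CONTINUE",                   # non-fatal failure
    "SUCCEEDED", "SKIPPED",              # completed successfully
]


def highest_priority(statuses: list[str]) -> str:
    """Return the status that ranks highest among the given child statuses."""
    if not statuses:
        raise ValueError("no statuses given")
    unknown = set(statuses) - set(PRIORITY)
    if unknown:
        raise ValueError(f"statuses outside the documented list: {sorted(unknown)}")
    # A lower index in PRIORITY means a higher priority.
    return min(statuses, key=PRIORITY.index)
```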

end_time: int | None = None
property execution: Workflow

Get the parent pipeline execution.

failure_status(default=WorkflowStatus.TERMINAL)[source]

Get the appropriate failure status based on stage configuration.

Parameters:

default (WorkflowStatus)

Return type:

WorkflowStatus

first_task()[source]

Get the first task in this stage.

Return type:

TaskExecution | None

has_execution()[source]

Check if this stage is attached to an execution.

Return type:

bool

has_tasks()[source]

Check if this stage has any tasks.

Return type:

bool

join_threshold: int = 0
join_type: JoinType = 'AND'
mi_config: MultiInstanceConfig | None = None
milestone_ref_id: str | None = None
milestone_status: str | None = None
mutex_key: str | None = None
name: str = ''
next_task(task)[source]

Get the task that follows the given task.

Parameters:

task (TaskExecution)

Return type:

TaskExecution | None

parent_stage_id: str | None = None
property phase_version: tuple[str, int]

Return (status_name, version) for optimistic locking with phase check.

This combines the current status and version number into a single tuple for phase-aware optimistic locking. When updating a stage, you can pass this to ensure both the version AND the expected status match before the update proceeds.

Returns:

Tuple of (status_name, version) for comparison.

Example

    expected = stage.phase_version
    # … some operation …
    store.update_status(stage, expected_phase=expected[0])
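Phase-aware optimistic locking can be sketched in memory as a compare-and-set on the (status_name, version) tuple. StageRow and compare_and_set are hypothetical names for illustration; a real store would perform the comparison atomically in its backend.

```python
from dataclasses import dataclass


@dataclass
class StageRow:
    """Hypothetical stored row holding only status and version."""

    status: str = "NOT_STARTED"
    version: int = 0

    @property
    def phase_version(self) -> tuple[str, int]:
        return (self.status, self.version)


def compare_and_set(row: StageRow, expected: tuple[str, int], new_status: str) -> bool:
    # Apply the update only if both status and version still match.
    if row.phase_version != expected:
        return False
    row.status = new_status
    row.version += 1
    return True


row = StageRow()
expected = row.phase_version                              # ("NOT_STARTED", 0)
first = compare_and_set(row, expected, "RUNNING")         # succeeds
second = compare_and_set(row, expected, "SUCCEEDED")      # stale tuple, rejected
```

The second call fails because the row has already moved to a new status and version, which is the situation the phase check is meant to catch.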

ref_id: str = ''
scheduled_time: int | None = None
set_execution_strong(value)[source]

Set the parent pipeline execution as a strong reference.

Parameters:

value (Workflow)

Return type:

None

should_fail_pipeline()[source]

Check if stage failure should fail the pipeline.

Return type:

bool

split_type: SplitType = 'AND'
start_time: int | None = None
start_time_expiry: int | None = None
state_snapshot()[source]

Return frozen copy of current state for read-only use.

Creates an immutable snapshot of the current stage state that can be safely passed to external systems, cached, or logged without risk of mutation.

Returns:

Frozen StageStateSnapshot with current state.

Return type:

StageStateSnapshot
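One way to obtain such an immutable snapshot is a frozen dataclass combined with a read-only view of a copied context. The fields of the real StageStateSnapshot are not listed on this page, so SnapshotSketch and its three fields are assumptions chosen for illustration.

```python
import copy
from dataclasses import FrozenInstanceError, dataclass
from types import MappingProxyType
from typing import Any, Mapping


@dataclass(frozen=True)
class SnapshotSketch:
    """Hypothetical snapshot; attribute assignment raises FrozenInstanceError."""

    status: str
    version: int
    context: Mapping[str, Any]


def take_snapshot(status: str, version: int, context: dict[str, Any]) -> SnapshotSketch:
    # Deep-copy so later changes to the live context do not leak in, then
    # wrap in a read-only proxy so the top-level mapping cannot be modified.
    frozen_context = MappingProxyType(copy.deepcopy(context))
    return SnapshotSketch(status, version, frozen_context)


live_context = {"region": "eu"}
snap = take_snapshot("RUNNING", 3, live_context)
live_context["region"] = "us"  # does not affect the snapshot
```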

status: WorkflowStatus = ('NOT_STARTED', False, False)
synthetic_stage_owner: SyntheticStageOwner | None = None
type: str = ''
version: int = 0
id: str
context: dict[str, Any]
outputs: dict[str, Any]
tasks: list[TaskExecution]
requisite_stage_ref_ids: set[str]
finalizer_names: list[str]
split_conditions: dict[str, str]
class stabilize.models.stage.SyntheticStageOwner(value)[source]

Bases: Enum

Indicates the relationship of a synthetic stage to its parent.

  • STAGE_BEFORE: Runs before the parent’s tasks

  • STAGE_AFTER: Runs after the parent completes

STAGE_BEFORE = 'STAGE_BEFORE'
STAGE_AFTER = 'STAGE_AFTER'

TaskExecution model.

A task is the smallest unit of work within a stage. Each stage contains one or more tasks that execute sequentially.

class stabilize.models.task.TaskExecution(id=<factory>, name='', implementing_class='', status=WorkflowStatus.NOT_STARTED, start_time=None, end_time=None, stage_start=False, stage_end=False, loop_start=False, loop_end=False, task_exception_details=<factory>, version=0, _stage=None)[source]

Bases: object

Represents a single task within a stage execution.

Tasks are the atomic units of work in a pipeline. They execute sequentially within a stage, and each task produces a result that can include:
  • Status updates

  • Context modifications (stage-scoped)

  • Output values (pipeline-scoped)

Parameters:
  • id (str)

  • name (str)

  • implementing_class (str)

  • status (WorkflowStatus)

  • start_time (int | None)

  • end_time (int | None)

  • stage_start (bool)

  • stage_end (bool)

  • loop_start (bool)

  • loop_end (bool)

  • task_exception_details (dict[str, Any])

  • version (int)

  • _stage (weakref.ReferenceType[StageExecution] | StageExecution | None)

id

Unique identifier for this task execution

Type:

str

name

Human-readable name for this task

Type:

str

implementing_class

Fully qualified class name of the task implementation

Type:

str

status

Current execution status

Type:

WorkflowStatus

start_time

Epoch milliseconds when task started

Type:

int | None

end_time

Epoch milliseconds when task completed

Type:

int | None

stage_start

True if this is the first task in the stage

Type:

bool

stage_end

True if this is the last task in the stage

Type:

bool

loop_start

True if this task starts a loop

Type:

bool

loop_end

True if this task ends a loop

Type:

bool

task_exception_details

Exception information if task failed

Type:

dict[str, Any]

id: str
name: str = ''
implementing_class: str = ''
status: WorkflowStatus = ('NOT_STARTED', False, False)
start_time: int | None = None
end_time: int | None = None
stage_start: bool = False
stage_end: bool = False
loop_start: bool = False
loop_end: bool = False
task_exception_details: dict[str, Any]
version: int = 0
property stage: StageExecution | None

Get the parent stage for this task.

set_stage_strong(value)[source]

Set the parent stage for this task as a strong reference.

Parameters:

value (StageExecution)

Return type:

None

has_stage()[source]

Check if this task is attached to a stage.

Returns True if the task has a valid stage reference that hasn’t been garbage collected. This is useful for safely checking before accessing stage data when using weak references.

Returns:

True if stage reference is valid, False otherwise.

Return type:

bool
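The difference between a weak and a strong parent reference can be shown with the standard weakref module. TaskSketch and StageSketch are hypothetical stand-ins; the set_stage method and the internal handling of _stage are assumptions, with only has_stage, stage, and set_stage_strong mirroring documented names.

```python
import gc
import weakref


class StageSketch:
    """Hypothetical parent object."""


class TaskSketch:
    def __init__(self) -> None:
        self._stage = None

    def set_stage(self, stage: StageSketch) -> None:
        # Weak reference: does not keep the stage alive.
        self._stage = weakref.ref(stage)

    def set_stage_strong(self, stage: StageSketch) -> None:
        # Strong reference: keeps the stage alive as long as the task lives.
        self._stage = stage

    @property
    def stage(self):
        ref = self._stage
        return ref() if isinstance(ref, weakref.ReferenceType) else ref

    def has_stage(self) -> bool:
        return self.stage is not None


stage = StageSketch()
task = TaskSketch()
task.set_stage(stage)
attached_before = task.has_stage()
del stage      # drop the only strong reference
gc.collect()   # make collection explicit for non-refcounting runtimes
attached_after = task.has_stage()
```

Checking has_stage() before reading stage data avoids dereferencing a parent that has already been collected.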

property is_stage_start: bool

Check if this task starts the stage.

property is_stage_end: bool

Check if this task ends the stage.

property is_loop_start: bool

Check if this task starts a loop.

property is_loop_end: bool

Check if this task ends a loop.

cleanup()[source]

Explicitly break circular references.

Return type:

None

set_exception_details(exception)[source]

Store exception details for this task.

Parameters:

exception (dict[str, Any])

Return type:

None

property phase_version: tuple[str, int]

Return (status_name, version) for optimistic locking with phase check.

This combines the current status and version number into a single tuple for phase-aware optimistic locking.

Returns:

Tuple of (status_name, version) for comparison.

state_snapshot()[source]

Return frozen copy of current state for read-only use.

Creates an immutable snapshot of the current task state that can be safely passed to external systems, cached, or logged without risk of mutation.

Returns:

Frozen TaskStateSnapshot with current state.

Return type:

TaskStateSnapshot

classmethod create(name, implementing_class, stage_start=False, stage_end=False)[source]

Factory method to create a new task execution.

Parameters:
  • name (str) – Human-readable task name

  • implementing_class (str) – Class name or callable reference for task

  • stage_start (bool) – Whether this is the first task

  • stage_end (bool) – Whether this is the last task

Returns:

A new TaskExecution instance

Return type:

TaskExecution

WorkflowStatus enum.

This enum represents all possible states for executions, stages, and tasks. Each status has two boolean properties:
  • complete: Whether the entity has finished its work (successfully or not)

  • halt: Whether downstream execution should be blocked

class stabilize.models.status.WorkflowStatus(value)[source]

Bases: Enum

Execution status enum.

Each value is a tuple of (name, complete, halt).

NOT_STARTED = ('NOT_STARTED', False, False)
RUNNING = ('RUNNING', False, False)
PAUSED = ('PAUSED', False, False)
SUSPENDED = ('SUSPENDED', False, False)
SUCCEEDED = ('SUCCEEDED', True, False)
FAILED_CONTINUE = ('FAILED_CONTINUE', True, False)
TERMINAL = ('TERMINAL', True, True)
CANCELED = ('CANCELED', True, True)
REDIRECT = ('REDIRECT', False, False)
STOPPED = ('STOPPED', True, True)
SKIPPED = ('SKIPPED', True, False)
BUFFERED = ('BUFFERED', False, False)
property is_complete: bool

Indicates that the task/stage/pipeline has finished its work.

Returns True for: CANCELED, SUCCEEDED, STOPPED, SKIPPED, TERMINAL, FAILED_CONTINUE

property is_halt: bool

Indicates an abnormal completion - nothing downstream should run.

Returns True for: TERMINAL, CANCELED, STOPPED
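The (name, complete, halt) tuples listed above are enough to reproduce both properties. The following standalone sketch copies the documented member values into a stand-in enum (StatusSketch is a hypothetical name) and reads the flags from the tuple positions; how the library itself stores them is not shown on this page.

```python
from enum import Enum


class StatusSketch(Enum):
    """Stand-in using the documented (name, complete, halt) tuples."""

    NOT_STARTED = ("NOT_STARTED", False, False)
    RUNNING = ("RUNNING", False, False)
    PAUSED = ("PAUSED", False, False)
    SUSPENDED = ("SUSPENDED", False, False)
    SUCCEEDED = ("SUCCEEDED", True, False)
    FAILED_CONTINUE = ("FAILED_CONTINUE", True, False)
    TERMINAL = ("TERMINAL", True, True)
    CANCELED = ("CANCELED", True, True)
    REDIRECT = ("REDIRECT", False, False)
    STOPPED = ("STOPPED", True, True)
    SKIPPED = ("SKIPPED", True, False)
    BUFFERED = ("BUFFERED", False, False)

    @property
    def is_complete(self) -> bool:
        return self.value[1]

    @property
    def is_halt(self) -> bool:
        return self.value[2]


complete = {s.name for s in StatusSketch if s.is_complete}
halting = {s.name for s in StatusSketch if s.is_halt}
```

Every halting status is also complete, while FAILED_CONTINUE, SUCCEEDED, and SKIPPED are complete without blocking downstream work.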

property is_successful: bool

Check if this status represents a successful completion.

property is_failure: bool

Check if this status represents a failure.

property is_skipped: bool

Check if this status is SKIPPED.

property is_dirty: bool

True if entity is mid-execution (RUNNING, REDIRECT).

Dirty states indicate the entity is actively being processed and may have uncommitted changes. Useful for:
  • Determining if optimistic locking should be used

  • Identifying stages that need recovery after crash

  • Phase-aware state transitions

exception stabilize.models.status.InvalidStateTransitionError(current, target, entity_type='entity', entity_id=None)[source]

Bases: Exception

Raised when an invalid state transition is attempted.

Parameters:
  • current (WorkflowStatus)

  • target (WorkflowStatus)

  • entity_type (str)

  • entity_id (str | None)

Return type:

None

stabilize.models.status.can_transition(current, target)[source]

Check if a state transition is valid.

Parameters:
  • current (WorkflowStatus)

  • target (WorkflowStatus)

Returns:

True if the transition is valid, False otherwise

Return type:

bool

stabilize.models.status.validate_transition(current, target, entity_type='entity', entity_id=None)[source]

Validate and raise if state transition is invalid.

Parameters:
  • current (WorkflowStatus) – The current workflow status

  • target (WorkflowStatus) – The desired target status

  • entity_type (str) – Type of entity (workflow, stage, task) for error message

  • entity_id (str | None) – Optional ID of the entity for error message

Raises:

InvalidStateTransitionError – If the transition is not valid

Return type:

None
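The relationship between can_transition() and validate_transition() (a boolean check versus a raising check) can be sketched as below. The transition table here is purely hypothetical, since the allowed transitions are not listed on this page, and TransitionError stands in for InvalidStateTransitionError.

```python
from typing import Optional

# Hypothetical table for illustration only; the real rules are defined in
# stabilize.models.status and may differ.
ALLOWED = {
    "NOT_STARTED": {"RUNNING", "SKIPPED", "CANCELED"},
    "RUNNING": {"PAUSED", "SUCCEEDED", "FAILED_CONTINUE", "TERMINAL", "CANCELED", "STOPPED"},
    "PAUSED": {"RUNNING", "CANCELED"},
}


class TransitionError(Exception):
    """Stand-in for InvalidStateTransitionError."""


def can_transition(current: str, target: str) -> bool:
    # Statuses without an entry have no outgoing transitions in this sketch.
    return target in ALLOWED.get(current, set())


def validate_transition(
    current: str,
    target: str,
    entity_type: str = "entity",
    entity_id: Optional[str] = None,
) -> None:
    if not can_transition(current, target):
        label = entity_type if entity_id is None else f"{entity_type} {entity_id}"
        raise TransitionError(f"{label}: cannot move from {current} to {target}")
```

Callers that only need a yes/no answer use the boolean form; callers that want a descriptive error naming the entity use the raising form.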

Control-Flow Pattern Models

Multi-instance configuration for WCP-12 through WCP-15.

Defines how a stage spawns and synchronizes multiple parallel instances.

class stabilize.models.multi_instance.MultiInstanceConfig(count=0, count_from_context='', sync_on_complete=True, allow_dynamic=False, collection_from_context='', join_threshold=0, cancel_remaining=False)[source]

Bases: object

Configuration for multi-instance stage execution.

Supports several patterns:
  • WCP-12: count > 0, sync_on_complete=False (fire and forget)

  • WCP-13: count > 0, sync_on_complete=True (design-time known count)

  • WCP-14: count_from_context set, sync_on_complete=True (runtime known count)

  • WCP-15: allow_dynamic=True (count not known until last instance)
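The mapping from field values to patterns can be expressed as a small classifier. This is a sketch: classify_pattern is a hypothetical helper, and the precedence between overlapping settings (allow_dynamic first, then a fixed count, then count_from_context) is an assumption guided by the note that a count of 0 means the context key is used.

```python
def classify_pattern(
    count: int = 0,
    count_from_context: str = "",
    sync_on_complete: bool = True,
    allow_dynamic: bool = False,
) -> str:
    """Name the workflow pattern implied by a multi-instance configuration."""
    if allow_dynamic:
        return "WCP-15"  # count not known until the last instance
    if count > 0:
        # A fixed count is known at design time.
        return "WCP-13" if sync_on_complete else "WCP-12"
    if count_from_context and sync_on_complete:
        return "WCP-14"  # count read from the context at runtime
    return "unspecified"
```

The defaults of MultiInstanceConfig (count=0, no context key) fall into the "unspecified" branch in this sketch, since none of the four documented combinations applies.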

Parameters:
  • count (int)

  • count_from_context (str)

  • sync_on_complete (bool)

  • allow_dynamic (bool)

  • collection_from_context (str)

  • join_threshold (int)

  • cancel_remaining (bool)

count

Fixed number of instances (0 means use count_from_context).

Type:

int

count_from_context

Context key holding the instance count at runtime.

Type:

str

sync_on_complete

Whether to wait for all instances before continuing.

Type:

bool

allow_dynamic

Whether new instances can be added during execution.

Type:

bool

collection_from_context

Context key holding items to iterate over.

Type:

str

join_threshold

If set, continue after this many complete (N-of-M).

Type:

int

cancel_remaining

Cancel remaining instances after threshold reached.

Type:

bool

count: int = 0
count_from_context: str = ''
sync_on_complete: bool = True
allow_dynamic: bool = False
collection_from_context: str = ''
join_threshold: int = 0
cancel_remaining: bool = False
to_dict()[source]

Serialize to dictionary.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Deserialize from dictionary.

Parameters:

data (dict[str, Any])

Return type:

MultiInstanceConfig

Safe expression evaluator for workflow control-flow conditions.

Used by OR-split (WCP-6) to evaluate per-downstream conditions, and by other patterns that need runtime condition evaluation.

Only supports a safe subset of operations - no arbitrary code execution. Expressions can reference context values using dot notation or bracket notation.

exception stabilize.expressions.ExpressionError[source]

Bases: Exception

Raised when an expression cannot be evaluated.

stabilize.expressions.evaluate_expression(expression, context)[source]

Evaluate a simple expression against a context dictionary.

Supports:
  • Boolean literals: true, false, True, False

  • String/number/None literals

  • Context lookups: context_key, context["key"]

  • Comparisons: ==, !=, <, <=, >, >=, in, not in

  • Boolean operators: and, or, not

  • Attribute access for nested dicts: a.b.c

Does NOT support:
  • Function calls

  • Imports

  • Assignments

  • Arbitrary code

Parameters:
  • expression (str) – The expression string to evaluate

  • context (dict[str, Any]) – Dictionary of values available to the expression

Returns:

The result of evaluating the expression

Raises:

ExpressionError – If the expression is invalid or uses unsupported features

Return type:

Any
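A restricted evaluator of this kind is commonly built by parsing the expression with the standard ast module and walking only an allow-list of node types. The sketch below follows that approach for the documented feature set; it is not the library's implementation, the names evaluate and SketchExpressionError are hypothetical, and it assumes Python 3.9 or later (where subscript slices are plain expression nodes).

```python
import ast
import operator
from typing import Any


class SketchExpressionError(Exception):
    """Raised by this sketch for invalid or unsupported expressions."""


_COMPARATORS = {
    ast.Eq: operator.eq, ast.NotEq: operator.ne,
    ast.Lt: operator.lt, ast.LtE: operator.le,
    ast.Gt: operator.gt, ast.GtE: operator.ge,
    ast.In: lambda a, b: a in b,
    ast.NotIn: lambda a, b: a not in b,
}
_LITERAL_NAMES = {"true": True, "false": False}


def evaluate(expression: str, context: dict[str, Any]) -> Any:
    try:
        tree = ast.parse(expression, mode="eval")
    except SyntaxError as exc:
        raise SketchExpressionError(f"invalid expression: {expression!r}") from exc
    return _eval(tree.body, context)


def _eval(node: ast.AST, ctx: dict[str, Any]) -> Any:
    if isinstance(node, ast.Constant):  # strings, numbers, True/False/None
        return node.value
    if isinstance(node, ast.Name):
        if node.id in _LITERAL_NAMES:
            return _LITERAL_NAMES[node.id]
        if node.id == "context":  # enables context["key"]
            return ctx
        if node.id in ctx:
            return ctx[node.id]
        raise SketchExpressionError(f"unknown name: {node.id}")
    if isinstance(node, ast.Attribute):  # a.b.c on nested dicts
        base = _eval(node.value, ctx)
        if isinstance(base, dict) and node.attr in base:
            return base[node.attr]
        raise SketchExpressionError(f"cannot resolve attribute: {node.attr}")
    if isinstance(node, ast.Subscript):
        base, key = _eval(node.value, ctx), _eval(node.slice, ctx)
        try:
            return base[key]
        except (KeyError, IndexError, TypeError) as exc:
            raise SketchExpressionError(f"cannot resolve key: {key!r}") from exc
    if isinstance(node, ast.BoolOp):
        # The generator is consumed lazily, so all()/any() short-circuit.
        values = (_eval(v, ctx) for v in node.values)
        return all(values) if isinstance(node.op, ast.And) else any(values)
    if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
        return not _eval(node.operand, ctx)
    if isinstance(node, ast.Compare):
        left = _eval(node.left, ctx)
        for op, comparator in zip(node.ops, node.comparators):
            func = _COMPARATORS.get(type(op))
            if func is None:
                raise SketchExpressionError(f"unsupported operator: {type(op).__name__}")
            right = _eval(comparator, ctx)
            try:
                ok = func(left, right)
            except TypeError as exc:
                raise SketchExpressionError("incomparable operands") from exc
            if not ok:
                return False
            left = right
        return True
    # Calls, lambdas, comprehensions and everything else are rejected.
    raise SketchExpressionError(f"unsupported syntax: {type(node).__name__}")
```

Because only allow-listed node types are interpreted and nothing is ever passed to eval() or exec(), a function call such as len(x) is rejected rather than executed. Note that and/or return a plain boolean in this sketch rather than the operand value.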