Queue

class stabilize.queue.processor.MessageHandler[source]

Bases: Generic[M]

Base class for message handlers.

Each handler processes a specific type of message.

handle(message)[source]

Handle a message.

Parameters:

message (M) – The message to handle

Return type:

None

property message_type: type[M]

Return the type of message this handler processes.

class stabilize.queue.processor.QueueProcessor(queue, config=None, store=None, handler_config=None, task_registry=None, bulkhead_manager=None, circuit_factory=None)[source]

Bases: QueueProcessorMixin

Processes messages from a queue using registered handlers.

The processor polls the queue at regular intervals and dispatches messages to appropriate handlers. Handlers run in a thread pool for concurrent processing.

When store and task_registry are both provided, all 12 default handlers are registered automatically — no manual registration needed.

Example

queue = SqliteQueue(“sqlite:///workflow.db”, table_name=”queue_messages”) store = SqliteWorkflowStore(“sqlite:///workflow.db”, create_tables=True) registry = TaskRegistry() processor = QueueProcessor(queue, store=store, task_registry=registry) processor.start()

Parameters:
  • queue (Queue)

  • config (QueueProcessorConfig | None)

  • store (WorkflowStore | None)

  • handler_config (HandlerConfig | None)

  • task_registry (TaskRegistry | None)

  • bulkhead_manager (TaskBulkheadManager | None)

  • circuit_factory (WorkflowCircuitFactory | None)

property active_count: int

Get the number of actively processing messages.

property is_running: bool

Check if the processor is running.

property is_stopping: bool

Check if stop has been requested but not yet completed.

process_all(timeout=60.0)[source]

Process all messages synchronously until queue is empty.

Thread-safe: uses a processing lock to prevent concurrent calls. Also performs periodic DLQ cleanup for expired messages.

Parameters:

timeout (float) – Maximum time to wait for processing

Returns:

Number of messages processed

Return type:

int

process_one()[source]

Process a single message synchronously.

Useful for testing and debugging.

Returns:

True if a message was processed, False otherwise

Return type:

bool

register_handler(handler)[source]

Register a message handler.

Raises ValueError if a handler for the same message type is already registered. Use replace_handler() to override an existing handler.

Parameters:

handler (MessageHandler[Any]) – The handler to register

Raises:

ValueError – If a handler for this message type is already registered.

Return type:

None

register_handler_func(message_type, handler_func)[source]

Register a handler function for a message type.

Parameters:
  • message_type (type[M]) – The type of message to handle

  • handler_func (Callable[[M], None]) – Function to call with the message

Return type:

None

replace_handler(handler)[source]

Replace an existing handler.

Parameters:

handler (MessageHandler[Any]) – The new handler to use

Raises:

ValueError – If no handler is registered for this message type.

Return type:

None

request_stop()[source]

Request graceful stop without blocking.

Sets the stopping flag to stop accepting new messages, but doesn’t wait for active tasks to complete. Use the active_count property to monitor progress.

Example

processor.request_stop() while processor.active_count > 0:

time.sleep(0.1)

processor.stop()

Return type:

None

start()[source]

Start the queue processor.

Return type:

None

stop(wait=True)[source]

Stop the queue processor.

Parameters:

wait (bool) – Whether to wait for pending messages to complete

Return type:

None

class stabilize.queue.processor.QueueProcessorConfig(poll_frequency_ms=50, max_workers=10, retry_delay=datetime.timedelta(seconds=15), stop_on_error=False, enable_deduplication=True)[source]

Bases: object

Configuration for the queue processor.

Values can be loaded from environment variables via HandlerConfig. See HandlerConfig documentation for environment variable names.

Parameters:
  • poll_frequency_ms (int)

  • max_workers (int)

  • retry_delay (timedelta)

  • stop_on_error (bool)

  • enable_deduplication (bool)

enable_deduplication: bool = True
classmethod from_handler_config(handler_config=None)[source]

Create QueueProcessorConfig from HandlerConfig.

Parameters:

handler_config (HandlerConfig | None) – HandlerConfig to use. If None, loads from environment.

Returns:

QueueProcessorConfig with values from HandlerConfig

Return type:

QueueProcessorConfig

max_workers: int = 10
poll_frequency_ms: int = 50
retry_delay: timedelta = datetime.timedelta(seconds=15)
stop_on_error: bool = False
class stabilize.queue.processor.SynchronousQueueProcessor(queue, store=None, task_registry=None, config=None, handler_config=None, bulkhead_manager=None, circuit_factory=None)[source]

Bases: QueueProcessor

A synchronous queue processor that processes messages immediately.

Useful for testing where you want deterministic execution order.

Accepts the same parameters as QueueProcessor for auto-registration.

Parameters:
  • queue (Queue)

  • store (WorkflowStore | None)

  • task_registry (TaskRegistry | None)

  • config (QueueProcessorConfig | None)

  • handler_config (HandlerConfig | None)

  • bulkhead_manager (TaskBulkheadManager | None)

  • circuit_factory (WorkflowCircuitFactory | None)

push_and_process(message)[source]

Push a message and process it immediately.

Parameters:

message (Message) – The message to push and process

Return type:

None

start()[source]

No-op for synchronous processor.

Return type:

None

stop(wait=True)[source]

No-op for synchronous processor.

Parameters:

wait (bool)

Return type:

None

Message types for the queue-based execution engine.

This module defines all message types used in the pipeline execution queue.

class stabilize.queue.messages.Message(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None)[source]

Bases: object

Base class for all queue messages.

Each message includes metadata for tracking and debugging.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

message_id: str | None = None
created_at: datetime
attempts: int = 0
max_attempts: int = 10
last_error: str | None = None
last_error_type: str | None = None
copy_with_attempts(attempts)[source]

Create a copy with updated attempt count.

Parameters:

attempts (int)

Return type:

Message

set_error_context(error)[source]

Store error context from a failed attempt.

This context is preserved across reschedules to help debugging and provide visibility into why messages are being retried.

Parameters:

error (Exception)

Return type:

None

class stabilize.queue.messages.WorkflowLevel(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='')[source]

Bases: Message

Base class for execution-level messages.

These messages target a specific pipeline execution.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

execution_type: str = 'PIPELINE'
execution_id: str = ''
class stabilize.queue.messages.StartWorkflow(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='')[source]

Bases: WorkflowLevel

Message to start a pipeline execution.

Triggers the beginning of pipeline execution, starting initial stages.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

class stabilize.queue.messages.CompleteWorkflow(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', retry_count=0)[source]

Bases: WorkflowLevel

Message to complete a pipeline execution.

Sent when all stages have completed or execution should be finalized.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • retry_count (int)

retry_count

Number of times this message has been re-queued while waiting for stages to complete. Used to prevent infinite loops.

Type:

int

retry_count: int = 0
class stabilize.queue.messages.CancelWorkflow(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', user='', reason='')[source]

Bases: WorkflowLevel

Message to cancel a pipeline execution.

Marks the execution as canceled and stops all running stages.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • user (str)

  • reason (str)

user: str = ''
reason: str = ''
class stabilize.queue.messages.StartWaitingWorkflows(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, pipeline_config_id='', purge_queue=False)[source]

Bases: Message

Message to start any queued/waiting executions for a pipeline config.

Sent after an execution completes when concurrent execution limits are enabled.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • pipeline_config_id (str)

  • purge_queue (bool)

pipeline_config_id: str = ''
purge_queue: bool = False
class stabilize.queue.messages.StageLevel(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: WorkflowLevel

Base class for stage-level messages.

These messages target a specific stage within an execution.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

stage_id

The ID of the stage this message targets

Type:

str

retry_count

Number of times this message has been re-queued while waiting for upstream stages to complete. Used to prevent infinite loops.

Type:

int

stage_id: str = ''
retry_count: int = 0
classmethod from_execution_level(msg, stage_id)[source]

Create a stage-level message from an execution-level message.

Parameters:
Return type:

StageLevel

class stabilize.queue.messages.StartStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: StageLevel

Message to start a stage.

Checks if upstream stages are complete, then plans and starts the stage.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

class stabilize.queue.messages.CompleteStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: StageLevel

Message to complete a stage.

Determines stage status, plans after stages, and triggers downstream.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

class stabilize.queue.messages.SkipStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: StageLevel

Message to skip a stage.

Sets stage status to SKIPPED and triggers downstream stages.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

class stabilize.queue.messages.CancelStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: StageLevel

Message to cancel a stage.

Cancels any running tasks and marks stage as canceled.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

class stabilize.queue.messages.RestartStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: StageLevel

Message to restart a stage.

Resets stage status and re-executes from the beginning.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

class stabilize.queue.messages.ResumeStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: StageLevel

Message to resume a paused stage.

Continues execution from where it was paused.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

class stabilize.queue.messages.ContinueParentStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, phase=SyntheticStageOwner.STAGE_AFTER)[source]

Bases: StageLevel

Message to continue parent stage after synthetic stage completes.

Sent when a synthetic stage completes to notify its parent.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • phase (SyntheticStageOwner)

phase: SyntheticStageOwner = 'STAGE_AFTER'
class stabilize.queue.messages.JumpToStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, target_stage_ref_id='', jump_context=<factory>, jump_outputs=<factory>)[source]

Bases: StageLevel

Message to jump to a different stage (dynamic routing).

Resets the target stage to NOT_STARTED and starts it with merged context. Used by TaskResult.jump_to() for dynamic flow control.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • target_stage_ref_id (str)

  • jump_context (dict[str, Any])

  • jump_outputs (dict[str, Any])

target_stage_ref_id

The ref_id of the stage to jump to

Type:

str

jump_context

Context to merge into target stage

Type:

dict[str, Any]

jump_outputs

Outputs to make available to target stage

Type:

dict[str, Any]

target_stage_ref_id: str = ''
jump_context: dict[str, Any]
jump_outputs: dict[str, Any]
class stabilize.queue.messages.SignalStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, signal_name='', signal_data=<factory>, persistent=False)[source]

Bases: StageLevel

Message to signal a suspended stage (WCP-23, WCP-24).

For transient triggers (WCP-23), the signal is discarded if the stage is not currently SUSPENDED. For persistent triggers (WCP-24), the signal is buffered if the stage is not ready.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • signal_name (str)

  • signal_data (dict[str, Any])

  • persistent (bool)

signal_name

Name/type of the signal

Type:

str

signal_data

Payload data carried by the signal

Type:

dict[str, Any]

persistent

If True, buffer signal when stage not ready (WCP-24)

Type:

bool

signal_name: str = ''
signal_data: dict[str, Any]
persistent: bool = False
class stabilize.queue.messages.CancelRegion(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', region='')[source]

Bases: WorkflowLevel

Message to cancel all stages in a named region (WCP-25).

Finds all stages with matching cancel_region and pushes CancelStage for each active one.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • region (str)

region

The cancel region name to target

Type:

str

region: str = ''
class stabilize.queue.messages.AddMultiInstance(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, instance_context=<factory>)[source]

Bases: StageLevel

Message to add a new instance to a running multi-instance activity (WCP-15).

Creates a new parallel instance for an MI stage that allows dynamic additions.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • instance_context (dict[str, Any])

instance_context

Context for the new instance

Type:

dict[str, Any]

instance_context: dict[str, Any]
class stabilize.queue.messages.TaskLevel(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]

Bases: StageLevel

Base class for task-level messages.

These messages target a specific task within a stage.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • task_id (str)

task_id: str = ''
classmethod from_stage_level(msg, task_id)[source]

Create a task-level message from a stage-level message.

Parameters:
Return type:

TaskLevel

class stabilize.queue.messages.StartTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]

Bases: TaskLevel

Message to start a task.

Sets task status to RUNNING and triggers RunTask.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • task_id (str)

class stabilize.queue.messages.RunTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='', task_type='')[source]

Bases: TaskLevel

Message to execute a task.

Runs the task implementation and handles the result.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • task_id (str)

  • task_type (str)

task_type: str = ''
class stabilize.queue.messages.CompleteTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='', status=WorkflowStatus.SUCCEEDED, original_status=None)[source]

Bases: TaskLevel

Message to complete a task.

Updates task status and triggers next task or stage completion.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • task_id (str)

  • status (WorkflowStatus)

  • original_status (WorkflowStatus | None)

status: WorkflowStatus = ('SUCCEEDED', True, False)
original_status: WorkflowStatus | None = None
class stabilize.queue.messages.PauseTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]

Bases: TaskLevel

Message to pause a task.

Used when execution is paused - task will resume when execution resumes.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • task_id (str)

class stabilize.queue.messages.InvalidWorkflowId(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='')[source]

Bases: WorkflowLevel

Message indicating an invalid execution ID was referenced.

Logged and dropped - no further processing.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

class stabilize.queue.messages.InvalidStageId(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]

Bases: StageLevel

Message indicating an invalid stage ID was referenced.

Logged and dropped - no further processing.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

class stabilize.queue.messages.InvalidTaskId(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]

Bases: TaskLevel

Message indicating an invalid task ID was referenced.

Logged and dropped - no further processing.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • task_id (str)

class stabilize.queue.messages.InvalidTaskType(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='', task_type_name='')[source]

Bases: TaskLevel

Message indicating an unknown task type was referenced.

Logged and dropped - no further processing.

Parameters:
  • message_id (str | None)

  • created_at (datetime)

  • attempts (int)

  • max_attempts (int)

  • last_error (str | None)

  • last_error_type (str | None)

  • execution_type (str)

  • execution_id (str)

  • stage_id (str)

  • retry_count (int)

  • task_id (str)

  • task_type_name (str)

task_type_name: str = ''
stabilize.queue.messages.get_message_type_name(message)[source]

Get the type name for a message.

Parameters:

message (Message)

Return type:

str

stabilize.queue.messages.create_message_from_dict(type_name, data)[source]

Create a message from a dictionary representation.

Parameters:
  • type_name (str) – The message type name

  • data (dict[str, Any]) – Dictionary of message fields

Returns:

A Message instance

Raises:

ValueError – If type_name is unknown

Return type:

Message