"""
AddMultiInstanceHandler - adds a new instance to a running MI activity.
Implements WCP-15: Multiple Instances without a priori Run-Time Knowledge.
Dynamically creates new parallel instances during execution.
"""
from __future__ import annotations
import logging
from datetime import timedelta
from typing import TYPE_CHECKING
from stabilize.handlers.base import StabilizeHandler
from stabilize.queue.messages import AddMultiInstance, StartStage
from stabilize.resilience.config import HandlerConfig
if TYPE_CHECKING:
from stabilize.events.recorder import EventRecorder
from stabilize.models.stage import StageExecution
from stabilize.persistence.store import WorkflowStore
from stabilize.queue import Queue
logger = logging.getLogger(__name__)
[docs]
class AddMultiInstanceHandler(StabilizeHandler[AddMultiInstance]):
"""
Handler for AddMultiInstance messages.
Execution flow:
1. Load the parent MI stage
2. Verify it allows dynamic instances (mi_config.allow_dynamic)
3. Create a new child stage instance
4. Update the MI join threshold if needed
5. Push StartStage for the new instance
"""
def __init__(
self,
queue: Queue,
repository: WorkflowStore,
retry_delay: timedelta | None = None,
handler_config: HandlerConfig | None = None,
event_recorder: EventRecorder | None = None,
) -> None:
super().__init__(queue, repository, retry_delay, handler_config, event_recorder=event_recorder)
@property
def message_type(self) -> type[AddMultiInstance]:
return AddMultiInstance
[docs]
def handle(self, message: AddMultiInstance) -> None:
"""Handle the AddMultiInstance message."""
self.retry_on_concurrency_error(
lambda: self._handle_with_retry(message),
f"adding multi-instance to stage {message.stage_id}",
)
def _handle_with_retry(self, message: AddMultiInstance) -> None:
"""Inner handle logic to be retried."""
def on_stage(stage: StageExecution) -> None:
from stabilize.models.stage import StageExecution as StageExec
if stage.mi_config is None:
logger.warning(
"AddMultiInstance for stage %s which has no MI config, ignoring",
stage.name,
)
return
if not stage.mi_config.allow_dynamic:
logger.warning(
"AddMultiInstance for stage %s which does not allow dynamic instances, ignoring",
stage.name,
)
return
if stage.status.is_complete:
logger.warning(
"AddMultiInstance for completed stage %s, ignoring",
stage.name,
)
return
# Track instance count
instance_count = stage.context.get("_mi_instance_count", 0)
instance_count += 1
stage.context["_mi_instance_count"] = instance_count
# Create new child instance stage
instance_ref_id = f"{stage.ref_id}_instance_{instance_count}"
instance_context = dict(message.instance_context)
instance_context["_mi_parent_ref_id"] = stage.ref_id
instance_context["_mi_instance_index"] = instance_count
new_instance = StageExec.create(
type=stage.type,
name=f"{stage.name} [instance {instance_count}]",
ref_id=instance_ref_id,
context=instance_context,
requisite_stage_ref_ids={stage.ref_id},
)
# Add to execution
execution = stage.execution
new_instance.execution = execution
execution.stages.append(new_instance)
# Persist the new instance and update parent
with self.repository.transaction(self.queue) as txn:
txn.store_stage(stage)
if message.message_id:
txn.mark_message_processed(
message_id=message.message_id,
handler_type="AddMultiInstance",
execution_id=message.execution_id,
)
# Add stage to repository (outside transaction since add_stage is separate)
self.repository.add_stage(new_instance)
# Push start message for the new instance
self.queue.push(
StartStage(
execution_type=message.execution_type,
execution_id=message.execution_id,
stage_id=new_instance.id,
)
)
logger.info(
"Added MI instance %s to stage %s (total: %d)",
instance_ref_id,
stage.name,
instance_count,
)
self.with_stage(message, on_stage)