"""
Queue processor for handling messages.
This module provides the QueueProcessor class that polls messages from
the queue and dispatches them to appropriate handlers.
Enterprise Features:
- Thread-safe processing with locks
- Dead Letter Queue (DLQ) cleanup for failed messages
- Configurable retry and error handling
"""
from __future__ import annotations
import logging
import threading
import time
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, TypeVar
from stabilize.queue import Queue
from stabilize.queue.messages import Message, get_message_type_name
from stabilize.queue.processor.config import QueueProcessorConfig
from stabilize.queue.processor.handler_base import MessageHandler
from stabilize.queue.processor.mixins import QueueProcessorMixin
from stabilize.resilience.config import HandlerConfig
if TYPE_CHECKING:
from stabilize.persistence.store import WorkflowStore
from stabilize.resilience.bulkheads import TaskBulkheadManager
from stabilize.resilience.circuits import WorkflowCircuitFactory
from stabilize.tasks.registry import TaskRegistry
logger = logging.getLogger(__name__)
M = TypeVar("M", bound=Message)
[docs]
class QueueProcessor(QueueProcessorMixin):
"""
Processes messages from a queue using registered handlers.

Two ways of driving the processor are available:

* Background mode: ``start()`` launches a daemon polling thread and a
  ``ThreadPoolExecutor`` sized by ``config.max_workers``. The polling
  thread pulls messages with ``queue.poll_one()`` every
  ``config.poll_frequency_ms`` milliseconds and hands them to the pool.
  ``request_stop()`` stops new work from being accepted; ``stop()`` also
  joins the polling thread and shuts the pool down.
* Synchronous mode: ``process_one()`` and ``process_all()`` handle
  messages on the calling thread, which is useful for tests and debugging.

Handlers are stored per message type. Add them with ``register_handler()``
or ``register_handler_func()``; override an existing one with
``replace_handler()``.

When ``store`` and ``task_registry`` are both provided, all 12 default
handlers are registered automatically — no manual registration needed.

Example:
    queue = SqliteQueue("sqlite:///workflow.db", table_name="queue_messages")
    store = SqliteWorkflowStore("sqlite:///workflow.db", create_tables=True)
    registry = TaskRegistry()
    processor = QueueProcessor(queue, store=store, task_registry=registry)
    processor.start()
"""
def __init__(
    self,
    queue: Queue,
    config: QueueProcessorConfig | None = None,
    store: WorkflowStore | None = None,
    handler_config: HandlerConfig | None = None,
    task_registry: TaskRegistry | None = None,
    bulkhead_manager: TaskBulkheadManager | None = None,
    circuit_factory: WorkflowCircuitFactory | None = None,
) -> None:
    """
    Build a processor bound to ``queue``.

    Nothing is started here: no thread or executor exists until ``start()``
    is called.

    Args:
        queue: The queue messages are polled from, acked on and rescheduled on.
        config: Processor configuration. When given it is used as-is and
                ``handler_config`` plays no part in building the config.
        store: Optional workflow store. A warning is logged when the config
               enables deduplication but no store is supplied.
        handler_config: Used to derive the processor config via
                        ``QueueProcessorConfig.from_handler_config`` when
                        ``config`` is None; also forwarded to the default
                        handler registration.
        task_registry: Optional task registry. Default handlers are
                       registered only when both ``store`` and
                       ``task_registry`` are supplied.
        bulkhead_manager: Forwarded to the default handler registration.
        circuit_factory: Forwarded to the default handler registration.
    """
    self.queue = queue

    # An explicit config always wins; otherwise derive one from handler_config
    # (which may itself be None).
    self.config = config if config is not None else QueueProcessorConfig.from_handler_config(handler_config)
    self._store = store

    # Deduplication needs a store to work - make the misconfiguration visible.
    dedup_without_store = self.config.enable_deduplication and store is None
    if dedup_without_store:
        logger.warning(
            "QueueProcessor created with enable_deduplication=True but no store provided. "
            "Message deduplication will NOT work. Pass store=store to enable."
        )

    # Handler table, keyed by message class.
    self._handlers: dict[type[Message], MessageHandler[Any]] = {}

    # Lifecycle flags and background machinery (populated by start()).
    self._running = False
    self._stopping = False  # Set once a stop is requested: accept no new work
    self._executor: ThreadPoolExecutor | None = None
    self._poll_thread: threading.Thread | None = None

    # Bookkeeping shared between the polling thread and the workers.
    self._lock = threading.Lock()  # Guards _active_count and the poll decision
    self._processing_lock = threading.Lock()  # Serialises process_all()
    self._active_count = 0
    self._in_flight: set[str] = set()
    self._in_flight_lock = threading.Lock()
    self._last_dlq_check = 0.0

    # With both a store and a task registry the default handlers can be wired
    # up automatically.
    can_auto_register = store is not None and task_registry is not None
    if can_auto_register:
        self._register_default_handlers(
            queue,
            store,
            task_registry,
            bulkhead_manager,
            circuit_factory,
            handler_config,
        )
[docs]
def register_handler(self, handler: MessageHandler[Any]) -> None:
    """
    Add a handler for a message type that has none yet.

    Registration is strict: an existing handler is never silently
    overwritten. Call :meth:`replace_handler` to swap one out.

    Args:
        handler: The handler to add; it is stored under ``handler.message_type``.

    Raises:
        ValueError: If that message type already has a handler.
    """
    msg_type = handler.message_type
    if msg_type in self._handlers:
        raise ValueError(
            f"Handler for {msg_type.__name__} is already registered. Use replace_handler() to override."
        )

    self._handlers[msg_type] = handler
    logger.debug("Registered handler for %s", msg_type.__name__)
[docs]
def replace_handler(self, handler: MessageHandler[Any]) -> None:
    """
    Swap the handler of an already-registered message type.

    Args:
        handler: The replacement; it takes over ``handler.message_type``.

    Raises:
        ValueError: If that message type has no handler to replace.
    """
    msg_type = handler.message_type
    if msg_type not in self._handlers:
        raise ValueError(f"No handler registered for {msg_type.__name__}")

    self._handlers[msg_type] = handler
    logger.debug("Replaced handler for %s", msg_type.__name__)
[docs]
def register_handler_func(
    self,
    message_type: type[M],
    handler_func: Callable[[M], None],
) -> None:
    """
    Wrap a plain callable in a handler object and register it.

    Args:
        message_type: The message class the callable is responsible for.
        handler_func: Called with each message of that type.

    Raises:
        ValueError: Propagated from :meth:`register_handler` when the
            message type already has a handler.
    """
    # Captured under distinct names: inside the class body below,
    # ``message_type`` is also the name of the property being defined.
    bound_type = message_type
    callback = handler_func

    class FuncHandler(MessageHandler[Any]):
        @property
        def message_type(self) -> type[Any]:
            return bound_type

        def handle(self, message: Any) -> None:
            callback(message)

    self.register_handler(FuncHandler())
[docs]
def start(self) -> None:
    """
    Start background processing.

    Creates a thread pool with ``config.max_workers`` workers and a daemon
    thread running the poll loop. Calling this while the processor is
    already running is a no-op.

    The processor can be started again after ``stop()``: the stop-request
    flag left behind by ``stop()`` / ``request_stop()`` is cleared here,
    otherwise the new poll loop would refuse every message.
    """
    if self._running:
        return

    # Clear any stale stop request from a previous run before the poll loop
    # starts looking at it.
    self._stopping = False
    self._running = True
    self._executor = ThreadPoolExecutor(max_workers=self.config.max_workers)
    self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
    self._poll_thread.start()
    logger.info("Queue processor started")
[docs]
def stop(self, wait: bool = True) -> None:
    """
    Shut the processor down.

    Flags the processor as stopping and no longer running, gives the poll
    thread up to five seconds to exit, then shuts the thread pool down.

    Args:
        wait: Passed to the executor's ``shutdown``. When False, a warning
            is logged if messages are still in flight.
    """
    self._stopping = True
    self._running = False

    poller = self._poll_thread
    if poller:
        poller.join(timeout=5.0)

    pool = self._executor
    if pool:
        pool.shutdown(wait=wait)

    if not wait:
        # Snapshot under the lock; workers may still be updating the set.
        with self._in_flight_lock:
            count = len(self._in_flight)
        if count > 0:
            logger.warning(
                "Forced shutdown with %d messages in-flight; will be redelivered after queue lock timeout",
                count,
            )

    logger.info("Queue processor stopped")
[docs]
def request_stop(self) -> None:
    """
    Ask for a graceful stop and return immediately.

    Only raises the stopping flag, which makes the poll loop stop taking
    new messages. Work already running is left alone and nothing is
    joined or shut down; poll ``active_count`` to see when it has drained.

    Example:
        processor.request_stop()
        while processor.active_count > 0:
            time.sleep(0.1)
        processor.stop()
    """
    self._stopping = True
    still_active = self.active_count
    logger.info("Queue processor stop requested (active=%d)", still_active)
@property
def is_stopping(self) -> bool:
    """Whether a stop was requested while the processor is still running."""
    stop_requested = self._stopping
    return stop_requested and self._running
def _poll_loop(self) -> None:
    """Body of the polling thread.

    Runs until ``_running`` goes False. On each pass it decides, under
    ``_lock``, whether new work may be taken (not stopping and a worker
    slot free), polls one message and claims a slot for it. Doing the
    check, the poll and the increment under the same lock means a stop
    request cannot slip in between the flag check and the poll.

    The message is handed to the pool after the lock is released. When
    there is nothing to do the loop sleeps for the configured poll interval.
    Errors are logged; they end the loop only when ``config.stop_on_error``
    is set.
    """
    idle_sleep = self.config.poll_frequency_ms / 1000.0

    while self._running:
        try:
            polled = None
            with self._lock:
                # Take work only if no stop was requested and a slot is free.
                accepting = not self._stopping and self._active_count < self.config.max_workers
                if accepting:
                    polled = self.queue.poll_one()
                    if polled:
                        # Claim the slot before anyone else can see it as free.
                        self._active_count += 1

            if not polled:
                # Stopping, at capacity, or the queue had nothing ready.
                time.sleep(idle_sleep)
                continue

            # Submission itself does not need the lock.
            self._submit_message_internal(polled)
        except Exception as e:
            logger.error("Error in poll loop: %s", e, exc_info=True)
            if self.config.stop_on_error:
                self._running = False
                break
            time.sleep(idle_sleep)
def _submit_message(self, message: Message) -> None:
    """Claim a worker slot for ``message`` and hand it to the thread pool.

    This is the entry point for callers that have not already incremented
    ``_active_count`` themselves: the increment happens here, under
    ``_lock``, and the submission follows once the lock is released.
    """
    with self._lock:
        self._active_count = self._active_count + 1

    self._submit_message_internal(message)
def _submit_message_internal(self, message: Message) -> None:
    """Hand a message to the thread pool without incrementing active_count.

    The caller must already have claimed a worker slot by incrementing
    ``_active_count`` (``_poll_loop`` and ``_submit_message`` both do).
    The slot is released again:

    * by the worker, once handling has finished (successfully or not), or
    * right here, if the job never reached the pool - either because no
      executor exists or because ``submit`` raised (for example when the
      executor was shut down concurrently). Without this the counter would
      stay inflated forever and ``active_count`` would never drain to zero.

    Exceptions raised by ``submit`` still propagate to the caller.
    """

    def process_and_ack() -> None:
        # Runs on a worker thread: handle, then ack or reschedule.
        msg_id = getattr(message, "message_id", None)
        if msg_id:
            with self._in_flight_lock:
                self._in_flight.add(msg_id)
        try:
            self._handle_message(message)
            self.queue.ack(message)
        except Exception as e:
            logger.error(
                "Error handling %s: %s",
                get_message_type_name(message),
                e,
                exc_info=True,
            )
            try:
                # Record the failure on the message and put it back on the
                # queue for a retry after the configured delay.
                message.set_error_context(e)
                self.queue.reschedule(message, self.config.retry_delay)
            except Exception:
                # Nobody inspects the Future, so an error here would vanish
                # silently unless it is logged.
                logger.exception(
                    "Failed to reschedule %s after handler error",
                    get_message_type_name(message),
                )
        finally:
            if msg_id:
                with self._in_flight_lock:
                    self._in_flight.discard(msg_id)
            with self._lock:
                self._active_count -= 1

    submitted = False
    try:
        executor = self._executor
        if executor is None:
            logger.warning(
                "No executor available; %s was not submitted and will be "
                "redelivered after queue lock timeout",
                type(message).__name__,
            )
        else:
            executor.submit(process_and_ack)
            submitted = True
    finally:
        if not submitted:
            # The job never reached the pool, so the worker-side release in
            # process_and_ack will not run: give the slot back here.
            with self._lock:
                self._active_count -= 1
[docs]
def process_one(self) -> bool:
    """
    Poll and handle at most one message on the calling thread.

    Intended for tests and debugging. On success the message is acked.
    On failure the error is logged and recorded on the message, the
    message is rescheduled after ``config.retry_delay``, and the original
    exception is re-raised.

    Returns:
        True if a message was handled, False if the queue had none ready.
    """
    message = self.queue.poll_one()
    if not message:
        return False

    try:
        self._handle_message(message)
        self.queue.ack(message)
    except Exception as e:
        logger.error("Error handling message: %s", e, exc_info=True)
        # Keep the failure on the message for debugging and auditing.
        message.set_error_context(e)
        self.queue.reschedule(message, self.config.retry_delay)
        raise
    return True
[docs]
def process_all(self, timeout: float = 60.0) -> int:
    """
    Drain the queue on the calling thread.

    Messages are handled one at a time through ``process_one()`` until the
    queue reports size 0 or ``timeout`` seconds have elapsed. Only one
    caller can drain at a time: if another call holds the processing lock,
    this one returns 0 immediately instead of waiting.

    A DLQ check runs at the start of a drain when more than 30 seconds
    have passed since the previous one.

    Args:
        timeout: Upper bound, in seconds, on the time spent draining.

    Returns:
        Number of messages processed by this call.
    """
    # Non-blocking: a concurrent drain makes this call a no-op.
    if not self._processing_lock.acquire(blocking=False):
        logger.debug("Another thread is processing, skipping")
        return 0

    try:
        processed = 0
        # Monotonic time is immune to wall-clock adjustments (NTP, drift).
        began = time.monotonic()

        dlq_check_due = time.monotonic() - self._last_dlq_check > 30.0
        if dlq_check_due:
            self._check_dlq()
            self._last_dlq_check = time.monotonic()

        while time.monotonic() - began < timeout and self.queue.size() != 0:
            if self.process_one():
                processed += 1
            else:
                # Messages exist but none is ready yet; back off briefly.
                time.sleep(0.01)
        return processed
    finally:
        self._processing_lock.release()
@property
def is_running(self) -> bool:
    """Whether the processor is currently flagged as running."""
    running = self._running
    return running
@property
def active_count(self) -> int:
    """Number of messages currently holding a worker slot (read under the lock)."""
    with self._lock:
        in_progress = self._active_count
    return in_progress