CLCA Design Pattern
In async programming, one classic scenario keeps surfacing: multiple coroutines need to simultaneously wait for the same external "signal", then all resume together. That signal might be a button click, a long-awaited network response, or a background workflow node completing its task.
Two paths typically present themselves: bring in a full message queue (too heavy), or hand-roll a simple notification using asyncio.Event or a shared Future. The hand-rolled approach, however, has serious gaps. A bare asyncio.Future can wake many waiters, but only inside the event loop that owns it: a coroutine running in another loop cannot await it. asyncio.Event has the same limitation, since neither primitive is thread-safe. Ensuring that one signal reliably wakes all waiters, even across multiple event loops, usually forces polling or per-waiter Future bookkeeping, burdening the external controller with significant complexity.
This article presents a lightweight signal-distribution pattern distilled from practice — CLCA (Cross Loop Callback‑Allocate). It decouples "suspend" and "resume" from complex flow-control logic, turning them into a standalone, highly reusable cross-event-loop communication primitive.
What It Isn't: Not a Queue
First, a clarification: CLCA is not a "queue". A queue is a FIFO pipeline that carries a data stream and cares about "who gets this element". CLCA cares about the signal — a bare "you may proceed" notification. Its core responsibility is allocation: from a single signal source, precisely waking every coroutine waiting on that signal.
That is the meaning of "Callback‑Allocate": through a callback mechanism, the signal is allocated to every registered waiter.
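The difference between carrying data and distributing a signal can be seen with nothing but the standard library. The sketch below is illustrative only and stays inside a single event loop; the names demo, consumer, and waiter are hypothetical and not part of CLCA. One item placed on an asyncio.Queue reaches exactly one consumer, while one completed Future wakes every coroutine awaiting it.

```python
import asyncio


async def demo() -> tuple[int, int]:
    loop = asyncio.get_running_loop()

    # Queue: a single item is handed to exactly one of the two consumers.
    queue = asyncio.Queue()
    got_item = 0

    async def consumer() -> None:
        nonlocal got_item
        try:
            await asyncio.wait_for(queue.get(), timeout=0.2)
            got_item += 1
        except asyncio.TimeoutError:
            pass  # the other consumer took the only item

    consumers = [asyncio.create_task(consumer()) for _ in range(2)]
    await queue.put("go")
    await asyncio.gather(*consumers)

    # Signal: a single completion wakes every waiter.
    signal = loop.create_future()
    woken = 0

    async def waiter() -> None:
        nonlocal woken
        await signal
        woken += 1

    waiters = [asyncio.create_task(waiter()) for _ in range(2)]
    await asyncio.sleep(0)  # let both waiters start waiting
    signal.set_result(True)
    await asyncio.gather(*waiters)
    return got_item, woken


queue_receivers, signal_receivers = asyncio.run(demo())
print(queue_receivers, signal_receivers)  # prints: 1 2
```

The Future half of this sketch only works because both waiters live in the loop that owns the Future, which is the limitation the rest of the pattern addresses.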
Core Mechanism
CLCA is built from three cooperating primitives:
Shared Future (Signal Source)
An asyncio.Future completed via set_result(). It carries no business data — only the "wake" signal.
Ephemeral Future + add_done_callback (Allocation Channel)
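This is the heart of CLCA. Waiters attach their own "ephemeral Future" to the shared Future via add_done_callback. When the signal source fires, every attached ephemeral Future is completed in turn, waking all waiters in one shot. Crucially, this is what makes the mechanism work across event loops: each coroutine ultimately awaits a Future that belongs to its own event loop, so no coroutine ever awaits a Future owned by a foreign loop. One caveat applies: the callback runs on the loop that owns the shared Future, and asyncio Futures are not thread-safe, so completing an ephemeral Future that belongs to another loop should go through that loop's call_soon_threadsafe().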
aiologic.Lock (State Synchronization)
In multi-threaded, multi-event-loop environments, ensures that mutations to the shared Future and waiter state are atomic and visible across loops.
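As a rough illustration of the allocation channel on its own, the following sketch uses only the standard library and leaves out the lock and the allocator bookkeeping. The helpers follow, _finish, and run_demo are hypothetical names introduced here, not part of the classes in this article. It assumes that both attaching the callback and completing the ephemeral Future are marshalled onto the owning loops with call_soon_threadsafe, since asyncio Futures are not thread-safe.

```python
import asyncio
import threading


def _finish(fut: asyncio.Future) -> None:
    """Complete fut unless it is already done (for example, cancelled)."""
    if not fut.done():
        fut.set_result(None)


def follow(shared: asyncio.Future) -> asyncio.Future:
    """Return an ephemeral Future, owned by the running loop, that completes
    once `shared` (possibly owned by another loop) completes."""
    own_loop = asyncio.get_running_loop()
    ephemeral = own_loop.create_future()

    def on_done(_: asyncio.Future) -> None:
        # Runs on the shared Future's loop; hop to the waiter's loop safely.
        own_loop.call_soon_threadsafe(_finish, ephemeral)

    # Attach the callback on the loop that owns the shared Future.
    shared.get_loop().call_soon_threadsafe(shared.add_done_callback, on_done)
    return ephemeral


def run_demo(n_waiters: int = 3) -> list[str]:
    woken: list[str] = []

    async def waiter(name: str, shared: asyncio.Future) -> None:
        await follow(shared)  # awaits a Future in this thread's own loop
        woken.append(name)

    def thread_main(name: str, shared: asyncio.Future) -> None:
        asyncio.run(waiter(name, shared))  # one event loop per thread

    async def main() -> None:
        shared = asyncio.get_running_loop().create_future()
        threads = [
            threading.Thread(target=thread_main, args=(f"w{i}", shared))
            for i in range(n_waiters)
        ]
        for t in threads:
            t.start()
        await asyncio.sleep(0.1)
        shared.set_result(True)  # fire the signal from the owning loop
        for t in threads:
            # Join off-loop so the main loop keeps running its callbacks.
            await asyncio.to_thread(t.join)

    asyncio.run(main())
    return sorted(woken)


woken_names = run_demo()
print(woken_names)  # prints: ['w0', 'w1', 'w2']
```

Because the callback is attached on the owning loop, a waiter that arrives after the signal has fired is still woken: add_done_callback on a completed Future schedules the callback immediately.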
Two Variants: Voluntary Yield vs. Checkpoint Suspend
Depending on who initiates the suspend, CLCA manifests in two typical forms:
- Voluntary yield (active): The coroutine itself decides when to suspend and wait for an external signal. This is the most direct one-to-many notification scenario, represented by the Signal class.
- Checkpoint suspend (passive): An external controller pre-sets a "checkpoint"; when a coroutine reaches that point it passively detects and suspends until explicitly resumed. This form excels when the system needs fine-grained external control over coroutine execution pacing, represented by the CheckpointSignal class.
Both forms share the same CLCA kernel; they differ only in the direction of the suspend trigger.
Practice I: Signal — Voluntary Yield
The coroutine voluntarily calls wait() to yield control, waiting for an external signal. This is the classic scenario described earlier, implemented as follows:
import asyncio
import aiologic


def _set_if_pending(fut: asyncio.Future) -> None:
    """Complete fut unless it is already done (for example, cancelled)."""
    if not fut.done():
        fut.set_result(True)


class Signal:
    """A reusable signal. Workers voluntarily yield and suspend, waiting for
    an external signal to resume."""

    def __init__(self):
        self._shared_future: asyncio.Future | None = None  # shared signal source
        self._lock = aiologic.Lock()

    async def wait(self) -> None:
        """Suspend the current coroutine, waiting for the signal.
        Can be called from any event loop."""
        async with self._lock:
            # Decide: are we the allocator or a follower?
            if self._shared_future is None or self._shared_future.done():
                self._shared_future = asyncio.Future()  # owned by the allocator's loop
                shared_fut = self._shared_future
                is_allocator = True
            else:
                shared_fut = self._shared_future
                is_allocator = False
            await asyncio.sleep(0)  # brief yield

        if is_allocator:
            # The allocator created the shared Future in its own loop,
            # so it can await it directly.
            fut = shared_fut
        else:
            # A follower awaits an ephemeral Future owned by its own loop.
            # The callback runs on the shared Future's loop, so it hops
            # to the follower's loop via call_soon_threadsafe.
            fut = asyncio.Future()
            shared_fut.add_done_callback(
                lambda _: fut.get_loop().call_soon_threadsafe(_set_if_pending, fut)
            )
        try:
            await fut
        finally:
            if is_allocator:
                async with self._lock:  # safely clean up under the lock
                    if self._shared_future is shared_fut:  # guard against next-cycle Future
                        self._shared_future = None

    async def signal(self) -> None:
        """Fire the signal, waking all waiters.
        Can be called from any event loop."""
        async with self._lock:
            shared_fut = self._shared_future
            if shared_fut is not None and not shared_fut.done():
                # Complete the shared Future on the loop that owns it.
                shared_fut.get_loop().call_soon_threadsafe(_set_if_pending, shared_fut)

Key Points
- Allocator vs. waiter: The first coroutine calling wait() creates the shared Future and cleans up after waking; subsequent coroutines bind to the same shared Future via add_done_callback. The await asyncio.sleep(0) inside the lock ensures callbacks are safely attached after the lock is released.
- One shot wakes all: signal() completes the shared Future, firing all registered callbacks and waking every waiter simultaneously.
- Cross-loop safety: aiologic.Lock protects all shared-state reads and writes. Each coroutine ultimately awaits a Future in its own event loop, naturally avoiding cross-loop waiting issues.
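To see why the cleanup step matters, here is a single-loop stand-in that can be run without aiologic. It is a sketch under stated assumptions, not the Signal class itself: MiniSignal is a hypothetical name, asyncio.Lock replaces aiologic.Lock, and the cross-loop path is therefore not exercised. It shows the same allocator and follower roles being reused across two consecutive signal cycles.

```python
import asyncio


class MiniSignal:
    """Single-loop stand-in for Signal (asyncio.Lock instead of aiologic.Lock)."""

    def __init__(self) -> None:
        self._shared: asyncio.Future | None = None
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        loop = asyncio.get_running_loop()
        async with self._lock:
            is_allocator = self._shared is None or self._shared.done()
            if is_allocator:
                self._shared = loop.create_future()
            shared = self._shared
        if is_allocator:
            fut = shared
        else:
            fut = loop.create_future()

            def wake(_: asyncio.Future) -> None:
                if not fut.done():
                    fut.set_result(None)

            shared.add_done_callback(wake)
        try:
            await fut
        finally:
            if is_allocator:
                async with self._lock:
                    if self._shared is shared:  # do not clobber a newer cycle
                        self._shared = None

    async def signal(self) -> None:
        async with self._lock:
            if self._shared is not None and not self._shared.done():
                self._shared.set_result(True)


async def demo() -> list[str]:
    sig = MiniSignal()
    log: list[str] = []

    async def worker(name: str) -> None:
        for cycle in (1, 2):
            await sig.wait()
            log.append(f"{name}:{cycle}")

    workers = [asyncio.create_task(worker(n)) for n in ("A", "B")]
    for _ in (1, 2):
        await asyncio.sleep(0.05)  # give the workers time to reach wait()
        await sig.signal()
    await asyncio.gather(*workers)
    return sorted(log)


events = asyncio.run(demo())
print(events)  # prints: ['A:1', 'A:2', 'B:1', 'B:2']
```

The identity check in the finally block is what keeps a slow allocator from erasing a Future that a faster coroutine has already created for the next cycle.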
Practice II: CheckpointSignal — External Checkpoint Suspend
When control resides externally, a "checkpoint" mechanism is needed: the outside sets a suspension point, and internal coroutines automatically suspend upon arrival, waiting for external resumption. Stripping away business logic (queue, tag filtering, callbacks, etc.) from the real-world SuspendObjectStream yields the pure CheckpointSignal.
Design Rationale
CheckpointSignal exposes three core operations:
- arm(): Called externally; sets the checkpoint and blocks until a coroutine actually arrives and suspends. This lets the outside world confirm the coroutine has paused.
- wait(): Called internally by coroutines. If a checkpoint has been armed, suspend until resume() is called; otherwise return immediately.
- resume(): Called externally; wakes all coroutines waiting at the checkpoint.
This mechanism is fully built on CLCA: two Futures implement a bidirectional handshake. _suspend_signal handles "suspend acknowledgment" (external waits for coroutine arrival), and _resume_signal handles "resume notification" (coroutines wait for external resumption).
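Stripped of locking and multi-waiter handling, the two-Future handshake described above can be sketched in a few lines. This is an illustrative single-loop reduction; the local names suspended and resumed are hypothetical stand-ins for _suspend_signal and _resume_signal.

```python
import asyncio


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    suspended = loop.create_future()  # worker -> controller: "I have paused"
    resumed = loop.create_future()    # controller -> worker: "carry on"
    log: list[str] = []

    async def worker() -> None:
        log.append("worker: reached checkpoint")
        suspended.set_result(True)  # acknowledge the suspend
        await resumed               # park until the controller resumes us
        log.append("worker: resumed")

    async def controller() -> None:
        await suspended             # block until the worker is parked
        log.append("controller: worker is suspended")
        resumed.set_result(True)

    await asyncio.gather(worker(), controller())
    return log


handshake_log = asyncio.run(demo())
print(handshake_log)
```

The controller's log entry always lands between the worker's two entries, which is the guarantee arm() is meant to provide: once it returns, the coroutine is known to be sitting at the checkpoint.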
Implementation
import asyncio
import aiologic


def _set_if_pending(fut: asyncio.Future) -> None:
    """Complete fut unless it is already done (for example, cancelled)."""
    if not fut.done():
        fut.set_result(True)


class CheckpointSignal:
    """An externally-armed checkpoint: coroutines automatically suspend
    on arrival and wait for external resumption."""

    def __init__(self):
        self._suspend_signal: asyncio.Future | None = None  # external awaits "suspend confirmed"
        self._resume_signal: asyncio.Future | None = None  # coroutines await "resume"
        self._lock = aiologic.Lock()

    async def arm(self) -> None:
        """Arm the checkpoint and block until a coroutine actually arrives
        and suspends."""
        async with self._lock:
            if self._suspend_signal is not None and not self._suspend_signal.done():
                raise RuntimeError("Already armed")
            suspend = self._suspend_signal = asyncio.Future()
        # Await outside the lock so that wait() can acquire it.
        await suspend

    async def wait(self) -> None:
        """
        Called internally by coroutines. If a checkpoint is armed, suspend
        until resume() is called; otherwise return immediately.
        """
        async with self._lock:
            if self._suspend_signal is None:
                return
            if self._resume_signal is not None and not self._resume_signal.done():
                # Another coroutine is already waiting: attach our own ephemeral Future
                shared = self._resume_signal
                is_first = False
            else:
                # First coroutine to arrive: confirm suspend, create resume Future.
                # The confirmation is delivered on the loop that called arm().
                suspend = self._suspend_signal
                suspend.get_loop().call_soon_threadsafe(_set_if_pending, suspend)
                self._resume_signal = asyncio.Future()
                shared = self._resume_signal
                is_first = True
            await asyncio.sleep(0)

        if is_first:
            fut = shared
        else:
            # The callback runs on the shared Future's loop, so it hops
            # to this coroutine's loop via call_soon_threadsafe.
            fut = asyncio.Future()
            shared.add_done_callback(
                lambda _: fut.get_loop().call_soon_threadsafe(_set_if_pending, fut)
            )
        try:
            await fut
        finally:
            if is_first:
                async with self._lock:
                    if self._resume_signal is fut:
                        self._resume_signal = None

    def resume(self) -> None:
        """Wake all coroutines suspended at the checkpoint."""
        with self._lock:
            shared = self._resume_signal
            if shared is not None and not shared.done():
                # Complete the shared Future on the loop that owns it.
                shared.get_loop().call_soon_threadsafe(_set_if_pending, shared)
                # Disarm, so later wait() calls return immediately until the next arm().
                self._suspend_signal = None
                self._resume_signal = None

Key Points
- Bidirectional handshake: arm() creates _suspend_signal and blocks. The first coroutine to reach wait() detects _suspend_signal and completes it to wake arm(); this confirms to the external controller that the coroutine is precisely at the checkpoint.
- CLCA reuse: Multiple coroutines may arrive at the same checkpoint concurrently. The first creates the shared _resume_signal; later coroutines attach their own ephemeral Futures via add_done_callback. resume() completes the shared _resume_signal, waking all waiters at once.
- Lifecycle cleanup: The first coroutine cleans up _resume_signal after waking, preventing stale state from leaking into the next cycle.
- Idempotency & guardrails: Repeated arm() raises an exception to prevent state confusion; wait() returns immediately when no checkpoint is armed, ensuring coroutines are never blocked on the normal execution path.
Usage Examples
1. Signal: Multiple Coroutines Waiting for an External Event
signal = Signal()


async def worker(name):
    print(f"Worker {name} suspending...")
    await signal.wait()
    print(f"Worker {name} received signal, resuming!")


async def controller():
    await asyncio.sleep(1)
    print("Controller firing signal!")
    await signal.signal()


async def main():
    await asyncio.gather(worker("A"), worker("B"), controller())


asyncio.run(main())

2. CheckpointSignal: External Control of Coroutine Pacing
checkpoint = CheckpointSignal()


async def stage(name):
    print(f"Stage {name} starting...")
    await asyncio.sleep(0.5)
    print(f"Stage {name} reached checkpoint, awaiting external command...")
    await checkpoint.wait()
    print(f"Stage {name} completing!")


async def controller():
    # Arm the checkpoint and wait for coroutine arrival
    print("Controller: arming checkpoint...")
    await checkpoint.arm()
    print("Controller: checkpoint active, coroutine suspended. Performing other work...")
    await asyncio.sleep(1)
    print("Controller: resuming execution!")
    checkpoint.resume()


async def main():
    await asyncio.gather(stage("A"), controller())


asyncio.run(main())

3. Cross-Event-Loop Synchronization
Both Signal and CheckpointSignal are built on the same CLCA kernel and natively support cross-thread, cross-event-loop usage. Below is a cross-loop example with CheckpointSignal (Python 3.14+):
import asyncio
import threading

checkpoint = CheckpointSignal()


async def worker_in_loop():
    # Arrive after the main loop has armed the checkpoint: wait() returns
    # immediately when nothing is armed, which would leave arm() blocked.
    await asyncio.sleep(0.5)
    print("Coroutine in another loop reached checkpoint...")
    await checkpoint.wait()
    print("Coroutine in another loop has been woken!")


def run_loop():
    asyncio.run(worker_in_loop())


async def main():
    print("Main loop: arming checkpoint and waiting...")
    await checkpoint.arm()
    print("Main loop: checkpoint active, sending resume signal")
    checkpoint.resume()


thread = threading.Thread(target=run_loop)
thread.start()
asyncio.run(main())
thread.join()

Summary
CLCA abstracts "suspend‑resume" into a standalone signal-distribution primitive, solving the core challenge of multi-coroutine synchronized waiting. From this foundation, two practical variants emerge depending on the control relationship:
- Signal: Voluntary yield, suitable for coroutines willingly waiting on external events.
- CheckpointSignal: External checkpoint suspend, suitable for externally controlling coroutine execution pacing with precision.
Both share the same lightweight, cross-event-loop CLCA kernel, achieving high flexibility and reusability with nothing more than Future + Lock. If your system needs to insert precise synchronization points into complex async flows, these two signal primitives will serve as exceptionally effective tools.
