Runtime System
The runtime system executes compiled workflow graphs, manages the program counter, handles call stacks, and performs dependency injection at node execution time. WorkflowInterpreter is the core execution engine in AmritaSense and is designed for step-by-step interpretation rather than bulk graph traversal.
WorkflowInterpreter
class WorkflowInterpreter(Generic[io_T]):
    ...
WorkflowInterpreter is the main engine for executing a rendered workflow graph. It tracks the current execution pointer using PointerVector, manages subroutine calls through a return address stack, and supports external interruption and streaming via a generic object_io interface.
Constructor
WorkflowInterpreter(
    node_compose: NodeComposeRendered | SelfCompileInstruction,
    object_io: SuspendObjectStream[Any] | None = None,
    *,
    exception_ignored: tuple[type[BaseException], ...] = (),
    extra_args: tuple = (),
    extra_kwargs: dict[str, Any] | None = None,
    addr_stack: Stack[PointerVector] | None = None,
    middleware: Callable[['WorkflowInterpreter'], Awaitable[Any]] | None = None,
)
Arguments:
- node_compose: A rendered workflow graph or a self-compiling instruction.
- object_io: Optional external I/O object. Defaults to a new SuspendObjectStream.
- exception_ignored: Exception types that bypass TRY/CATCH blocks.
- extra_args / extra_kwargs: Additional runtime values available for dependency injection.
- addr_stack: Optional return address stack.
- middleware: Optional async callable that receives the WorkflowInterpreter instance. When set, run_step_by() and call_sub() delegate to the middleware instead of calling nodes directly. The middleware can decide whether and how to execute nodes, transform results, or inject custom logic around every step.
- parent_interpreter (v0.3.0+): Optional parent WorkflowInterpreter for building an interpreter tree. Set automatically by fork_interpreter(), so it rarely needs to be passed directly.
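The middleware delegation described above can be illustrated with a minimal standalone sketch. It does not use the AmritaSense API: ToyInterpreter, execute_current(), step(), and tagging_middleware are hypothetical names, and the way a real middleware triggers node execution is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class ToyInterpreter:
    """Hypothetical stand-in for an interpreter that supports middleware."""

    def __init__(
        self,
        nodes: list[Callable[[], Awaitable[Any]]],
        middleware: Optional[Callable[["ToyInterpreter"], Awaitable[Any]]] = None,
    ) -> None:
        self.nodes = nodes
        self.pointer = 0
        self.middleware = middleware

    async def execute_current(self) -> Any:
        # Hypothetical helper: run the node at the current pointer.
        return await self.nodes[self.pointer]()

    async def step(self) -> Any:
        # With middleware set, the whole step is delegated to it.
        if self.middleware is not None:
            result = await self.middleware(self)
        else:
            result = await self.execute_current()
        self.pointer += 1
        return result


async def tagging_middleware(interp: ToyInterpreter) -> str:
    # The middleware chooses to run the node, then transforms its result.
    result = await interp.execute_current()
    return f"[{interp.pointer}] {result}"


async def demo() -> list[str]:
    async def a() -> str:
        return "a"

    async def b() -> str:
        return "b"

    interp = ToyInterpreter([a, b], middleware=tagging_middleware)
    return [await interp.step(), await interp.step()]


middleware_results = asyncio.run(demo())
```

Because the middleware receives the interpreter itself, it could equally skip the node, retry it, or record timing around it.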
Panic / Recover (v0.3.1+)
When an unhandled exception escapes the main execution loop, the interpreter enters a panic state: it preserves the exception (_panic_exc), the current pointer position, and all stack state so that the crash site can be inspected and execution can be resumed.
This is distinct from the TRY/CATCH mechanism:
| Aspect | Try-Catch | Panic / Recover |
|---|---|---|
| Scope | Local, predictable business errors | Global, unexpected crashes |
| Overhead | Low (instruction-level interception) | High (preserves full interpreter state) |
| After crash | CATCH block handles, execution continues | Interpreter dumps, retains crash site |
| Recovery | Automatic inside CATCH | Call run() / run_step_by() again to resume from crash point |
| Use case | Node-level retry, fallback, rollback | Debugging, audit, post-crash continuation |
To recover from a panic, simply call run() (or run_step_by()) again on the same interpreter — the pointer is still at the crash location, and execution will resume from there. The interpreter logs "Recovered from panic" and clears _panic_exc on the next run.
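The recovery flow can be sketched with a small synchronous toy, assuming that run() records the exception instead of re-raising it (the source does not say which). ToyPanicInterpreter is a hypothetical class, not the AmritaSense implementation; only the names _panic_exc, get_exception(), and the log message come from the text above.

```python
from typing import Callable, Optional


class ToyPanicInterpreter:
    """Hypothetical toy: keeps the crash site so a later run() can resume."""

    def __init__(self, steps: list[Callable[[], None]]) -> None:
        self.steps = steps
        self.pointer = 0
        self._panic_exc: Optional[Exception] = None
        self.log: list[str] = []

    def get_exception(self) -> Optional[Exception]:
        return self._panic_exc

    def run(self) -> None:
        if self._panic_exc is not None:
            # Resuming after a crash: clear the stored exception first.
            self.log.append("Recovered from panic")
            self._panic_exc = None
        while self.pointer < len(self.steps):
            try:
                self.steps[self.pointer]()
            except Exception as exc:
                # Panic: keep the exception and leave the pointer untouched.
                self._panic_exc = exc
                return
            self.pointer += 1


attempts = {"count": 0}
done: list[str] = []


def flaky() -> None:
    # Fails on the first attempt only, like a transient outage.
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("transient failure")
    done.append("flaky")


interp = ToyPanicInterpreter(
    [lambda: done.append("first"), flaky, lambda: done.append("last")]
)
interp.run()
crash_pointer = interp.pointer       # still at the failing step
crash_exc = interp.get_exception()   # the preserved exception
interp.run()                         # resumes from the crash point
```

The first step is not executed again on the second run(), since the pointer never moved back from the crash location.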
Key attributes
- _graph: The compiled workflow graph being executed.
- _pointer: Current PointerVector execution address.
- _ret_addr_stack: Return address stack for subroutine calls.
- _jump_marked: Flag indicating whether a jump operation occurred.
- _interpret_lock: Async lock used to guarantee one-node-at-a-time execution.
- object_io: External I/O stream used for suspend/resume and streaming output.
Interpreter Tree (v0.3.0+)
Interpreters form a tree: a top-level interpreter may have child interpreters created via fork_interpreter(), and those children may have their own children.
id: str — Unique UUID string identifying this interpreter instance.
parent: WorkflowInterpreter | None — The parent interpreter, or None if this is the top-level interpreter.
top_interpreter: WorkflowInterpreter — The root of the interpreter tree.
sub_interpreters: dict[str, WorkflowInterpreter] — Dict of direct child interpreters, keyed by their IDs.
all_sub_interpreters: dict[str, WorkflowInterpreter] — (Top-level only) All descendant interpreters in the entire tree.
is_running: bool — True if the interpreter's main loop is currently executing. After the workflow completes (or terminates), returns False.
pending_stop: bool — True if terminate() has been called on this interpreter.
wait: asyncio.Future[None] — A future that resolves when the interpreter finishes execution. Raises IllegalState if the interpreter is not running.
get_exception() -> Exception | None (v0.3.1+) — Return the last panic exception, or None if the interpreter finished normally or has never crashed. Available immediately after a panic for diagnostic purposes.
Important methods
async run() -> None
Execute the entire workflow to completion. This method internally iterates over run_step_by() and consumes all generated results.
async run_step_by() -> AsyncGenerator[Any, None]
Execute the workflow step by step, yielding the result of each node execution. This is the main entry point for external monitoring and cooperative suspension.
The generated sequence includes:
- waiting for object_io suspend signals at the global WorkflowInterpreter::each_node checkpoint
- acquiring _interpret_lock
- executing the current node via _call()
- advancing the pointer unless a jump was marked
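The four steps listed above can be sketched as an async generator. This is a standalone toy under stated assumptions: ToyStepper and its checkpoint() hook are hypothetical, the pointer is a single integer rather than a PointerVector, and whether the real pointer advance happens inside the lock is a guess.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable

NodeFunc = Callable[["ToyStepper"], Awaitable[Any]]


class ToyStepper:
    """Hypothetical toy that mirrors the step order described in the text."""

    def __init__(self, nodes: list[NodeFunc]) -> None:
        self.nodes = nodes
        self.pointer = 0
        self.jump_marked = False
        self.lock = asyncio.Lock()

    async def checkpoint(self) -> None:
        # Placeholder for waiting on a suspend signal; it only yields control here.
        await asyncio.sleep(0)

    def jump_to(self, index: int) -> None:
        self.pointer = index
        self.jump_marked = True

    async def run_step_by(self) -> AsyncGenerator[Any, None]:
        while self.pointer < len(self.nodes):
            await self.checkpoint()          # 1. safe suspension boundary
            async with self.lock:            # 2. one node at a time
                result = await self.nodes[self.pointer](self)  # 3. execute
                if self.jump_marked:         # 4. advance unless a jump occurred
                    self.jump_marked = False
                else:
                    self.pointer += 1
            yield result


async def demo() -> list[str]:
    async def first(interp: ToyStepper) -> str:
        return "first"

    async def skip(interp: ToyStepper) -> str:
        interp.jump_to(3)  # jump over the node at index 2
        return "skip"

    async def never(interp: ToyStepper) -> str:
        return "never"

    async def last(interp: ToyStepper) -> str:
        return "last"

    stepper = ToyStepper([first, skip, never, last])
    return [result async for result in stepper.run_step_by()]


step_results = asyncio.run(demo())
```

A caller that wants the behavior of run() would simply drain the generator and discard the yielded values.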
jump_to(addr: list[int])
Perform an absolute jump to a new address. This sets the current pointer using a full PointerVector replacement.
jump_near(addr: int)
Replace the last dimension of the current pointer within the current scope.
jump_offset(offset: int)
Apply a relative offset to the current pointer position.
jump_to_top(addr: int)
Jump to an address at the top-level workflow.
jump_offset_top(offset: int)
Apply a relative offset at the top level and reset nested dimensions.
jump_far_ptr(offset: list[int])
Perform a multi-dimensional absolute jump. Replaces the entire _pointer with the given address vector via far_to(). Used by RET_FAR to return from nested scopes.
jump_offset_far(offset: list[int])
Apply a multi-dimensional offset to the current pointer position. Unlike jump_offset() which only adjusts the innermost dimension, this applies an offset vector across all nesting levels.
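As a rough illustration of how the jump variants differ, the sketch below models the pointer as a plain list of integers, with the last element as the innermost dimension. The real PointerVector may behave differently; in particular, the requirement that an offset vector matches the pointer length is an assumption of this sketch.

```python
def jump_to(ptr: list[int], addr: list[int]) -> list[int]:
    # Absolute jump: the whole pointer is replaced.
    return list(addr)


def jump_near(ptr: list[int], addr: int) -> list[int]:
    # Replace only the innermost dimension.
    return ptr[:-1] + [addr]


def jump_offset(ptr: list[int], offset: int) -> list[int]:
    # Relative move within the innermost dimension.
    return ptr[:-1] + [ptr[-1] + offset]


def jump_to_top(ptr: list[int], addr: int) -> list[int]:
    # Absolute address in the top-level workflow; nested dimensions are dropped.
    return [addr]


def jump_offset_top(ptr: list[int], offset: int) -> list[int]:
    # Relative move at the top level; nested dimensions are reset.
    return [ptr[0] + offset]


def jump_offset_far(ptr: list[int], offsets: list[int]) -> list[int]:
    # One offset per nesting level (equal lengths assumed in this sketch).
    if len(offsets) != len(ptr):
        raise ValueError("offset vector must match the pointer depth")
    return [p + o for p, o in zip(ptr, offsets)]


pointer = [2, 1, 4]
```

Starting from `[2, 1, 4]`, `jump_near(pointer, 0)` stays in the same scope, while `jump_to_top(pointer, 5)` leaves every nested scope.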
async call_sub(addr, /, *extra_arg, interrupt: bool = False, **extra_kwargs)
Call a subroutine at the specified address. It pushes the current pointer onto the return address stack, switches execution to the subroutine, and restores the pointer after the call.
- interrupt=True acquires the interpreter lock during the call, making it safe for external injection.
- interrupt=False is the normal internal call path.
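The push, switch, and restore sequence can be sketched with a toy return address stack. ToyCaller and its routine table are hypothetical, and passing the caller into each routine is an assumption made so the demo can record the pointer; none of this is the AmritaSense API.

```python
import asyncio
from typing import Any, Awaitable, Callable

Routine = Callable[..., Awaitable[Any]]


class ToyCaller:
    """Hypothetical toy showing return-address handling around a call."""

    def __init__(self, routines: dict[tuple[int, ...], Routine]) -> None:
        self.routines = routines
        self.pointer: list[int] = [0]
        self.ret_addr_stack: list[list[int]] = []
        self.lock = asyncio.Lock()

    async def call_sub(self, addr, /, *args, interrupt: bool = False, **kwargs):
        if interrupt:
            # External callers take the lock so they never overlap a running node.
            async with self.lock:
                return await self._enter(addr, *args, **kwargs)
        return await self._enter(addr, *args, **kwargs)

    async def _enter(self, addr, *args, **kwargs):
        self.ret_addr_stack.append(list(self.pointer))  # push return address
        self.pointer = list(addr)                       # switch to the subroutine
        try:
            return await self.routines[tuple(addr)](self, *args, **kwargs)
        finally:
            self.pointer = self.ret_addr_stack.pop()    # restore on the way out


async def demo():
    seen: list[list[int]] = []

    async def double(caller: ToyCaller, x: int) -> int:
        seen.append(list(caller.pointer))
        return x * 2

    async def double_plus_one(caller: ToyCaller, x: int) -> int:
        seen.append(list(caller.pointer))
        # Nested internal call: interrupt stays False, so the lock is not re-taken.
        return await caller.call_sub([1, 0], x) + 1

    caller = ToyCaller({(1, 0): double, (2,): double_plus_one})
    result = await caller.call_sub([2], 5, interrupt=True)
    return result, caller.pointer, len(caller.ret_addr_stack), seen


call_result = asyncio.run(demo())
```

The nested call uses the internal path on purpose: asyncio.Lock is not reentrant, so taking it again from inside a locked call would deadlock.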
async call_near(addr: int, *ag, interrupt: bool = False, **kw)
Call a subroutine within the current scope using a relative near address.
async call_offset(offset: int, *ag, interrupt: bool = False, **kw)
Call a subroutine by applying a relative offset to the current pointer.
async call_offset_far(offset: list[int], *ag, interrupt: bool = False, **kw)
Call a subroutine at a multi-dimensional offset from the current position. Applies offset_far() to compute the target address, then delegates to call_sub(). Useful for invoking nodes across nested scopes.
fork_interpreter(compose=None, middleware=UNSET, object_io=None) -> WorkflowInterpreter (v0.3.0+)
Create a child interpreter in the interpreter tree. By default inherits the parent's graph and middleware.
- compose: Optional NodeComposeRendered for the child. If None, uses the parent's graph.
- middleware: UNSET (inherit parent's), None (no middleware), or a custom callable.
- object_io: Optional SuspendObjectStream. If None, shares the parent's object_io. Since v0.3.2, SuspendObjectStream is concurrency-safe via the CLCA signal design pattern.
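The tree bookkeeping and the UNSET sentinel can be sketched as follows. ToyTreeInterpreter is a hypothetical class that borrows the documented attribute names; the graph and middleware are placeholder values, and computing all_sub_interpreters by walking the children is an assumption about how the real attribute is filled.

```python
import uuid
from typing import Any, Optional

UNSET: Any = object()  # sentinel meaning "inherit from the parent"


class ToyTreeInterpreter:
    """Hypothetical toy that models only the interpreter-tree bookkeeping."""

    def __init__(
        self,
        graph: Any,
        middleware: Any = None,
        parent: Optional["ToyTreeInterpreter"] = None,
    ) -> None:
        self.id = str(uuid.uuid4())
        self.graph = graph
        self.middleware = middleware
        self.parent = parent
        self.sub_interpreters: dict[str, "ToyTreeInterpreter"] = {}

    @property
    def top_interpreter(self) -> "ToyTreeInterpreter":
        node = self
        while node.parent is not None:
            node = node.parent
        return node

    @property
    def all_sub_interpreters(self) -> dict[str, "ToyTreeInterpreter"]:
        # Documented as top-level only; here it collects descendants of any node.
        found: dict[str, "ToyTreeInterpreter"] = {}
        pending = list(self.sub_interpreters.values())
        while pending:
            node = pending.pop()
            found[node.id] = node
            pending.extend(node.sub_interpreters.values())
        return found

    def fork_interpreter(self, compose: Any = None, middleware: Any = UNSET):
        child = ToyTreeInterpreter(
            self.graph if compose is None else compose,
            # UNSET inherits; an explicit None disables middleware.
            self.middleware if middleware is UNSET else middleware,
            parent=self,
        )
        self.sub_interpreters[child.id] = child
        return child


root = ToyTreeInterpreter(["graph"], middleware="parent-middleware")
child = root.fork_interpreter()
grandchild = child.fork_interpreter(compose=["other"], middleware=None)
```

A dedicated sentinel is needed because None already has a meaning of its own (no middleware), so it cannot double as "not provided".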
async terminate(eol: bool = True) (v0.3.0+)
Mark this interpreter for graceful stop. Sets pending_stop = True and awaits the wait future. If eol=True, removes the interpreter from the tree after termination.
terminate_all_forks(eol: bool = True, exclude_self: bool = False) -> asyncio.Future (v0.3.0+)
Mark all direct child interpreters for termination. Returns a future that resolves when all children have terminated.
async terminate_all(eol: bool = True, exclude_self: bool = False) (v0.3.0+)
Top-level only: mark this interpreter and all descendants for termination. Raises IllegalState if called on a non-top-level interpreter.
async wait_all_forks(return_exc=False, exclude_self=False) (v0.3.0+)
Wait for all direct child interpreters to finish. If return_exc=True, returns a list of BaseException | None.
async wait_all(return_exc=False, exclude_self=False) (v0.3.0+)
Top-level only: wait for the entire interpreter tree to finish. Raises IllegalState if called on a non-top-level interpreter.
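A graceful stop of this kind is commonly built from a flag that the loop checks between steps plus an awaitable that completes when the loop exits. The sketch below shows that pattern with plain asyncio; ToyWorker and terminate_all_workers are hypothetical, and the assumption that the real loop checks pending_stop at step boundaries is not confirmed by the source.

```python
import asyncio
from typing import Optional


class ToyWorker:
    """Hypothetical toy: a loop that stops cooperatively."""

    def __init__(self) -> None:
        self.pending_stop = False
        self.steps_done = 0
        self._waiter: Optional["asyncio.Task[None]"] = None

    def start(self) -> None:
        self._waiter = asyncio.create_task(self._loop())

    async def _loop(self) -> None:
        # The stop flag is only checked between steps, never mid-step.
        while not self.pending_stop:
            self.steps_done += 1
            await asyncio.sleep(0)

    async def terminate(self) -> None:
        self.pending_stop = True
        if self._waiter is not None:
            await self._waiter  # resolves once the loop has exited


async def terminate_all_workers(workers: list[ToyWorker]) -> None:
    # Mark every worker, then wait for all of them together.
    await asyncio.gather(*(worker.terminate() for worker in workers))


async def demo() -> list[tuple[bool, bool]]:
    workers = [ToyWorker() for _ in range(3)]
    for worker in workers:
        worker.start()
    await asyncio.sleep(0.01)  # let each loop run a few steps
    await terminate_all_workers(workers)
    return [
        (worker.steps_done >= 1, worker._waiter is not None and worker._waiter.done())
        for worker in workers
    ]


terminate_results = asyncio.run(demo())
```

Unlike task cancellation, this approach never interrupts a step that is already in progress.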
get_exception() -> Exception | None (v0.3.1+)
Return the last panic exception, or None if the interpreter finished normally or has never crashed. Useful for checking whether a previous run() crashed and what went wrong.
reset() (v0.3.1+)
Reset the interpreter's execution state to its initial values: clear the pointer, return address stack, jump marker, pending stop flag, waiter future, and panic exception. This is independent of the recovery flow — to recover from a panic, simply call run() again without resetting.
reset() is intended for scenarios where you want to restart execution from scratch on the same workflow graph without creating a new interpreter.
find_addr_alias(alias: str) -> list[int]
Resolve an alias to its absolute address vector. Raises NullPointerException if the alias does not exist.
find_addr(addr: list[int]) -> BaseNode | NodeComposeRendered
Find a node or rendered composition by absolute address.
find_node_alias(alias: str) -> BaseNode | NodeComposeRendered
Resolve an alias and return the corresponding node object.
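Address and alias resolution can be pictured as indexing into nested containers. In this standalone sketch, nested lists stand in for NodeComposeRendered, a dictionary stands in for the alias table, and the NullPointerException class is a local stand-in defined only for the example.

```python
from typing import Any


class NullPointerException(LookupError):
    """Local stand-in for the exception named in the text."""


def find_addr(graph: list[Any], addr: list[int]) -> Any:
    # Walk one nesting level per address component.
    item: Any = graph
    for index in addr:
        item = item[index]
    return item


def find_addr_alias(aliases: dict[str, list[int]], alias: str) -> list[int]:
    try:
        return list(aliases[alias])
    except KeyError:
        raise NullPointerException(alias) from None


def find_node_alias(graph: list[Any], aliases: dict[str, list[int]], alias: str) -> Any:
    # Alias lookup followed by an ordinary address lookup.
    return find_addr(graph, find_addr_alias(aliases, alias))


toy_graph = ["a", ["b", "c"]]
toy_aliases = {"inner_c": [1, 1]}
resolved = find_node_alias(toy_graph, toy_aliases, "inner_c")

try:
    find_addr_alias(toy_aliases, "missing")
    missing_raised = False
except NullPointerException:
    missing_raised = True
```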
advance_pointer(ptr: PointerVector | None = None) -> bool
Advance the execution pointer to the next node in the workflow graph. This method implements the logic for navigating through nested workflow structures, handling both sequential execution and hierarchical traversal.
Parameters
- ptr: Optional external pointer vector to advance. When provided, the method advances this pointer without modifying the interpreter's own _pointer. Defaults to None, in which case self._pointer is advanced. This enables external systems to preview pointer advancement paths without disturbing interpreter state.
Returns
- True if the pointer was successfully advanced to the next node.
- False if the end of the workflow has been reached.
Algorithm
- Starting from ptr (or self._pointer), traverse base_addr layer by layer to locate the container of the current node.
- If the current node is a non-empty NodeComposeRendered → enter the nested container (append(0)), return True.
- If the current node has a next sibling:
  - Sibling is a non-empty NodeComposeRendered → enter that nested container, return True.
  - Otherwise → move to the sibling node, return True.
- If no next sibling → backtrack up the pointer stack layer by layer, looking for a parent container's next sibling.
- If a next sibling is found during backtracking → apply the same logic, return True.
- If backtracking reaches the top level with no more siblings → return False (end of workflow).
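The algorithm above can be sketched over nested Python lists, where a list stands in for NodeComposeRendered and any other value is a plain node. The functions resolve(), advance(), and walk() are hypothetical names for this example. Two choices go beyond what the text specifies: the pointer is left unchanged when the end is reached, and an empty container is visited like an ordinary node.

```python
from typing import Any


def resolve(graph: list[Any], ptr: list[int]) -> Any:
    """Return the item addressed by ptr, one nesting level per index."""
    item: Any = graph
    for index in ptr:
        item = item[index]
    return item


def advance(graph: list[Any], ptr: list[int]) -> bool:
    """Advance ptr in place; return False at the end of the workflow."""
    current = resolve(graph, ptr)
    if isinstance(current, list) and current:
        ptr.append(0)  # enter the non-empty nested container
        return True
    work = list(ptr)  # work on a copy so a failed advance leaves ptr intact
    while work:
        container = resolve(graph, work[:-1])
        next_index = work[-1] + 1
        if next_index < len(container):
            work[-1] = next_index
            sibling = container[next_index]
            if isinstance(sibling, list) and sibling:
                work.append(0)  # sibling is a non-empty container: enter it
            ptr[:] = work
            return True
        work.pop()  # no sibling at this level: backtrack to the parent
    return False


def walk(graph: list[Any], start: list[int]) -> list[list[int]]:
    """Collect every pointer value visited from start to the end."""
    ptr = list(start)
    visited = [list(ptr)]
    while advance(graph, ptr):
        visited.append(list(ptr))
    return visited


example_graph = ["a", ["b", "c"], [], "d"]
visited = walk(example_graph, [0])
```

Because advance() takes the pointer as an argument, a caller can pass a copy to preview the path without touching the live pointer, which matches the purpose of the ptr parameter.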
Deprecation
The _advance_pointer property is deprecated since v0.3.0. Use advance_pointer() instead. The old property exists only as a compatibility shim and will be removed in a future version.
Execution behavior
WorkflowInterpreter preserves execution atomicity by holding _interpret_lock while a single node is executed. It only checks suspend points at safe boundaries:
- before each node execution via the global checkpoint WorkflowInterpreter::each_node
- before each individual node via the node's tag
The object_io implementation is responsible for coordinating suspension and resumption.
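To make the two checkpoints concrete, here is a standalone sketch in which a toy gate plays the role of object_io. ToyGate, its suspend()/resume()/checkpoint() methods, and the use of one asyncio.Event per tag are all assumptions of this example; SuspendObjectStream's real interface is not shown in this section.

```python
import asyncio
from typing import Awaitable, Callable

GLOBAL_TAG = "WorkflowInterpreter::each_node"


class ToyGate:
    """Hypothetical stand-in for object_io: one resumable gate per tag."""

    def __init__(self) -> None:
        self._events: dict[str, asyncio.Event] = {}

    def _event(self, tag: str) -> asyncio.Event:
        if tag not in self._events:
            event = asyncio.Event()
            event.set()  # gates start open
            self._events[tag] = event
        return self._events[tag]

    def suspend(self, tag: str) -> None:
        self._event(tag).clear()

    def resume(self, tag: str) -> None:
        self._event(tag).set()

    async def checkpoint(self, tag: str) -> None:
        await self._event(tag).wait()


async def run_nodes(
    nodes: list[tuple[str, Callable[[], Awaitable[str]]]],
    gate: ToyGate,
    trace: list[str],
) -> None:
    lock = asyncio.Lock()
    for tag, func in nodes:
        await gate.checkpoint(GLOBAL_TAG)  # global checkpoint before every node
        await gate.checkpoint(tag)         # per-node checkpoint by tag
        async with lock:                   # the node itself is never interrupted
            trace.append(await func())


async def demo() -> tuple[list[str], list[str]]:
    async def a() -> str:
        return "a"

    async def b() -> str:
        return "b"

    gate = ToyGate()
    trace: list[str] = []
    gate.suspend("b")  # pause only in front of node b
    task = asyncio.create_task(run_nodes([("a", a), ("b", b)], gate, trace))
    await asyncio.sleep(0.01)
    paused_trace = list(trace)  # node a has run, node b is waiting
    gate.resume("b")
    await task
    return paused_trace, trace


suspend_result = asyncio.run(demo())
```

Suspending GLOBAL_TAG instead of a node tag would pause the loop in front of whichever node comes next.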
Example
import asyncio

from amrita_sense.node.core import Node
from amrita_sense.runtime.workflow import WorkflowInterpreter

@Node()
async def a():
    return 1

@Node()
async def b():
    return 2

async def main():
    # Chain the two nodes, render the graph, and run it to completion.
    compose = a >> b
    rendered = compose.render()
    pc = WorkflowInterpreter(rendered)
    await pc.run()

asyncio.run(main())