Dynamic Execution
The dynamic execution module lets you modify the DAG at runtime. When a node completes, an expander function can inspect its result and add or remove nodes before execution continues. This supports fan-out patterns where the number of downstream tasks depends on data discovered at runtime (e.g., one task per file found in a directory, one task per API page to fetch).
See the Dynamic DAGs guide for usage patterns and best practices.
DynamicExecutor
class DynamicExecutor(
dag: DAG,
expanders: dict[str, Callable[[Any], DynamicModification | None]],
max_workers: int | None = None,
callbacks: ExecutionCallbacks | None = None,
fail_fast: bool = True,
enable_tracing: bool = False,
)
An executor that supports runtime graph modifications. After each node
completes, the executor checks if an expander function is registered for that
node. If so, the expander is called with the node's return value and can return
a DynamicModification describing nodes and edges to add or remove.
| Parameter | Type | Default | Description |
|---|---|---|---|
dag | DAG | required | The initial DAG. Will be modified in-place during execution. |
expanders | dict[str, Callable] | required | Map of node names to expander functions. Each receives the node's return value and returns a DynamicModification or None. |
max_workers | int | None | None | Maximum concurrent workers. |
callbacks | ExecutionCallbacks | None | None | Optional lifecycle callbacks. The on_dynamic_expand callback is called when new nodes are added. |
fail_fast | bool | True | If True, skip downstream nodes when any node fails. |
enable_tracing | bool | False | If True, record a Chrome-compatible execution trace. |
execute
def execute(
tasks: dict[str, Callable],
) -> ExecutionResult
Execute tasks with dynamic expansion. The tasks dictionary should contain
tasks for all initially known nodes. Dynamically added nodes must include their
task callable in the DynamicNodeSpec.
| Parameter | Type | Default | Description |
|---|---|---|---|
tasks | dict[str, Callable] | required | Map of node names to callable tasks for the initial nodes. |
Returns: ExecutionResult — includes results for both initial and dynamically added nodes.
import dagron
dag = (
dagron.DAG.builder()
.add_node("discover")
.add_node("aggregate")
.add_edge("discover", "aggregate")
.build()
)
def discover_expander(result):
"""Add one processing node per discovered file."""
files = result # e.g., ["a.csv", "b.csv", "c.csv"]
nodes = [
dagron.DynamicNodeSpec(
name=f"process_{f}",
task=lambda f=f: f"processed {f}",
dependencies=["discover"],
dependents=["aggregate"],
)
for f in files
]
return dagron.DynamicModification(add_nodes=nodes)
executor = dagron.DynamicExecutor(
dag,
expanders={"discover": discover_expander},
max_workers=4,
)
result = executor.execute({
"discover": lambda: ["a.csv", "b.csv", "c.csv"],
"aggregate": lambda: "all files processed",
})
print(result.succeeded) # 5 (discover + 3 process + aggregate)
DynamicModification
class DynamicModification(
add_nodes: list[DynamicNodeSpec] | None = None,
remove_nodes: list[str] | None = None,
)
A description of graph modifications to apply after a node completes. Returned by expander functions.
| Parameter | Type | Default | Description |
|---|---|---|---|
add_nodes | list[DynamicNodeSpec] | None | None | Nodes to add to the graph. Each spec includes the task callable and edge connections. |
remove_nodes | list[str] | None | None | Node names to remove from the graph. Their edges are also removed. |
# Add nodes
mod = dagron.DynamicModification(
add_nodes=[
dagron.DynamicNodeSpec("task_1", task=lambda: "result_1", dependencies=["source"]),
dagron.DynamicNodeSpec("task_2", task=lambda: "result_2", dependencies=["source"]),
]
)
# Remove nodes
mod = dagron.DynamicModification(remove_nodes=["obsolete_node"])
# Both
mod = dagron.DynamicModification(
add_nodes=[dagron.DynamicNodeSpec("replacement", task=lambda: "new", dependencies=["source"])],
remove_nodes=["old_task"],
)
DynamicNodeSpec
class DynamicNodeSpec(
name: str,
task: Callable,
dependencies: list[str] | None = None,
dependents: list[str] | None = None,
)
A specification for a node to be dynamically added during execution. Includes the task callable and edge connections.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | Unique name for the new node. |
task | Callable | required | The callable to execute for this node. |
dependencies | list[str] | None | None | Names of existing nodes that this node depends on (incoming edges). |
dependents | list[str] | None | None | Names of existing nodes that depend on this node (outgoing edges). |
The dependencies list creates edges dep -> new_node and the dependents
list creates edges new_node -> dependent. This wires the new node into the
existing graph topology.
spec = dagron.DynamicNodeSpec(
name="process_chunk_42",
task=lambda: "chunk 42 processed",
dependencies=["split"], # split -> process_chunk_42
dependents=["merge"], # process_chunk_42 -> merge
)
Complete Example: Map-Reduce
A dynamic map-reduce pipeline where the mapper discovers the number of chunks at runtime:
import dagron
dag = (
dagron.DAG.builder()
.add_node("read_input")
.add_node("reduce")
.add_edge("read_input", "reduce")
.build()
)
def map_expander(data):
"""Split input into chunks and create a mapper node per chunk."""
chunks = [data[i:i+100] for i in range(0, len(data), 100)]
return dagron.DynamicModification(
add_nodes=[
dagron.DynamicNodeSpec(
name=f"map_{i}",
task=lambda chunk=chunk: sum(chunk),
dependencies=["read_input"],
dependents=["reduce"],
)
for i, chunk in enumerate(chunks)
]
)
executor = dagron.DynamicExecutor(
dag,
expanders={"read_input": map_expander},
max_workers=8,
callbacks=dagron.ExecutionCallbacks(
on_dynamic_expand=lambda name, new_nodes: print(
f"[EXPAND] {name} added {len(new_nodes)} nodes"
),
),
)
result = executor.execute({
"read_input": lambda: list(range(500)),
"reduce": lambda: "reduction complete",
})
print(f"Total nodes executed: {result.succeeded}")
# read_input + 5 mappers + reduce = 7
Callbacks for Dynamic Expansion
The on_dynamic_expand callback in ExecutionCallbacks is
called whenever new nodes are added:
callbacks = dagron.ExecutionCallbacks(
on_dynamic_expand=lambda node_name, new_nodes: print(
f"Node '{node_name}' expanded with: {new_nodes}"
),
)
This is useful for logging, monitoring, and debugging dynamic DAGs in production.
Constraints and Safety
- No cycles: dynamically added edges must not create cycles. The executor
validates this and raises
CycleErrorif a cycle would be introduced. - No duplicate names: dynamically added nodes must have unique names. A
DuplicateNodeErroris raised otherwise. - Topological consistency: new nodes are inserted into the execution schedule at the correct topological position. Nodes that have already been dispatched are not re-executed.
- Expander idempotency: expanders should be idempotent. If execution is retried (e.g., via checkpointing), expanders may run again.
Related
- DAGExecutor — the base executor without dynamic expansion.
- Conditional Execution — skip branches without modifying the graph.
- Pipeline — a static decorator-based pipeline API.
- Dynamic DAGs guide — patterns and best practices.