Dynamic Execution

The dynamic execution module lets you modify the DAG at runtime. When a node completes, an expander function can inspect its result and add or remove nodes before execution continues. This supports fan-out patterns where the number of downstream tasks depends on data discovered at runtime (e.g., one task per file found in a directory, one task per API page to fetch).

See the Dynamic DAGs guide for usage patterns and best practices.


DynamicExecutor

DynamicExecutor
class DynamicExecutor(
    dag: DAG,
    expanders: dict[str, Callable[[Any], DynamicModification | None]],
    max_workers: int | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)

An executor that supports runtime graph modifications. After each node completes, the executor checks whether an expander function is registered for that node. If so, it calls the expander with the node's return value. The expander returns either a DynamicModification describing the nodes (and their edges) to add or remove, or None to leave the graph unchanged.

Parameter       Type                       Default   Description
dag             DAG                        required  The initial DAG. Will be modified in-place during execution.
expanders       dict[str, Callable]        required  Map of node names to expander functions. Each receives the node's return value and returns a DynamicModification or None.
max_workers     int | None                 None      Maximum concurrent workers.
callbacks       ExecutionCallbacks | None  None      Optional lifecycle callbacks. The on_dynamic_expand callback is called when new nodes are added.
fail_fast       bool                       True      If True, skip downstream nodes when any node fails.
enable_tracing  bool                       False     If True, record a Chrome-compatible execution trace.
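
The expand-after-completion behaviour described above can be pictured with a small, sequential toy model. This is only a sketch under stated assumptions, not dagron's implementation: `NodeSpec`, `Modification`, and `run` are hypothetical stand-ins written with the standard library, the graph is a plain dict of predecessors, and there is no concurrency, failure handling, or node removal.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class NodeSpec:  # hypothetical stand-in for DynamicNodeSpec
    name: str
    task: Callable[[], Any]
    dependencies: list = field(default_factory=list)
    dependents: list = field(default_factory=list)


@dataclass
class Modification:  # hypothetical stand-in for DynamicModification (additions only)
    add_nodes: list = field(default_factory=list)


def run(preds, tasks, expanders):
    """Run nodes one at a time; `preds` maps each node to its set of predecessors."""
    preds = {node: set(p) for node, p in preds.items()}  # work on copies
    tasks = dict(tasks)
    results = {}
    while len(results) < len(preds):
        # A node is ready once every predecessor has produced a result.
        ready = [
            node for node in preds
            if node not in results and all(p in results for p in preds[node])
        ]
        if not ready:
            raise RuntimeError("no runnable node (cycle or missing dependency)")
        name = ready[0]
        results[name] = tasks[name]()
        # After the node completes, consult its expander, if one is registered.
        expander = expanders.get(name)
        mod = expander(results[name]) if expander else None
        if mod is None:
            continue
        for spec in mod.add_nodes:
            preds[spec.name] = set(spec.dependencies)   # edges dep -> new node
            tasks[spec.name] = spec.task
            for dependent in spec.dependents:
                preds[dependent].add(spec.name)         # edges new node -> dependent
    return results


def expand(files):
    """Create one processing node per discovered file."""
    return Modification(add_nodes=[
        NodeSpec(f"process_{f}", lambda f=f: f"processed {f}",
                 dependencies=["discover"], dependents=["aggregate"])
        for f in files
    ])


results = run(
    preds={"discover": set(), "aggregate": {"discover"}},
    tasks={"discover": lambda: ["a", "b"], "aggregate": lambda: "done"},
    expanders={"discover": expand},
)
print(list(results))  # ['discover', 'process_a', 'process_b', 'aggregate']
```

In this model, "aggregate" cannot run until the dynamically added nodes finish, because each new node is inserted as an extra predecessor of its dependents before the next ready node is chosen.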

execute

DynamicExecutor.execute
def execute(
    tasks: dict[str, Callable],
) -> ExecutionResult

Execute tasks with dynamic expansion. The tasks dictionary should contain tasks for all initially known nodes. Dynamically added nodes must include their task callable in the DynamicNodeSpec.

Parameter  Type                 Default   Description
tasks      dict[str, Callable]  required  Map of node names to callable tasks for the initial nodes.

Returns: ExecutionResult — includes results for both initial and dynamically added nodes.

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("discover")
    .add_node("aggregate")
    .add_edge("discover", "aggregate")
    .build()
)

def discover_expander(result):
    """Add one processing node per discovered file."""
    files = result  # e.g., ["a.csv", "b.csv", "c.csv"]
    nodes = [
        dagron.DynamicNodeSpec(
            name=f"process_{f}",
            task=lambda f=f: f"processed {f}",
            dependencies=["discover"],
            dependents=["aggregate"],
        )
        for f in files
    ]
    return dagron.DynamicModification(add_nodes=nodes)

executor = dagron.DynamicExecutor(
    dag,
    expanders={"discover": discover_expander},
    max_workers=4,
)

result = executor.execute({
    "discover": lambda: ["a.csv", "b.csv", "c.csv"],
    "aggregate": lambda: "all files processed",
})

print(result.succeeded)  # 5 (discover + 3 process + aggregate)
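
The `f=f` default argument in the task lambda above is easy to overlook. Python closures look up loop variables when they are called, not when they are defined, so without the default every generated task would see the last file name. The following standalone snippet (plain Python, no dagron involved) illustrates the difference:

```python
files = ["a.csv", "b.csv", "c.csv"]

# Late binding: every lambda reads `f` when it is called, after the loop has ended.
late = [lambda: f"processed {f}" for f in files]

# Early binding: the default argument captures the current value of `f`.
bound = [lambda f=f: f"processed {f}" for f in files]

late_results = [task() for task in late]
bound_results = [task() for task in bound]

print(late_results)   # ['processed c.csv', 'processed c.csv', 'processed c.csv']
print(bound_results)  # ['processed a.csv', 'processed b.csv', 'processed c.csv']
```

`functools.partial` is an equally valid way to bind the value if default arguments feel too implicit.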

DynamicModification

DynamicModification
class DynamicModification(
    add_nodes: list[DynamicNodeSpec] | None = None,
    remove_nodes: list[str] | None = None,
)

A description of graph modifications to apply after a node completes. Returned by expander functions.

Parameter     Type                          Default  Description
add_nodes     list[DynamicNodeSpec] | None  None     Nodes to add to the graph. Each spec includes the task callable and edge connections.
remove_nodes  list[str] | None              None     Node names to remove from the graph. Their edges are also removed.

# Add nodes
mod = dagron.DynamicModification(
    add_nodes=[
        dagron.DynamicNodeSpec("task_1", task=lambda: "result_1", dependencies=["source"]),
        dagron.DynamicNodeSpec("task_2", task=lambda: "result_2", dependencies=["source"]),
    ]
)

# Remove nodes
mod = dagron.DynamicModification(remove_nodes=["obsolete_node"])

# Both
mod = dagron.DynamicModification(
    add_nodes=[dagron.DynamicNodeSpec("replacement", task=lambda: "new", dependencies=["source"])],
    remove_nodes=["old_task"],
)

DynamicNodeSpec

DynamicNodeSpec
class DynamicNodeSpec(
    name: str,
    task: Callable,
    dependencies: list[str] | None = None,
    dependents: list[str] | None = None,
)

A specification for a node to be dynamically added during execution. Includes the task callable and edge connections.

Parameter     Type              Default   Description
name          str               required  Unique name for the new node.
task          Callable          required  The callable to execute for this node.
dependencies  list[str] | None  None      Names of existing nodes that this node depends on (incoming edges).
dependents    list[str] | None  None      Names of existing nodes that depend on this node (outgoing edges).

The dependencies list creates edges dep -> new_node and the dependents list creates edges new_node -> dependent. This wires the new node into the existing graph topology.

spec = dagron.DynamicNodeSpec(
    name="process_chunk_42",
    task=lambda: "chunk 42 processed",
    dependencies=["split"],  # split -> process_chunk_42
    dependents=["merge"],    # process_chunk_42 -> merge
)
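
To make the edge direction concrete, the helper below lists the edges implied by a spec's dependencies and dependents. `edges_for` is a hypothetical illustration in plain Python, not part of dagron; it only restates the wiring rule described above.

```python
def edges_for(name, dependencies=None, dependents=None):
    """Return the (source, target) edges implied by a dynamically added node."""
    incoming = [(dep, name) for dep in dependencies or []]      # dep -> new node
    outgoing = [(name, child) for child in dependents or []]    # new node -> dependent
    return incoming + outgoing


edges = edges_for("process_chunk_42", dependencies=["split"], dependents=["merge"])
print(edges)  # [('split', 'process_chunk_42'), ('process_chunk_42', 'merge')]
```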

Complete Example: Map-Reduce

A dynamic map-reduce pipeline where the mapper discovers the number of chunks at runtime:

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("read_input")
    .add_node("reduce")
    .add_edge("read_input", "reduce")
    .build()
)

def map_expander(data):
    """Split input into chunks and create a mapper node per chunk."""
    chunks = [data[i:i+100] for i in range(0, len(data), 100)]
    return dagron.DynamicModification(
        add_nodes=[
            dagron.DynamicNodeSpec(
                name=f"map_{i}",
                task=lambda chunk=chunk: sum(chunk),
                dependencies=["read_input"],
                dependents=["reduce"],
            )
            for i, chunk in enumerate(chunks)
        ]
    )

executor = dagron.DynamicExecutor(
    dag,
    expanders={"read_input": map_expander},
    max_workers=8,
    callbacks=dagron.ExecutionCallbacks(
        on_dynamic_expand=lambda name, new_nodes: print(
            f"[EXPAND] {name} added {len(new_nodes)} nodes"
        ),
    ),
)

result = executor.execute({
    "read_input": lambda: list(range(500)),
    "reduce": lambda: "reduction complete",
})

print(f"Total nodes executed: {result.succeeded}")
# read_input + 5 mappers + reduce = 7
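
The count of five mappers follows directly from the chunking expression in map_expander. The snippet below reproduces just that arithmetic in plain Python, without dagron, so the numbers can be checked in isolation:

```python
data = list(range(500))

# Same slicing as map_expander: consecutive chunks of at most 100 items.
chunks = [data[i:i + 100] for i in range(0, len(data), 100)]

# What each map_<i> task would compute for its chunk.
partial_sums = [sum(chunk) for chunk in chunks]

print(len(chunks))        # 5
print(partial_sums)       # [4950, 14950, 24950, 34950, 44950]
print(sum(partial_sums))  # 124750
```

Note that the reduce task in the example above returns a fixed string rather than combining these partial sums.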

Callbacks for Dynamic Expansion

The on_dynamic_expand callback in ExecutionCallbacks is called whenever new nodes are added:

callbacks = dagron.ExecutionCallbacks(
    on_dynamic_expand=lambda node_name, new_nodes: print(
        f"Node '{node_name}' expanded with: {new_nodes}"
    ),
)

This is useful for logging, monitoring, and debugging dynamic DAGs in production.
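
For production logging, a small callable object is often easier to work with than a lambda. The sketch below assumes, based only on the lambda examples in this page, that the callback is invoked with two positional arguments (the expanding node's name and a sized collection of new nodes). `ExpansionRecorder` is a hypothetical helper, not part of dagron.

```python
import logging

logger = logging.getLogger("dynamic_dag")


class ExpansionRecorder:
    """Records every expansion event and logs it (hypothetical helper)."""

    def __init__(self):
        self.events = []

    def __call__(self, node_name, new_nodes):
        added = list(new_nodes)
        self.events.append((node_name, added))
        logger.info("node %r expanded with %d node(s): %s", node_name, len(added), added)


recorder = ExpansionRecorder()

# Simulate what an executor would do when "read_input" adds two nodes.
recorder("read_input", ["map_0", "map_1"])
print(recorder.events)  # [('read_input', ['map_0', 'map_1'])]
```

Under that assumption, the instance could be passed as `on_dynamic_expand=recorder`, and `recorder.events` inspected after the run.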


Constraints and Safety

  • No cycles: dynamically added edges must not create cycles. The executor validates this and raises CycleError if a cycle would be introduced.
  • No duplicate names: dynamically added nodes must have unique names. A DuplicateNodeError is raised otherwise.
  • Topological consistency: new nodes are inserted into the execution schedule at the correct topological position. Nodes that have already been dispatched are not re-executed.
  • Expander idempotency: expanders should be idempotent. If execution is retried (e.g., via checkpointing), expanders may run again.
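
The first two constraints can be illustrated with a standalone check built on the standard library's graphlib module. This is a sketch of the idea, not dagron's validation code: `check_addition` is hypothetical, `ValueError` stands in for DuplicateNodeError, and `graphlib.CycleError` stands in for dagron's CycleError.

```python
from graphlib import CycleError, TopologicalSorter


def check_addition(preds, name, dependencies=(), dependents=()):
    """Return an updated predecessor map, or raise if the addition is invalid.

    `preds` maps each existing node name to the set of its predecessors.
    """
    if name in preds:
        raise ValueError(f"duplicate node name: {name!r}")
    updated = {node: set(p) for node, p in preds.items()}
    updated[name] = set(dependencies)       # edges dep -> new node
    for dependent in dependents:
        updated[dependent].add(name)        # edges new node -> dependent
    # prepare() raises graphlib.CycleError if the graph contains a cycle.
    TopologicalSorter(updated).prepare()
    return updated


graph = {"split": set(), "merge": {"split"}}

# Valid: split -> chunk_0 -> merge
ok = check_addition(graph, "chunk_0", dependencies=["split"], dependents=["merge"])
print(sorted(ok["merge"]))  # ['chunk_0', 'split']

# Invalid: merge -> loop -> split closes a cycle with split -> merge
try:
    check_addition(graph, "loop", dependencies=["merge"], dependents=["split"])
except CycleError:
    print("cycle rejected")
```

Because the check works on a copy, a rejected modification leaves the original graph untouched.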