# Dynamic DAGs
Sometimes you cannot know the full shape of your pipeline until execution is underway. A data-discovery step might reveal 50 tables that each need their own processing branch. A model-selection step might choose between three architectures. A file scanner might find a variable number of inputs.
dagron's dynamic execution system lets you define expander functions that modify the DAG at runtime — adding new nodes, removing nodes, or rewiring edges based on the results of upstream tasks.
## Concepts
### Expanders

An expander is a Python function associated with a specific node. After that node completes, the executor calls the expander with the node's name and result. The expander returns a `DynamicModification` describing what to change.
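In sketch form, an expander looks like the function below. To keep the snippet runnable on its own, `DynamicModification` is stubbed with a local dataclass; in real code you would import it from dagron:

```python
from dataclasses import dataclass, field

# Local stand-in for dagron.DynamicModification, used only so this
# sketch runs without dagron installed.
@dataclass
class DynamicModification:
    add_nodes: list = field(default_factory=list)
    remove_nodes: list = field(default_factory=list)

def my_expander(name: str, result) -> DynamicModification:
    """Called after node `name` completes, with its return value `result`."""
    if not result:
        # An empty modification leaves the DAG unchanged.
        return DynamicModification()
    # Real expanders would build DynamicNodeSpec entries here.
    return DynamicModification(add_nodes=[], remove_nodes=[])
```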
### DynamicModification

A `DynamicModification` is a data class with two fields:

| Field | Type | Description |
|---|---|---|
| `add_nodes` | `list[DynamicNodeSpec]` | Nodes to add to the DAG |
| `remove_nodes` | `list[str]` | Nodes to remove from the DAG |
### DynamicNodeSpec

Each new node is described by a `DynamicNodeSpec`:

| Field | Type | Description |
|---|---|---|
| `name` | `str` | The new node's name |
| `task` | `Callable` | The task function to execute |
| `dependencies` | `list[str]` | Nodes this new node depends on |
| `dependents` | `list[str]` | Nodes that depend on this new node |
### DynamicExecutor

The `DynamicExecutor` wraps the standard executor and applies modifications between scheduling rounds.
## Basic example

Let's build a pipeline where a discovery step determines how many files to process:

```python
import dagron

# Initial DAG with a discovery node and a merge node
dag = (
    dagron.DAG.builder()
    .add_node("discover")
    .add_node("merge")
    .add_edge("discover", "merge")
    .build()
)
```
Initial DAG before expansion. The discover node will spawn dynamic children.
### Define the expander

```python
def discover_expander(name, result):
    """After 'discover' runs, add one processing node per file."""
    files = result  # e.g., ["a.csv", "b.csv", "c.csv"]
    new_nodes = []
    for filename in files:
        node_name = f"process_{filename.replace('.', '_')}"
        new_nodes.append(
            dagron.DynamicNodeSpec(
                name=node_name,
                task=lambda fn=filename: f"processed {fn}",
                dependencies=["discover"],
                dependents=["merge"],
            )
        )
    return dagron.DynamicModification(
        add_nodes=new_nodes,
        remove_nodes=[],
    )
```
### Execute

```python
tasks = {
    "discover": lambda: ["a.csv", "b.csv", "c.csv"],
    "merge": lambda: "all files merged",
}
expanders = {
    "discover": discover_expander,
}

executor = dagron.DynamicExecutor(dag, expanders=expanders, max_workers=4)
result = executor.execute(tasks)

print(result.succeeded)  # 5 (discover + 3 process nodes + merge)
print(list(result.node_results.keys()))
# ['discover', 'process_a_csv', 'process_b_csv', 'process_c_csv', 'merge']
```
After discover completes, the expander fires and adds three new nodes. The
executor then schedules them in parallel, and finally runs merge:
DAG after dynamic expansion. Three processing nodes were added at runtime.
## DynamicModification in detail

### Adding nodes

```python
dagron.DynamicModification(
    add_nodes=[
        dagron.DynamicNodeSpec(
            name="new_node",
            task=lambda: "hello",
            dependencies=["existing_parent"],
            dependents=["existing_child"],
        ),
    ],
    remove_nodes=[],
)
```
The `dependencies` field creates edges *from* those nodes *to* the new node. The `dependents` field creates edges *from* the new node *to* those nodes.
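For example, a spec with `dependencies=["a"]` and `dependents=["b"]` yields the edges `a -> new_node` and `new_node -> b`. A plain-Python sketch of that wiring (a simple adjacency dict, not dagron's internal graph representation):

```python
def wire_new_node(edges, node, dependencies, dependents):
    """Record the edges a DynamicNodeSpec implies, as parent -> children lists."""
    for parent in dependencies:
        edges.setdefault(parent, []).append(node)  # edge from parent to node
    for child in dependents:
        edges.setdefault(node, []).append(child)   # edge from node to child
    return edges

edges = wire_new_node({}, "new_node", ["a"], ["b"])
# {'a': ['new_node'], 'new_node': ['b']}
```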
### Removing nodes

```python
dagron.DynamicModification(
    add_nodes=[],
    remove_nodes=["obsolete_node"],
)
```
Removing a node also removes all its edges. Be careful not to remove a node that has already been scheduled or completed.
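One defensive pattern is to track the placeholder names your own expanders created and filter removal requests against that set. This is plain Python; the `PLACEHOLDERS` set and `safe_remove` helper are illustrative, not part of dagron:

```python
# Names this pipeline created purely as placeholders; nothing outside
# this set should ever be removed, since it may already have run.
PLACEHOLDERS = {"placeholder_a", "placeholder_b"}

def safe_remove(requested):
    """Keep only names that are known, removable placeholders."""
    return [name for name in requested if name in PLACEHOLDERS]

safe_remove(["placeholder_a", "merge"])  # drops 'merge', keeps the placeholder
```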
### Combined add and remove

```python
def replace_placeholder(name, result):
    """Replace a placeholder node with specific implementations."""
    return dagron.DynamicModification(
        add_nodes=[
            dagron.DynamicNodeSpec(
                name="specific_impl_a",
                task=lambda: "impl a",
                dependencies=["upstream"],
                dependents=["downstream"],
            ),
            dagron.DynamicNodeSpec(
                name="specific_impl_b",
                task=lambda: "impl b",
                dependencies=["upstream"],
                dependents=["downstream"],
            ),
        ],
        remove_nodes=["placeholder"],
    )
```
## Real-world example: dynamic ETL pipeline

A common scenario is an ETL pipeline that discovers database tables at runtime and creates a processing branch for each:

```python
import dagron
import time

# Initial DAG
dag = (
    dagron.DAG.builder()
    .add_node("discover_tables")
    .add_node("aggregate")
    .add_node("publish")
    .add_edge("discover_tables", "aggregate")
    .add_edge("aggregate", "publish")
    .build()
)

def discover_tables():
    """Simulate querying a database catalog."""
    time.sleep(0.1)
    return ["users", "orders", "products", "reviews"]

def aggregate():
    return "aggregated all tables"

def publish():
    return "published to data warehouse"

def make_table_processor(table_name):
    """Factory function for table-specific processors."""
    def process():
        time.sleep(0.2)  # simulate processing
        return f"processed {table_name}: 1000 rows"
    return process

def table_expander(name, result):
    """Create one processing node per discovered table."""
    tables = result
    nodes = []
    for table in tables:
        nodes.append(
            dagron.DynamicNodeSpec(
                name=f"process_{table}",
                task=make_table_processor(table),
                dependencies=["discover_tables"],
                dependents=["aggregate"],
            )
        )
    return dagron.DynamicModification(add_nodes=nodes, remove_nodes=[])

tasks = {
    "discover_tables": discover_tables,
    "aggregate": aggregate,
    "publish": publish,
}

executor = dagron.DynamicExecutor(
    dag,
    expanders={"discover_tables": table_expander},
    max_workers=4,
)
result = executor.execute(tasks)

print(f"Executed {result.succeeded} tasks")
# Executed 7 tasks

for name, nr in result.node_results.items():
    print(f"  {name}: {nr.status.name} ({nr.duration_seconds:.3f}s)")
```
After expansion: four table-processing nodes run in parallel.
## Chained expansion

Expanders can trigger further expansions. If a dynamically added node also has an expander, it fires after that node completes:

```python
dag = (
    dagron.DAG.builder()
    .add_node("level_0")
    .add_node("final")
    .add_edge("level_0", "final")
    .build()
)

def level_0_expander(name, result):
    return dagron.DynamicModification(
        add_nodes=[
            dagron.DynamicNodeSpec(
                name="level_1",
                task=lambda: ["sub_a", "sub_b"],
                dependencies=["level_0"],
                dependents=["final"],
            ),
        ],
        remove_nodes=[],
    )

def level_1_expander(name, result):
    nodes = []
    for sub in result:
        nodes.append(
            dagron.DynamicNodeSpec(
                name=f"level_2_{sub}",
                task=lambda s=sub: f"processed {s}",
                dependencies=["level_1"],
                dependents=["final"],
            )
        )
    return dagron.DynamicModification(add_nodes=nodes, remove_nodes=[])

tasks = {
    "level_0": lambda: "started",
    "final": lambda: "done",
}
expanders = {
    "level_0": level_0_expander,
    "level_1": level_1_expander,
}

executor = dagron.DynamicExecutor(dag, expanders=expanders)
result = executor.execute(tasks)

print(list(result.node_results.keys()))
# ['level_0', 'level_1', 'level_2_sub_a', 'level_2_sub_b', 'final']
```
Two-level chained expansion. level_0 spawns level_1, which spawns level_2 nodes.
## Model selection example

Use dynamic expansion to choose a model architecture at runtime:

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("prepare_data", "select_model")
    .add_edge("select_model", "evaluate")
    .build()
)

def model_selector_expander(name, result):
    """Based on data characteristics, pick the right model."""
    data_size = result["rows"]
    if data_size < 1000:
        model = dagron.DynamicNodeSpec(
            name="train_linear",
            task=lambda: {"model": "linear", "accuracy": 0.85},
            dependencies=["select_model"],
            dependents=["evaluate"],
        )
    elif data_size < 100000:
        model = dagron.DynamicNodeSpec(
            name="train_xgboost",
            task=lambda: {"model": "xgboost", "accuracy": 0.92},
            dependencies=["select_model"],
            dependents=["evaluate"],
        )
    else:
        model = dagron.DynamicNodeSpec(
            name="train_neural_net",
            task=lambda: {"model": "nn", "accuracy": 0.96},
            dependencies=["select_model"],
            dependents=["evaluate"],
        )
    return dagron.DynamicModification(add_nodes=[model], remove_nodes=[])

tasks = {
    "prepare_data": lambda: {"rows": 50000, "features": 20},
    "select_model": lambda: {"rows": 50000},
    "evaluate": lambda: "evaluated",
}

executor = dagron.DynamicExecutor(
    dag,
    expanders={"select_model": model_selector_expander},
)
result = executor.execute(tasks)

# With 50000 rows, xgboost was selected
print("train_xgboost" in result.node_results)  # True
print(result.node_results["train_xgboost"].result)
# {'model': 'xgboost', 'accuracy': 0.92}
```
Dynamic model selection. XGBoost was chosen based on the data size.
## Error handling in expanders

If an expander raises an exception, the node that triggered it is marked as failed and its descendants are skipped:

```python
def bad_expander(name, result):
    raise RuntimeError("Expansion failed!")

expanders = {"discover": bad_expander}
executor = dagron.DynamicExecutor(dag, expanders=expanders)
result = executor.execute(tasks)

print(result.node_results["discover"].status)  # FAILED
```
To handle expansion errors gracefully, wrap your expander logic in try/except:

```python
def safe_expander(name, result):
    try:
        nodes = build_nodes(result)  # your expansion logic
        return dagron.DynamicModification(add_nodes=nodes, remove_nodes=[])
    except Exception:
        # Return an empty modification — no expansion, but no failure
        return dagron.DynamicModification(add_nodes=[], remove_nodes=[])
```
## Best practices

- **Use factory functions for tasks.** When creating tasks in a loop, use a factory to capture the loop variable correctly:

  ```python
  def make_task(item):
      def task():
          return process(item)
      return task

  # NOT: lambda: process(item) -- captures the variable, not the value
  ```

- **Name dynamic nodes predictably.** Use naming conventions like `process_{table}` so you can use glob/regex matching later.
- **Limit expansion depth.** Chained expansions can grow the graph unexpectedly. Set reasonable limits in your expander logic.
- **Combine with conditional edges.** Use Conditional Execution to gate whether expansion happens at all.
- **Test expanders in isolation.** Write unit tests for your expander functions with various inputs before running them in the full pipeline.
- **Monitor graph size.** Log `dag.node_count()` after expansion to detect runaway growth.
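The depth-limit advice can be sketched as a guard inside the expander. This is plain Python; `MAX_DEPTH` and the `level_N` naming scheme used to infer depth are illustrative assumptions, not dagron API:

```python
MAX_DEPTH = 3

def expansion_depth(name):
    """Infer depth from a 'level_N_...' naming convention (an assumed scheme)."""
    parts = name.split("_")
    if len(parts) >= 2 and parts[0] == "level" and parts[1].isdigit():
        return int(parts[1])
    return 0

def limited_children(name, result):
    """Return child node names, refusing to expand past MAX_DEPTH."""
    depth = expansion_depth(name)
    if depth >= MAX_DEPTH:
        return []  # empty modification: stop growing the graph
    return [f"level_{depth + 1}_{item}" for item in result]

limited_children("level_0", ["a", "b"])  # two children at level 1
limited_children("level_3_x", ["a"])     # [] -- depth limit reached
```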
## API reference

| Class / Method | Docs |
|---|---|
| `DynamicExecutor` | Dynamic |
| `DynamicModification` | Dynamic |
| `DynamicNodeSpec` | Dynamic |
## Next steps
- Conditional Execution — gate branches with runtime predicates.
- Checkpointing — checkpoint dynamic pipelines for resume.
- Tracing & Profiling — trace dynamic expansion events.