
Dynamic DAGs

Sometimes you cannot know the full shape of your pipeline until execution is underway. A data-discovery step might reveal 50 tables that each need their own processing branch. A model-selection step might choose between three architectures. A file scanner might find a variable number of inputs.

dagron's dynamic execution system lets you define expander functions that modify the DAG at runtime — adding new nodes, removing nodes, or rewiring edges based on the results of upstream tasks.

Concepts

Expanders

An expander is a Python function associated with a specific node. After that node completes, the executor calls the expander with the node's name and result. The expander returns a DynamicModification describing what to change.

DynamicModification

A DynamicModification is a data class with two fields:

Field         Type                    Description
add_nodes     list[DynamicNodeSpec]   Nodes to add to the DAG
remove_nodes  list[str]               Nodes to remove from the DAG

DynamicNodeSpec

Each new node is described by a DynamicNodeSpec:

Field         Type        Description
name          str         The new node's name
task          Callable    The task function to execute
dependencies  list[str]   Nodes this new node depends on
dependents    list[str]   Nodes that depend on this new node

DynamicExecutor

The DynamicExecutor wraps the standard executor and applies modifications between scheduling rounds.
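Conceptually, the loop looks something like the sketch below. This is an illustration of the scheduling contract only, not dagron's actual implementation; `ready_rounds` and `apply_modification` are hypothetical stand-ins for internal machinery:

```python
def run_dynamic(ready_rounds, tasks, expanders, apply_modification):
    """Sketch of a dynamic scheduling loop: run each round of ready
    nodes, then apply any expander output before the next round."""
    results = {}
    for ready in ready_rounds:                 # each round: a batch of ready nodes
        for node in ready:
            results[node] = tasks[node]()      # run the task
            if node in expanders:              # expander fires after completion
                mod = expanders[node](node, results[node])
                apply_modification(mod)        # rewire the DAG between rounds
    return results
```

The key property is that modifications land between rounds, so nodes added by an expander become schedulable in the next round.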

Basic example

Let us build a pipeline where a discovery step determines how many files to process:

import dagron

# Initial DAG with a discovery node and a merge node
dag = (
    dagron.DAG.builder()
    .add_node("discover")
    .add_node("merge")
    .add_edge("discover", "merge")
    .build()
)

Initial DAG before expansion. The discover node will spawn dynamic children.

Define the expander

def discover_expander(name, result):
    """After 'discover' runs, add one processing node per file."""
    files = result  # e.g., ["a.csv", "b.csv", "c.csv"]

    new_nodes = []
    for filename in files:
        node_name = f"process_{filename.replace('.', '_')}"
        new_nodes.append(
            dagron.DynamicNodeSpec(
                name=node_name,
                task=lambda fn=filename: f"processed {fn}",
                dependencies=["discover"],
                dependents=["merge"],
            )
        )

    return dagron.DynamicModification(
        add_nodes=new_nodes,
        remove_nodes=[],
    )

Execute

tasks = {
    "discover": lambda: ["a.csv", "b.csv", "c.csv"],
    "merge": lambda: "all files merged",
}

expanders = {
    "discover": discover_expander,
}

executor = dagron.DynamicExecutor(dag, expanders=expanders, max_workers=4)
result = executor.execute(tasks)

print(result.succeeded)  # 5 (discover + 3 process nodes + merge)
print(list(result.node_results.keys()))
# ['discover', 'process_a_csv', 'process_b_csv', 'process_c_csv', 'merge']

After discover completes, the expander fires and adds three new nodes. The executor then schedules them in parallel, and finally runs merge:

DAG after dynamic expansion. Three processing nodes were added at runtime.

DynamicModification in detail

Adding nodes

dagron.DynamicModification(
    add_nodes=[
        dagron.DynamicNodeSpec(
            name="new_node",
            task=lambda: "hello",
            dependencies=["existing_parent"],
            dependents=["existing_child"],
        ),
    ],
    remove_nodes=[],
)

The dependencies field creates edges FROM those nodes TO the new node. The dependents field creates edges FROM the new node TO those nodes.
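The edge directions are easiest to see spelled out in plain Python. This is an illustration of the mapping only, not dagron internals:

```python
name = "new_node"
dependencies = ["existing_parent"]  # edges come FROM these INTO the new node
dependents = ["existing_child"]     # edges go FROM the new node TO these

edges = [(parent, name) for parent in dependencies]
edges += [(name, child) for child in dependents]
print(edges)
# [('existing_parent', 'new_node'), ('new_node', 'existing_child')]
```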

Removing nodes

dagron.DynamicModification(
    add_nodes=[],
    remove_nodes=["obsolete_node"],
)

Removing a node also removes all its edges. Be careful not to remove a node that has already been scheduled or completed.
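One defensive pattern is to filter removal candidates against the sets of completed and scheduled nodes before building the modification. A plain-Python sketch (the helper and set names are hypothetical; dagron does not provide this function):

```python
def removable(candidates, completed, scheduled):
    """Keep only nodes that are safe to remove: neither finished nor in flight."""
    return [n for n in candidates if n not in completed and n not in scheduled]

print(removable(["a", "b", "c"], completed={"a"}, scheduled={"b"}))
# ['c']
```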

Combined add and remove

def replace_placeholder(name, result):
    """Replace a placeholder node with specific implementations."""
    return dagron.DynamicModification(
        add_nodes=[
            dagron.DynamicNodeSpec(
                name="specific_impl_a",
                task=lambda: "impl a",
                dependencies=["upstream"],
                dependents=["downstream"],
            ),
            dagron.DynamicNodeSpec(
                name="specific_impl_b",
                task=lambda: "impl b",
                dependencies=["upstream"],
                dependents=["downstream"],
            ),
        ],
        remove_nodes=["placeholder"],
    )

Real-world example: dynamic ETL pipeline

A common scenario is an ETL pipeline that discovers database tables at runtime and creates a processing branch for each:

import dagron
import time

# Initial DAG
dag = (
    dagron.DAG.builder()
    .add_node("discover_tables")
    .add_node("aggregate")
    .add_node("publish")
    .add_edge("discover_tables", "aggregate")
    .add_edge("aggregate", "publish")
    .build()
)

def discover_tables():
    """Simulate querying a database catalog."""
    time.sleep(0.1)
    return ["users", "orders", "products", "reviews"]

def aggregate():
    return "aggregated all tables"

def publish():
    return "published to data warehouse"

def make_table_processor(table_name):
    """Factory function for table-specific processors."""
    def process():
        time.sleep(0.2)  # simulate processing
        return f"processed {table_name}: 1000 rows"
    return process

def table_expander(name, result):
    """Create one processing node per discovered table."""
    tables = result
    nodes = []
    for table in tables:
        nodes.append(
            dagron.DynamicNodeSpec(
                name=f"process_{table}",
                task=make_table_processor(table),
                dependencies=["discover_tables"],
                dependents=["aggregate"],
            )
        )
    return dagron.DynamicModification(add_nodes=nodes, remove_nodes=[])

tasks = {
    "discover_tables": discover_tables,
    "aggregate": aggregate,
    "publish": publish,
}

executor = dagron.DynamicExecutor(
    dag,
    expanders={"discover_tables": table_expander},
    max_workers=4,
)
result = executor.execute(tasks)

print(f"Executed {result.succeeded} tasks")
# Executed 7 tasks

for name, nr in result.node_results.items():
    print(f"  {name}: {nr.status.name} ({nr.duration_seconds:.3f}s)")

After expansion: four table-processing nodes run in parallel.

Chained expansion

Expanders can trigger further expansions. If a dynamically added node also has an expander, it fires after that node completes:

dag = (
    dagron.DAG.builder()
    .add_node("level_0")
    .add_node("final")
    .add_edge("level_0", "final")
    .build()
)

def level_0_expander(name, result):
    return dagron.DynamicModification(
        add_nodes=[
            dagron.DynamicNodeSpec(
                name="level_1",
                task=lambda: ["sub_a", "sub_b"],
                dependencies=["level_0"],
                dependents=["final"],
            ),
        ],
        remove_nodes=[],
    )

def level_1_expander(name, result):
    nodes = []
    for sub in result:
        nodes.append(
            dagron.DynamicNodeSpec(
                name=f"level_2_{sub}",
                task=lambda s=sub: f"processed {s}",
                dependencies=["level_1"],
                dependents=["final"],
            )
        )
    return dagron.DynamicModification(add_nodes=nodes, remove_nodes=[])

tasks = {
    "level_0": lambda: "started",
    "final": lambda: "done",
}

expanders = {
    "level_0": level_0_expander,
    "level_1": level_1_expander,
}

executor = dagron.DynamicExecutor(dag, expanders=expanders)
result = executor.execute(tasks)

print(list(result.node_results.keys()))
# ['level_0', 'level_1', 'level_2_sub_a', 'level_2_sub_b', 'final']

Two-level chained expansion. level_0 spawns level_1, which spawns level_2 nodes.

Model selection example

Use dynamic expansion to choose a model architecture at runtime:

import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("prepare_data", "select_model")
    .add_edge("select_model", "evaluate")
    .build()
)

def model_selector_expander(name, result):
    """Based on data characteristics, pick the right model."""
    data_size = result["rows"]

    if data_size < 1000:
        model = dagron.DynamicNodeSpec(
            name="train_linear",
            task=lambda: {"model": "linear", "accuracy": 0.85},
            dependencies=["select_model"],
            dependents=["evaluate"],
        )
    elif data_size < 100000:
        model = dagron.DynamicNodeSpec(
            name="train_xgboost",
            task=lambda: {"model": "xgboost", "accuracy": 0.92},
            dependencies=["select_model"],
            dependents=["evaluate"],
        )
    else:
        model = dagron.DynamicNodeSpec(
            name="train_neural_net",
            task=lambda: {"model": "nn", "accuracy": 0.96},
            dependencies=["select_model"],
            dependents=["evaluate"],
        )

    return dagron.DynamicModification(add_nodes=[model], remove_nodes=[])

tasks = {
    "prepare_data": lambda: {"rows": 50000, "features": 20},
    "select_model": lambda: {"rows": 50000},
    "evaluate": lambda: "evaluated",
}

executor = dagron.DynamicExecutor(
    dag,
    expanders={"select_model": model_selector_expander},
)
result = executor.execute(tasks)

# With 50000 rows, xgboost was selected
print("train_xgboost" in result.node_results)  # True
print(result.node_results["train_xgboost"].result)
# {'model': 'xgboost', 'accuracy': 0.92}

Dynamic model selection. XGBoost was chosen based on the data size.

Error handling in expanders

If an expander raises an exception, the node that triggered it is marked as failed and its descendants are skipped:

def bad_expander(name, result):
    raise RuntimeError("Expansion failed!")

expanders = {"discover": bad_expander}
executor = dagron.DynamicExecutor(dag, expanders=expanders)
result = executor.execute(tasks)

print(result.node_results["discover"].status) # FAILED

To handle expansion errors gracefully, wrap your expander logic in try/except:

def safe_expander(name, result):
    try:
        # ... expansion logic ...
        return dagron.DynamicModification(add_nodes=nodes, remove_nodes=[])
    except Exception:
        # Return empty modification — no expansion, but no failure
        return dagron.DynamicModification(add_nodes=[], remove_nodes=[])

Best practices

  1. Use factory functions for tasks. When creating tasks in a loop, use a factory to capture the loop variable correctly:

    def make_task(item):
        def task():
            return process(item)
        return task

    # NOT: lambda: process(item) -- captures the variable, not the value
  2. Name dynamic nodes predictably. Use naming conventions like process_{table} so you can use glob/regex matching later.

  3. Limit expansion depth. Chained expansions can grow the graph unexpectedly. Set reasonable limits in your expander logic.

  4. Combine with conditional edges. Use Conditional Execution to gate whether expansion happens at all.

  5. Test expanders in isolation. Write unit tests for your expander functions with various inputs before running them in the full pipeline.

  6. Monitor graph size. Log dag.node_count() after expansion to detect runaway growth.
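The capture pitfall behind practice 1 is easy to demonstrate in plain Python. Lambdas created in a loop close over the loop variable itself, so every task ends up seeing the last value; a factory binds each value at creation time:

```python
def process(item):
    return f"processed {item}"

# Buggy: every lambda closes over the same loop variable.
buggy = [lambda: process(item) for item in ["a", "b", "c"]]

# Correct: a factory function captures each value.
def make_task(item):
    def task():
        return process(item)
    return task

good = [make_task(item) for item in ["a", "b", "c"]]

print([t() for t in buggy])  # ['processed c', 'processed c', 'processed c']
print([t() for t in good])   # ['processed a', 'processed b', 'processed c']
```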
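For practice 3, one way to cap growth is to wrap an expander in a budget counter that returns an empty modification once the budget is spent. A sketch using plain dicts as stand-ins for DynamicModification so it runs standalone; the wrapper itself is hypothetical, not a dagron API:

```python
def make_bounded_expander(expander, max_expansions=10):
    """Return a wrapped expander that goes inert after a fixed number of expansions."""
    state = {"count": 0}
    def wrapped(name, result):
        if state["count"] >= max_expansions:
            # Empty modification: no nodes added or removed.
            return {"add_nodes": [], "remove_nodes": []}
        state["count"] += 1
        return expander(name, result)
    return wrapped

inner = lambda name, result: {"add_nodes": [f"child_of_{name}"], "remove_nodes": []}
bounded = make_bounded_expander(inner, max_expansions=2)

print(bounded("a", None)["add_nodes"])  # ['child_of_a']
print(bounded("b", None)["add_nodes"])  # ['child_of_b']
print(bounded("c", None)["add_nodes"])  # [] -- budget exhausted
```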

API reference

Class / Method       Docs
DynamicExecutor      Dynamic
DynamicModification  Dynamic
DynamicNodeSpec      Dynamic

Next steps