# Building DAGs
dagron offers multiple ways to construct a directed acyclic graph. This guide covers every construction pattern, from quick one-liners to advanced builder configurations with payloads and metadata.
## Construction patterns at a glance

| Pattern | Best for | Example |
|---|---|---|
| `DAG.builder()` | Most use cases — fluent, validated | `DAG.builder().add_node("a").build()` |
| `DAG()` + mutations | Incremental / dynamic construction | `dag.add_node("a")` |
| Bulk helpers | Large graphs from lists | `builder.add_nodes([...]).add_edges([...])` |
| `Pipeline` | Linear function chains | `Pipeline(tasks=[fn1, fn2])` |
## The fluent builder

`DAG.builder()` returns a `DAGBuilder` that chains method calls and validates the graph when you call `.build()`.

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("fetch")
    .add_node("parse")
    .add_node("validate")
    .add_node("store")
    .add_edge("fetch", "parse")
    .add_edge("parse", "validate")
    .add_edge("validate", "store")
    .build()
)
```
*Linear four-node pipeline created with the builder.*
### Cycle detection

The builder rejects cycles at `.build()` time:

```python
try:
    dagron.DAG.builder() \
        .add_node("a").add_node("b").add_node("c") \
        .add_edge("a", "b") \
        .add_edge("b", "c") \
        .add_edge("c", "a") \
        .build()
except dagron.CycleError as e:
    print(e)  # Cycle detected: c -> a
```
This guarantee means that any DAG instance you hold is always valid.
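The guarantee rests on a classic fact: a directed graph can be topologically ordered exactly when it is acyclic. Independent of dagron, Python's standard-library `graphlib` demonstrates the same check:

```python
from graphlib import TopologicalSorter, CycleError

# Map each node to the set of nodes it depends on: a -> b -> c -> a
dependencies = {"b": {"a"}, "c": {"b"}, "a": {"c"}}

try:
    TopologicalSorter(dependencies).prepare()  # raises if a cycle exists
except CycleError as exc:
    print("cycle found:", exc.args[1])
```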
### Implicit node creation

When you add an edge, both endpoints are created automatically if they do not already exist:

```python
dag = (
    dagron.DAG.builder()
    .add_edge("a", "b")
    .add_edge("b", "c")
    .build()
)
print(dag.node_count())  # 3
```
This shorthand is convenient for small graphs where you do not need to attach metadata to every node.
## Direct construction

If you prefer an imperative style, create a bare `DAG` and mutate it:

```python
dag = dagron.DAG()
dag.add_node("ingest")
dag.add_node("clean")
dag.add_node("enrich")
dag.add_node("publish")
dag.add_edge("ingest", "clean")
dag.add_edge("clean", "enrich")
dag.add_edge("enrich", "publish")
```
Direct mutation is useful when the graph structure is determined at runtime — for example, when reading a config file or discovering tasks from a plugin registry.
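As a sketch of the config-driven case (the `pipeline` dict below is hypothetical, standing in for parsed YAML or a plugin registry), you can derive node and edge lists first and then feed them to the mutation calls above:

```python
# Hypothetical parsed config: task name -> list of upstream dependencies
pipeline = {
    "ingest": [],
    "clean": ["ingest"],
    "enrich": ["clean"],
    "publish": ["enrich"],
}

nodes = list(pipeline)
edges = [(dep, task) for task, deps in pipeline.items() for dep in deps]
print(edges)
# [('ingest', 'clean'), ('clean', 'enrich'), ('enrich', 'publish')]
# Then call dag.add_node(n) for each node and dag.add_edge(u, v) for each edge.
```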
### Checking membership

```python
print(dag.has_node("clean"))            # True
print(dag.has_edge("clean", "enrich"))  # True
```
### Removing nodes and edges

```python
dag.remove_edge("enrich", "publish")
dag.remove_node("publish")
print(dag.node_count())  # 3
print(dag.edge_count())  # 2
```
Removing a node also removes all edges connected to it.
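The cascade behaviour is easy to picture on a plain edge-set representation (a conceptual sketch, not dagron's internals):

```python
# Conceptual sketch: removing a node drops every edge touching it
edges = {("ingest", "clean"), ("clean", "enrich"), ("enrich", "publish")}

def remove_node_edges(edges, node):
    # Keep only edges where the node appears at neither endpoint
    return {(u, v) for (u, v) in edges if node not in (u, v)}

remaining = remove_node_edges(edges, "enrich")
print(remaining)  # {('ingest', 'clean')}
```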
## Bulk operations

When building large graphs, individual `add_node` / `add_edge` calls become verbose. Use the bulk helpers instead:

```python
dag = (
    dagron.DAG.builder()
    .add_nodes(["extract", "transform_a", "transform_b", "merge", "load"])
    .add_edges([
        ("extract", "transform_a"),
        ("extract", "transform_b"),
        ("transform_a", "merge"),
        ("transform_b", "merge"),
        ("merge", "load"),
    ])
    .build()
)
```
*Fan-out / fan-in graph created with bulk helpers.*
## Building from data

A common pattern is constructing the graph from a list of records (e.g., rows from a database or entries in a YAML file):

```python
records = [
    {"name": "extract", "depends_on": []},
    {"name": "transform", "depends_on": ["extract"]},
    {"name": "load", "depends_on": ["transform"]},
]

builder = dagron.DAG.builder()
for rec in records:
    builder.add_node(rec["name"])
for rec in records:
    for dep in rec["depends_on"]:
        builder.add_edge(dep, rec["name"])
dag = builder.build()
```
This makes it straightforward to drive graph construction from external configuration.
## Node payloads

Every node can carry an arbitrary Python object called a payload. Payloads are useful for attaching configuration, metadata, or cost hints without polluting the task functions.

```python
dag = dagron.DAG()
dag.add_node("train", payload={"epochs": 10, "lr": 0.001})
dag.add_node("evaluate", payload={"metrics": ["accuracy", "f1"]})
dag.add_edge("train", "evaluate")
```
Retrieve payloads later:

```python
# Access via the nodes iterator
for name in dag.nodes():
    print(name, dag.get_payload(name))
```
### Payloads with the builder

```python
dag = (
    dagron.DAG.builder()
    .add_node("fetch", payload={"url": "https://api.example.com/data"})
    .add_node("parse", payload={"format": "json"})
    .add_edge("fetch", "parse")
    .build()
)
```
## Contracts

The builder supports contracts that declare the expected output type of a node. Contracts are checked at build time and serve as living documentation of your pipeline's data flow.

```python
dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .contract("extract", output=list)
    .contract("transform", output=dict)
    .contract("load", output=str)
    .build()
)
```
See Contracts in the API reference for details on runtime enforcement and custom validators.
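For intuition, runtime enforcement of an output contract boils down to a type check after each node runs. A minimal sketch (the helper below is illustrative, not dagron's API):

```python
def enforce_contract(node, output, expected_type):
    # Illustrative check only; dagron's real enforcement may be richer
    if not isinstance(output, expected_type):
        raise TypeError(
            f"node {node!r} returned {type(output).__name__}, "
            f"expected {expected_type.__name__}"
        )
    return output

ok = enforce_contract("extract", [1, 2, 3], list)  # passes through unchanged
```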
## Understanding the graph structure

Once a DAG is built, you have a rich set of read-only accessors.

### Nodes and edges

```python
print(list(dag.nodes()))  # ['extract', 'transform', 'load']
print(dag.node_count())   # 3
print(dag.edge_count())   # 2
```
### Roots and leaves

Roots have no incoming edges (in-degree 0). Leaves have no outgoing edges (out-degree 0).

```python
print(dag.roots())   # ['extract']
print(dag.leaves())  # ['load']
```
*Roots (green) and leaves (yellow) of a linear pipeline.*
### Degree

```python
print(dag.in_degree("transform"))   # 1
print(dag.out_degree("transform"))  # 1
print(dag.in_degree("extract"))     # 0 (root)
print(dag.out_degree("load"))       # 0 (leaf)
```
### Neighbourhood queries

```python
# Direct parents
print(dag.predecessors("transform"))  # ['extract']

# Direct children
print(dag.successors("transform"))    # ['load']

# Transitive ancestors (all upstream nodes)
print(dag.ancestors("load"))          # ['extract', 'transform']

# Transitive descendants (all downstream nodes)
print(dag.descendants("extract"))     # ['transform', 'load']
```
### Topological ordering

A topological sort produces an ordering in which every node appears after all of its dependencies:

```python
print(dag.topological_sort())
# ['extract', 'transform', 'load']
```

For parallel execution planning, topological levels group nodes that can run concurrently:

```python
for level, nodes in enumerate(dag.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['extract']
# Level 1: ['transform']
# Level 2: ['load']
```
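If you are curious how level grouping can be computed, the ready-set loop in Python's standard-library `graphlib` produces the same grouping for this linear pipeline (a conceptual equivalent, not dagron's implementation):

```python
from graphlib import TopologicalSorter

# node -> set of predecessors, matching the extract -> transform -> load pipeline
preds = {"transform": {"extract"}, "load": {"transform"}}

ts = TopologicalSorter(preds)
ts.prepare()
levels = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # every node whose dependencies are done
    levels.append(ready)
    ts.done(*ready)
print(levels)  # [['extract'], ['transform'], ['load']]
```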
See Inspecting Graphs for the full analysis toolkit.
## Diamond dependencies

A common pattern in data pipelines is the diamond shape — one root fans out to multiple branches that converge at a single join node:

```python
dag = (
    dagron.DAG.builder()
    .add_nodes(["source", "branch_a", "branch_b", "branch_c", "join"])
    .add_edges([
        ("source", "branch_a"),
        ("source", "branch_b"),
        ("source", "branch_c"),
        ("branch_a", "join"),
        ("branch_b", "join"),
        ("branch_c", "join"),
    ])
    .build()
)
```
*Diamond DAG. The three branches execute in parallel; `join` waits for all of them.*
The executor automatically parallelises the three branches and synchronises at the join node.
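The parallelism claim can be checked independently of dagron with the standard-library `graphlib`: once `source` completes, all three branches become ready at the same time, while `join` stays blocked:

```python
from graphlib import TopologicalSorter

# node -> set of predecessors for the diamond above
preds = {
    "branch_a": {"source"},
    "branch_b": {"source"},
    "branch_c": {"source"},
    "join": {"branch_a", "branch_b", "branch_c"},
}
ts = TopologicalSorter(preds)
ts.prepare()
ts.done(*ts.get_ready())        # complete 'source'
ready = sorted(ts.get_ready())  # all three branches become ready at once
print(ready)  # ['branch_a', 'branch_b', 'branch_c']
```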
## Multi-layer pipelines

For complex ML workflows you might have many layers:

```python
dag = (
    dagron.DAG.builder()
    # Data layer
    .add_node("raw_data")
    .add_node("clean_data")
    .add_edge("raw_data", "clean_data")
    # Feature layer
    .add_node("feature_numeric")
    .add_node("feature_text")
    .add_node("feature_image")
    .add_edge("clean_data", "feature_numeric")
    .add_edge("clean_data", "feature_text")
    .add_edge("clean_data", "feature_image")
    # Model layer
    .add_node("train_model")
    .add_edge("feature_numeric", "train_model")
    .add_edge("feature_text", "train_model")
    .add_edge("feature_image", "train_model")
    # Evaluation layer
    .add_node("evaluate")
    .add_node("deploy")
    .add_edge("train_model", "evaluate")
    .add_edge("evaluate", "deploy")
    .build()
)

print(dag.node_count())  # 8
print(dag.edge_count())  # 9

for level, nodes in enumerate(dag.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['raw_data']
# Level 1: ['clean_data']
# Level 2: ['feature_numeric', 'feature_text', 'feature_image']
# Level 3: ['train_model']
# Level 4: ['evaluate']
# Level 5: ['deploy']
```
*Multi-layer ML pipeline with parallel feature extraction.*
## Pattern matching on node names

dagron supports finding nodes by name patterns, which is handy when you build graphs programmatically with naming conventions:

```python
# Glob-style matching
feature_nodes = dag.nodes_matching_glob("feature_*")
print(feature_nodes)  # ['feature_numeric', 'feature_text', 'feature_image']

# Regex matching
data_nodes = dag.nodes_matching_regex(r".*_data$")
print(data_nodes)  # ['raw_data', 'clean_data']
```
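The same matching can be reproduced with the standard library, which is the kind of machinery glob- and regex-style filters typically delegate to (`fnmatch` for globs, `re` for regexes):

```python
import fnmatch
import re

names = ["raw_data", "clean_data", "feature_numeric", "feature_text",
         "feature_image", "train_model", "evaluate", "deploy"]

feature_nodes = fnmatch.filter(names, "feature_*")
print(feature_nodes)  # ['feature_numeric', 'feature_text', 'feature_image']

data_nodes = [n for n in names if re.search(r".*_data$", n)]
print(data_nodes)  # ['raw_data', 'clean_data']
```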
## Graph statistics

The `stats()` method returns a summary of the graph:

```python
s = dag.stats()
print(s)
# DAGStats(nodes=8, edges=9, roots=1, leaves=1, depth=5, width=3, density=0.161)
```
This is useful for logging and monitoring in production.
## Composing multiple DAGs

Large systems often consist of several independent DAGs that need to be wired together. The `compose()` function merges DAGs with namespaced node names and cross-DAG connections:

```python
etl_dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

ml_dag = (
    dagron.DAG.builder()
    .add_edge("train", "evaluate")
    .build()
)

combined = dagron.compose(
    dags={"etl": etl_dag, "ml": ml_dag},
    connections=[("etl/load", "ml/train")],
)

print(list(combined.nodes()))
# ['etl/extract', 'etl/transform', 'etl/load', 'ml/train', 'ml/evaluate']
```
*Two DAGs composed into a single pipeline with a cross-DAG edge.*
Composition is covered fully in Graph Transforms.
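The namespacing scheme itself is simple prefixing; a sketch of the naming convention (not `compose()`'s actual implementation):

```python
def namespaced(prefix, names):
    # Qualify each node as "<dag-name>/<node-name>"
    return [f"{prefix}/{name}" for name in names]

qualified = namespaced("etl", ["extract", "transform", "load"])
print(qualified)  # ['etl/extract', 'etl/transform', 'etl/load']
```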
## Validation and linting

After building a DAG, you can run the built-in linter to catch common issues:

```python
from dagron import lint

warnings = lint(dag)
for w in warnings:
    print(w)
# e.g., "Node 'deploy' has in-degree 1 and out-degree 0 — consider if it should be a leaf."
```

And validate structural integrity at any time:

```python
dag.validate()  # Raises if the graph is malformed
```
See Inspecting Graphs for the full analysis and linting toolkit.
## Best practices

- **Use the builder for static graphs.** It gives you cycle detection and a clean, readable construction block.
- **Use direct mutation for dynamic graphs.** When the structure depends on runtime decisions, building imperatively is simpler.
- **Attach payloads for configuration.** Keep task functions pure; put parameters in payloads.
- **Name nodes with conventions.** Use prefixes like `extract_`, `transform_`, `load_` so you can use glob/regex matching later.
- **Start small and compose.** Build self-contained sub-DAGs and wire them together with `compose()`.
## API reference

For the full list of construction methods, see:

- `DAG` — the core graph class and all its methods.
- `DAGBuilder` — fluent builder API.
- `compose()` — DAG composition.
## Next steps
- Executing Tasks — learn how to run your DAG with executors.
- Inspecting Graphs — analyze structure, find critical paths, run queries.
- Graph Transforms — filter, merge, reverse, and reshape DAGs.