
Building DAGs

dagron offers multiple ways to construct a directed acyclic graph. This guide covers every construction pattern, from quick one-liners to advanced builder configurations with payloads and metadata.

Construction patterns at a glance

Pattern            | Best for                           | Example
DAG.builder()      | Most use cases: fluent, validated  | DAG.builder().add_node("a").build()
DAG() + mutations  | Incremental / dynamic construction | dag.add_node("a")
Bulk helpers       | Large graphs from lists            | builder.add_nodes([...]).add_edges([...])
Pipeline           | Linear function chains             | Pipeline(tasks=[fn1, fn2])

The fluent builder

DAG.builder() returns a DAGBuilder that chains method calls and validates the graph when you call .build().

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("fetch")
    .add_node("parse")
    .add_node("validate")
    .add_node("store")
    .add_edge("fetch", "parse")
    .add_edge("parse", "validate")
    .add_edge("validate", "store")
    .build()
)

Linear four-node pipeline created with the builder.

Cycle detection

The builder rejects cycles at .build() time:

try:
    (
        dagron.DAG.builder()
        .add_node("a").add_node("b").add_node("c")
        .add_edge("a", "b")
        .add_edge("b", "c")
        .add_edge("c", "a")  # closes the loop a -> b -> c -> a
        .build()
    )
except dagron.CycleError as e:
    print(e)  # Cycle detected: c -> a

This guarantee means that any DAG returned by .build() is acyclic.
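To make the check concrete, here is a minimal pure-Python sketch of one common way to detect a cycle: a depth-first search that tracks the nodes on the current path. It is an illustration only. The function name find_cycle is hypothetical, and nothing here is taken from dagron's implementation, which may use a different algorithm.

```python
def find_cycle(edges):
    """Return one cycle as a list of node names, or None if there is none."""
    graph = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
        graph.setdefault(dst, [])  # register nodes that only appear as targets

    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    colour = {node: WHITE for node in graph}
    path = []

    def visit(node):
        colour[node] = GREY
        path.append(node)
        for nxt in graph[node]:
            if colour[nxt] == GREY:
                # nxt is already on the current path, so we found a back edge.
                return path[path.index(nxt):] + [nxt]
            if colour[nxt] == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        colour[node] = BLACK
        return None

    for node in graph:
        if colour[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None


print(find_cycle([("a", "b"), ("b", "c"), ("c", "a")]))  # ['a', 'b', 'c', 'a']
print(find_cycle([("a", "b"), ("b", "c")]))              # None
```

The recursive form keeps the sketch short; a very deep graph would need an iterative version to stay under Python's recursion limit.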

Implicit node creation

When you add an edge, both endpoints are created automatically if they do not already exist:

dag = (
    dagron.DAG.builder()
    .add_edge("a", "b")
    .add_edge("b", "c")
    .build()
)
print(dag.node_count()) # 3

This shorthand is convenient for small graphs where you do not need to attach metadata to every node.

Direct construction

If you prefer an imperative style, create a bare DAG and mutate it:

dag = dagron.DAG()

dag.add_node("ingest")
dag.add_node("clean")
dag.add_node("enrich")
dag.add_node("publish")

dag.add_edge("ingest", "clean")
dag.add_edge("clean", "enrich")
dag.add_edge("enrich", "publish")

Direct mutation is useful when the graph structure is determined at runtime — for example, when reading a config file or discovering tasks from a plugin registry.

Checking membership

print(dag.has_node("clean"))      # True
print(dag.has_edge("clean", "enrich")) # True

Removing nodes and edges

dag.remove_edge("enrich", "publish")
dag.remove_node("publish")

print(dag.node_count()) # 3
print(dag.edge_count()) # 2

Removing a node also removes all edges connected to it.

Bulk operations

When building large graphs, individual add_node / add_edge calls become verbose. Use the bulk helpers instead:

dag = (
    dagron.DAG.builder()
    .add_nodes(["extract", "transform_a", "transform_b", "merge", "load"])
    .add_edges([
        ("extract", "transform_a"),
        ("extract", "transform_b"),
        ("transform_a", "merge"),
        ("transform_b", "merge"),
        ("merge", "load"),
    ])
    .build()
)

Fan-out / fan-in graph created with bulk helpers.

Building from data

A common pattern is constructing the graph from a list of records (e.g., rows from a database or lines in a YAML file):

records = [
{"name": "extract", "depends_on": []},
{"name": "transform", "depends_on": ["extract"]},
{"name": "load", "depends_on": ["transform"]},
]

builder = dagron.DAG.builder()
# First pass: declare every node.
for rec in records:
    builder.add_node(rec["name"])
# Second pass: wire each dependency to the node that needs it.
for rec in records:
    for dep in rec["depends_on"]:
        builder.add_edge(dep, rec["name"])
dag = builder.build()

This makes it straightforward to drive graph construction from external configuration.
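One thing to watch for with external data: if implicit node creation applies as described earlier, a misspelled name in depends_on would become an extra node rather than an error. A small pre-check in plain Python can catch this before any builder call. The helper name unknown_dependencies is hypothetical and not part of dagron.

```python
def unknown_dependencies(records):
    """Return (node, dependency) pairs whose dependency was never declared."""
    declared = {rec["name"] for rec in records}
    return sorted(
        (rec["name"], dep)
        for rec in records
        for dep in rec["depends_on"]
        if dep not in declared
    )


records = [
    {"name": "extract", "depends_on": []},
    {"name": "transform", "depends_on": ["extarct"]},  # deliberate typo
    {"name": "load", "depends_on": ["transform"]},
]

print(unknown_dependencies(records))  # [('transform', 'extarct')]
```

Failing fast on a non-empty result keeps typos in configuration from silently changing the shape of the graph.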

Node payloads

Every node can carry an arbitrary Python object called a payload. Payloads are useful for attaching configuration, metadata, or cost hints without polluting the task functions.

dag = dagron.DAG()
dag.add_node("train", payload={"epochs": 10, "lr": 0.001})
dag.add_node("evaluate", payload={"metrics": ["accuracy", "f1"]})
dag.add_edge("train", "evaluate")

Retrieve payloads later:

# Access via the nodes iterator
for name in dag.nodes():
    print(name, dag.get_payload(name))

Payloads with the builder

dag = (
    dagron.DAG.builder()
    .add_node("fetch", payload={"url": "https://api.example.com/data"})
    .add_node("parse", payload={"format": "json"})
    .add_edge("fetch", "parse")
    .build()
)

Contracts

The builder supports contracts that declare the expected output type of a node. Contracts are checked at build time and serve as living documentation of your pipeline's data flow.

dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .contract("extract", output=list)
    .contract("transform", output=dict)
    .contract("load", output=str)
    .build()
)

See Contracts in the API reference for details on runtime enforcement and custom validators.
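As a rough mental model of what checking an output against a declared type could look like, here is a standalone sketch built on isinstance. It does not use dagron at all: the contracts dictionary and the check_output helper are hypothetical stand-ins, and the library's actual enforcement may behave differently.

```python
# Hypothetical stand-in for the contracts declared on the builder above.
contracts = {"extract": list, "transform": dict, "load": str}


def check_output(node, value, contracts):
    """Return value unchanged, or raise TypeError if it breaks the contract."""
    expected = contracts.get(node)
    if expected is not None and not isinstance(value, expected):
        raise TypeError(
            f"node {node!r} returned {type(value).__name__}, "
            f"expected {expected.__name__}"
        )
    return value


check_output("extract", [1, 2, 3], contracts)  # passes: a list was promised

try:
    check_output("transform", [1, 2, 3], contracts)  # a dict was promised
except TypeError as exc:
    print(exc)  # node 'transform' returned list, expected dict
```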

Understanding the graph structure

Once a DAG is built, you have a rich set of read-only accessors:

Nodes and edges

print(list(dag.nodes()))    # ['extract', 'transform', 'load']
print(dag.node_count()) # 3
print(dag.edge_count()) # 2

Roots and leaves

Roots have no incoming edges (in-degree 0). Leaves have no outgoing edges (out-degree 0).

print(dag.roots())   # ['extract']
print(dag.leaves()) # ['load']

Roots (green) and leaves (yellow) of a linear pipeline.

Degree

print(dag.in_degree("transform"))   # 1
print(dag.out_degree("transform")) # 1
print(dag.in_degree("extract")) # 0 (root)
print(dag.out_degree("load")) # 0 (leaf)

Neighbourhood queries

# Direct parents
print(dag.predecessors("transform")) # ['extract']

# Direct children
print(dag.successors("transform")) # ['load']

# Transitive ancestors (all upstream nodes)
print(dag.ancestors("load")) # ['extract', 'transform']

# Transitive descendants (all downstream nodes)
print(dag.descendants("extract")) # ['transform', 'load']
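The difference between direct and transitive queries is easiest to see in a few lines of plain Python. This sketch walks an adjacency map until no new nodes turn up. The reachable helper is hypothetical, and it returns an unordered set, so the output is sorted here for display.

```python
def reachable(adjacency, start):
    """Return every node reachable from start by following adjacency links."""
    seen = set()
    stack = [start]
    while stack:
        current = stack.pop()
        for nxt in adjacency.get(current, []):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


edges = [("extract", "transform"), ("transform", "load")]
children, parents = {}, {}
for src, dst in edges:
    children.setdefault(src, []).append(dst)  # forward links: descendants
    parents.setdefault(dst, []).append(src)   # reverse links: ancestors

print(sorted(reachable(parents, "load")))      # ['extract', 'transform']
print(sorted(reachable(children, "extract")))  # ['load', 'transform']
```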

Topological ordering

A topological sort produces an ordering where every node appears after all of its dependencies:

print(dag.topological_sort())
# ['extract', 'transform', 'load']

For parallel execution planning, topological levels group nodes that can run concurrently:

for level, nodes in enumerate(dag.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['extract']
# Level 1: ['transform']
# Level 2: ['load']

See Inspecting Graphs for the full analysis toolkit.
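For readers curious how levels can be derived, the following is a minimal sketch of Kahn's algorithm processed one layer at a time. It is written against plain lists rather than a dagron DAG, the function name is chosen only to mirror the method above, and it is not a description of how dagron computes its result.

```python
def topological_levels(nodes, edges):
    """Group nodes into layers; each layer depends only on earlier layers."""
    indegree = {n: 0 for n in nodes}
    children = {n: [] for n in nodes}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    level = [n for n in nodes if indegree[n] == 0]  # roots form level 0
    levels = []
    while level:
        levels.append(level)
        next_level = []
        for node in level:
            for child in children[node]:
                indegree[child] -= 1
                if indegree[child] == 0:  # all of its parents are placed
                    next_level.append(child)
        level = next_level

    if sum(len(lvl) for lvl in levels) != len(nodes):
        raise ValueError("graph contains a cycle")
    return levels


nodes = ["source", "branch_a", "branch_b", "branch_c", "join"]
edges = [
    ("source", "branch_a"), ("source", "branch_b"), ("source", "branch_c"),
    ("branch_a", "join"), ("branch_b", "join"), ("branch_c", "join"),
]
print(topological_levels(nodes, edges))
# [['source'], ['branch_a', 'branch_b', 'branch_c'], ['join']]
```

Flattening the layers in order gives one valid topological sort, which is why the two queries are closely related.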

Diamond dependencies

A common pattern in data pipelines is the diamond shape — one root fans out to multiple branches that converge at a single join node:

dag = (
    dagron.DAG.builder()
    .add_nodes(["source", "branch_a", "branch_b", "branch_c", "join"])
    .add_edges([
        ("source", "branch_a"),
        ("source", "branch_b"),
        ("source", "branch_c"),
        ("branch_a", "join"),
        ("branch_b", "join"),
        ("branch_c", "join"),
    ])
    .build()
)

Diamond DAG. The three branches execute in parallel; join waits for all of them.

The executor automatically parallelises the three branches and synchronises at the join node.
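The idea of running branches in parallel and waiting at the join can be sketched with the standard library alone. The code below is not dagron's executor: run_levels, the hard-coded levels, and the toy task functions are all assumptions made for illustration, and a real scheduler can start a node as soon as its own parents finish instead of waiting for a whole level.

```python
from concurrent.futures import ThreadPoolExecutor


def run_levels(levels, tasks):
    """Run each level's tasks concurrently; start a level after the previous one."""
    results = {}
    with ThreadPoolExecutor() as pool:
        for level in levels:
            # Every task receives a snapshot of the results produced so far.
            futures = {
                name: pool.submit(tasks[name], dict(results)) for name in level
            }
            # Collecting the results blocks until the whole level is done.
            results.update({name: fut.result() for name, fut in futures.items()})
    return results


levels = [["source"], ["branch_a", "branch_b", "branch_c"], ["join"]]
tasks = {
    "source": lambda upstream: 1,
    "branch_a": lambda upstream: upstream["source"] + 1,
    "branch_b": lambda upstream: upstream["source"] + 2,
    "branch_c": lambda upstream: upstream["source"] + 3,
    "join": lambda upstream: (
        upstream["branch_a"] + upstream["branch_b"] + upstream["branch_c"]
    ),
}

results = run_levels(levels, tasks)
print(results["join"])  # 9
```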

Multi-layer pipelines

For complex ML workflows you might have many layers:

dag = (
    dagron.DAG.builder()
    # Data layer
    .add_node("raw_data")
    .add_node("clean_data")
    .add_edge("raw_data", "clean_data")
    # Feature layer
    .add_node("feature_numeric")
    .add_node("feature_text")
    .add_node("feature_image")
    .add_edge("clean_data", "feature_numeric")
    .add_edge("clean_data", "feature_text")
    .add_edge("clean_data", "feature_image")
    # Model layer
    .add_node("train_model")
    .add_edge("feature_numeric", "train_model")
    .add_edge("feature_text", "train_model")
    .add_edge("feature_image", "train_model")
    # Evaluation layer
    .add_node("evaluate")
    .add_node("deploy")
    .add_edge("train_model", "evaluate")
    .add_edge("evaluate", "deploy")
    .build()
)

print(dag.node_count()) # 8
print(dag.edge_count()) # 9

for level, nodes in enumerate(dag.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['raw_data']
# Level 1: ['clean_data']
# Level 2: ['feature_numeric', 'feature_text', 'feature_image']
# Level 3: ['train_model']
# Level 4: ['evaluate']
# Level 5: ['deploy']

Multi-layer ML pipeline with parallel feature extraction.

Pattern matching on node names

dagron supports finding nodes by name patterns, which is handy when you build graphs programmatically with naming conventions:

# Glob-style matching
feature_nodes = dag.nodes_matching_glob("feature_*")
print(feature_nodes) # ['feature_numeric', 'feature_text', 'feature_image']

# Regex matching
data_nodes = dag.nodes_matching_regex(r".*_data$")
print(data_nodes) # ['raw_data', 'clean_data']
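
If the two matching styles are unfamiliar, the standard library shows the same distinction without any graph at all. This sketch assumes that dagron's glob matching follows shell-style wildcard rules like fnmatch and that its regex matching behaves like re.search. Both are assumptions, so check the API reference for the exact semantics.

```python
import fnmatch
import re

names = [
    "raw_data", "clean_data",
    "feature_numeric", "feature_text", "feature_image",
    "train_model", "evaluate", "deploy",
]

# Glob: "*" matches any run of characters; the whole name must match.
glob_hits = [n for n in names if fnmatch.fnmatchcase(n, "feature_*")]
print(glob_hits)   # ['feature_numeric', 'feature_text', 'feature_image']

# Regex: "$" anchors the pattern to the end of the name.
regex_hits = [n for n in names if re.search(r".*_data$", n)]
print(regex_hits)  # ['raw_data', 'clean_data']
```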

Graph statistics

The stats() method returns a summary of the graph:

s = dag.stats()
print(s)
# DAGStats(nodes=8, edges=9, roots=1, leaves=1, depth=5, width=3, density=0.161)

This is useful for logging and monitoring in production.
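
The fields in the summary can be reproduced by hand under a few assumptions inferred from the example output, not confirmed by the API reference: depth is the number of edges on the longest path, width is the size of the largest topological level, and density is edges / (nodes × (nodes - 1)). The graph_stats function below is a hypothetical pure-Python sketch using those definitions on the diamond graph.

```python
from collections import Counter, deque


def graph_stats(nodes, edges):
    """Compute summary figures for an acyclic graph given as plain lists."""
    children = {n: [] for n in nodes}
    indegree = {n: 0 for n in nodes}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    roots = [n for n in nodes if indegree[n] == 0]
    leaves = [n for n in nodes if not children[n]]

    # Longest distance from any root, computed in topological order.
    remaining = dict(indegree)
    depth_of = {n: 0 for n in roots}
    queue = deque(roots)
    while queue:
        node = queue.popleft()
        for child in children[node]:
            depth_of[child] = max(depth_of.get(child, 0), depth_of[node] + 1)
            remaining[child] -= 1
            if remaining[child] == 0:
                queue.append(child)

    per_level = Counter(depth_of.values())
    n = len(nodes)
    return {
        "nodes": n,
        "edges": len(edges),
        "roots": len(roots),
        "leaves": len(leaves),
        "depth": max(depth_of.values(), default=0),
        "width": max(per_level.values(), default=0),
        "density": round(len(edges) / (n * (n - 1)), 3) if n > 1 else 0.0,
    }


nodes = ["source", "branch_a", "branch_b", "branch_c", "join"]
edges = [
    ("source", "branch_a"), ("source", "branch_b"), ("source", "branch_c"),
    ("branch_a", "join"), ("branch_b", "join"), ("branch_c", "join"),
]
print(graph_stats(nodes, edges))
# {'nodes': 5, 'edges': 6, 'roots': 1, 'leaves': 1, 'depth': 2, 'width': 3, 'density': 0.3}
```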

Composing multiple DAGs

Large systems often consist of several independent DAGs that need to be wired together. The compose() function merges DAGs with namespaced node names and cross-DAG connections:

etl_dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

ml_dag = (
    dagron.DAG.builder()
    .add_edge("train", "evaluate")
    .build()
)

combined = dagron.compose(
    dags={"etl": etl_dag, "ml": ml_dag},
    connections=[("etl/load", "ml/train")],
)

print(list(combined.nodes()))
# ['etl/extract', 'etl/transform', 'etl/load', 'ml/train', 'ml/evaluate']

Two DAGs composed into a single pipeline with a cross-DAG edge.

Composition is covered fully in Graph Transforms.
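
To show what namespacing and cross-DAG connections amount to, here is a small sketch that works on plain node and edge lists instead of DAG objects. The compose_edges helper is hypothetical and only mirrors the naming scheme shown above (prefix, slash, node name). It is not how dagron.compose() is implemented.

```python
def compose_edges(dags, connections):
    """Merge named (nodes, edges) pairs, prefixing every node with its name."""
    nodes, edges = [], []
    for prefix, (dag_nodes, dag_edges) in dags.items():
        nodes += [f"{prefix}/{n}" for n in dag_nodes]
        edges += [(f"{prefix}/{s}", f"{prefix}/{d}") for s, d in dag_edges]

    known = set(nodes)
    for src, dst in connections:
        # A connection must refer to nodes that exist after prefixing.
        if src not in known or dst not in known:
            raise KeyError(f"unknown endpoint in connection {(src, dst)!r}")
        edges.append((src, dst))
    return nodes, edges


etl = (["extract", "transform", "load"],
       [("extract", "transform"), ("transform", "load")])
ml = (["train", "evaluate"], [("train", "evaluate")])

nodes, edges = compose_edges(
    {"etl": etl, "ml": ml},
    connections=[("etl/load", "ml/train")],
)
print(nodes)
# ['etl/extract', 'etl/transform', 'etl/load', 'ml/train', 'ml/evaluate']
print(edges[-1])
# ('etl/load', 'ml/train')
```

Prefixing avoids name clashes when two sub-graphs both contain a node with a common name such as load.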

Validation and linting

After building a DAG, you can run the built-in linter to catch common issues:

from dagron import lint

warnings = lint(dag)
for w in warnings:
    print(w)
# e.g., "Node 'deploy' has in-degree 1 and out-degree 0 — consider if it should be a leaf."

And validate structural integrity at any time:

dag.validate()  # Raises if the graph is malformed

See Inspecting Graphs for the full analysis and linting toolkit.

Best practices

  1. Use the builder for static graphs. It gives you cycle detection and a clean, readable construction block.

  2. Use direct mutation for dynamic graphs. When the structure depends on runtime decisions, building imperatively is simpler.

  3. Attach payloads for configuration. Keep task functions pure; put parameters in payloads.

  4. Name nodes with conventions. Use prefixes like extract_, transform_, load_ so you can use glob/regex matching later.

  5. Start small and compose. Build self-contained sub-DAGs and wire them together with compose().

API reference

For the full list of construction methods, see:

  • DAG — the core graph class and all its methods.
  • DAGBuilder — fluent builder API.
  • compose() — DAG composition.
