# Graph Transforms

dagron provides a rich set of structural transformations that produce new DAGs from existing ones. Transforms are non-destructive — the original DAG is never mutated. This guide covers every built-in transform with before/after diagrams.
## Transform overview

| Transform | What it does |
|---|---|
| `reverse()` | Flip every edge |
| `filter()` | Keep nodes matching a predicate |
| `merge()` | Combine two DAGs into one |
| `collapse()` | Replace a set of nodes with a single node |
| `transitive_reduction()` | Remove redundant edges |
| `transitive_closure()` | Add all implied edges |
| `subgraph()` | Extract a subgraph by node set |
| `subgraph_by_depth()` | Extract nodes within N hops |
| `snapshot()` | Immutable frozen copy |
| `compose()` | Namespace and wire multiple DAGs |
## Sample DAG

We will use this graph for most examples:

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_nodes(["a", "b", "c", "d", "e"])
    .add_edges([
        ("a", "b"), ("a", "c"),
        ("b", "d"), ("c", "d"),
        ("d", "e"),
    ])
    .build()
)
```

Original DAG used throughout this guide.
## reverse()

Flipping every edge is useful when you want to reason about upstream dependencies as downstream propagation (e.g., "what gets affected if this node changes?").

```python
rev = dag.reverse()

print(rev.roots())           # ['e'] (was a leaf)
print(rev.leaves())          # ['a'] (was a root)
print(rev.successors("e"))   # ['d']
print(rev.predecessors("a")) # ['b', 'c']
```

Reversed DAG — edges point upstream.

Reversing is an O(V + E) operation implemented in Rust.
## filter()

Keep only the nodes that satisfy a predicate. Edges between remaining nodes are preserved; edges to or from removed nodes are dropped.

```python
# Keep only nodes whose names are NOT 'c'
filtered = dag.filter(lambda name: name != "c")

print(list(filtered.nodes()))  # ['a', 'b', 'd', 'e']
print(filtered.edge_count())   # 3
```

After filtering out node 'c'.
### Filtering with payloads

If you attached payloads, you can filter based on them:

```python
dag2 = dagron.DAG()
dag2.add_node("gpu_train", payload={"gpu": True})
dag2.add_node("cpu_prep", payload={"gpu": False})
dag2.add_node("cpu_eval", payload={"gpu": False})
dag2.add_edge("cpu_prep", "gpu_train")
dag2.add_edge("gpu_train", "cpu_eval")

# Keep only GPU nodes
gpu_only = dag2.filter(lambda name: dag2.get_payload(name).get("gpu", False))
print(list(gpu_only.nodes()))  # ['gpu_train']
```
## merge()

Combine two DAGs into a single graph. Nodes with the same name are unified; edges from both graphs are included.

```python
dag_a = (
    dagron.DAG.builder()
    .add_edge("x", "y")
    .add_edge("y", "z")
    .build()
)
dag_b = (
    dagron.DAG.builder()
    .add_edge("y", "w")
    .add_edge("w", "z")
    .build()
)

merged = dag_a.merge(dag_b)
print(list(merged.nodes()))  # ['x', 'y', 'z', 'w']
print(merged.edge_count())   # 4
```

Merged DAG. Nodes 'y' and 'z' were shared between the two inputs.

If merging two DAGs would introduce a cycle, merge() raises a CycleError.
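Such a cycle can arise even when both inputs are individually acyclic. A minimal plain-Python sketch of the check (independent of dagron, using a DFS over the union of the two edge sets):

```python
def has_cycle(edges):
    """Detect a cycle in a directed graph given as a set of (u, v) edges."""
    adj = {}
    for u, v in edges:
        adj.setdefault(u, []).append(v)

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / done
    color = {}

    def visit(node):
        color[node] = GRAY
        for nxt in adj.get(node, []):
            if color.get(nxt, WHITE) == GRAY:
                return True  # back edge: cycle found
            if color.get(nxt, WHITE) == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    nodes = {n for e in edges for n in e}
    return any(color.get(n, WHITE) == WHITE and visit(n) for n in nodes)

edges_a = {("x", "y"), ("y", "z")}   # acyclic on its own
edges_b = {("z", "x")}               # acyclic on its own
print(has_cycle(edges_a))            # False
print(has_cycle(edges_a | edges_b))  # True: x -> y -> z -> x
```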
## collapse()

Replace a set of nodes with a single representative node. All incoming edges to the set become incoming edges to the representative; all outgoing edges from the set become outgoing edges from the representative.

```python
# Collapse the feature extraction branch into one node
collapsed = dag.collapse(
    nodes=["b", "c"],
    into="features",
)

print(list(collapsed.nodes()))     # ['a', 'features', 'd', 'e']
print(collapsed.successors("a"))   # ['features']
print(collapsed.predecessors("d")) # ['features']
```

After collapsing nodes b and c into 'features'.

Collapsing is useful for:

- Simplifying large graphs for visualization.
- Creating summary views for stakeholders.
- Reducing overhead when scheduling tightly-coupled tasks.
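The rewiring rule can be sketched in plain Python (a conceptual model, not dagron's actual implementation): edges internal to the collapsed set disappear, and every edge crossing the boundary is redirected to the representative.

```python
def collapse_edges(edges, group, rep):
    """Redirect edges touching `group` to the representative `rep`.

    Edges internal to the group become self-loops and are dropped.
    """
    out = set()
    for u, v in edges:
        u2 = rep if u in group else u
        v2 = rep if v in group else v
        if u2 != v2:  # drop internal edges (they collapse to self-loops)
            out.add((u2, v2))
    return out

# The sample DAG from this guide, collapsing b and c into 'features'
edges = {("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")}
print(sorted(collapse_edges(edges, {"b", "c"}, "features")))
# [('a', 'features'), ('d', 'e'), ('features', 'd')]
```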
## transitive_reduction()

Remove edges that are implied by other paths. The transitive reduction has the same reachability as the original graph but with the minimum number of edges.

```python
# Add a redundant shortcut edge
dag_with_shortcut = (
    dagron.DAG.builder()
    .add_edge("a", "b")
    .add_edge("b", "c")
    .add_edge("a", "c")  # redundant — a->b->c already implies a can reach c
    .build()
)

reduced = dag_with_shortcut.transitive_reduction()
print(reduced.edge_count())       # 2 (shortcut removed)
print(reduced.has_edge("a", "c")) # False
print(reduced.has_edge("a", "b")) # True
print(reduced.has_edge("b", "c")) # True
```

Before (with shortcut): the dashed edge is redundant. After (transitive reduction): the minimum set of edges preserving reachability.

This is especially useful for cleaning up graphs generated from broad dependency specifications.
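For intuition, here is a naive plain-Python version of the same idea (dagron's Rust implementation is far more efficient): an edge (u, v) is redundant if v is still reachable from u after removing that edge.

```python
def reachable(adj, src, dst):
    """Iterative DFS reachability from src to dst over an adjacency dict."""
    seen, stack = set(), [src]
    while stack:
        n = stack.pop()
        if n == dst:
            return True
        if n not in seen:
            seen.add(n)
            stack.extend(adj.get(n, []))
    return False

def transitive_reduction(edges):
    """Drop every edge implied by an alternative path (naive, O(E * (V + E)))."""
    kept = set(edges)
    for u, v in sorted(edges):
        trial = kept - {(u, v)}
        adj = {}
        for a, b in trial:
            adj.setdefault(a, []).append(b)
        if reachable(adj, u, v):
            kept = trial  # the edge was redundant
    return kept

print(sorted(transitive_reduction({("a", "b"), ("b", "c"), ("a", "c")})))
# [('a', 'b'), ('b', 'c')]
```

For a DAG the transitive reduction is unique, so the removal order does not affect the result.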
## transitive_closure()

The opposite of reduction: add an edge for every pair of nodes (u, v) where v is reachable from u.

```python
closure = dag.transitive_closure()

# a can reach e (a->b->d->e), so there is now a direct edge:
print(closure.has_edge("a", "e"))  # True
print(closure.has_edge("a", "d"))  # True
print(closure.has_edge("b", "e"))  # True

print(f"Original edges: {dag.edge_count()}")  # 5
print(f"Closure edges: {closure.edge_count()}")  # 9
```

The transitive closure is useful for pre-computing reachability queries.
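What the closure precomputes can be modelled in plain Python: one DFS per node yields a reachability set, after which every "can u reach v?" question is a constant-time membership test.

```python
def reach_sets(edges):
    """Map each node to the set of nodes reachable from it (one DFS per node)."""
    adj, nodes = {}, set()
    for u, v in edges:
        adj.setdefault(u, []).append(v)
        nodes.update((u, v))

    def dfs(src):
        seen, stack = set(), list(adj.get(src, []))
        while stack:
            n = stack.pop()
            if n not in seen:
                seen.add(n)
                stack.extend(adj.get(n, []))
        return seen

    return {n: dfs(n) for n in nodes}

# The sample DAG from this guide
edges = {("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")}
reach = reach_sets(edges)
print("e" in reach["a"])  # True -- constant-time query after precomputation
print(sum(len(s) for s in reach.values()))  # 9 ordered reachable pairs
```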
## subgraph()

Extract a subgraph containing only the specified nodes and the edges between them:

```python
sub = dag.subgraph(["a", "b", "d"])
print(list(sub.nodes()))  # ['a', 'b', 'd']
print(sub.edge_count())   # 2 (a->b, b->d)
```

Subgraph of nodes a, b, d.
### Extracting ancestors or descendants

A common pattern is extracting the full upstream or downstream of a node:

```python
# Everything upstream of 'd' (including 'd' itself)
upstream_nodes = set(dag.ancestors("d")) | {"d"}
upstream = dag.subgraph(list(upstream_nodes))
print(list(upstream.nodes()))  # ['a', 'b', 'c', 'd']
```
## subgraph_by_depth()

Extract nodes within a certain number of hops of a starting node:

```python
# All nodes within 1 hop of 'd'
nearby = dag.subgraph_by_depth("d", depth=1)
print(list(nearby.nodes()))  # ['b', 'c', 'd', 'e']
```

Subgraph within 1 hop of node 'd' (both directions).

```python
# Within 2 hops
wider = dag.subgraph_by_depth("d", depth=2)
print(list(wider.nodes()))  # ['a', 'b', 'c', 'd', 'e']
```
## snapshot()

Create an immutable, frozen copy of the DAG:

```python
snap = dag.snapshot()
```

Snapshots are useful for:

- Recording the state of a graph before mutations.
- Passing a read-only view to analysis functions.
- Implementing undo/redo.

See Serialization for persisting snapshots to disk.
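The undo/redo idea can be sketched in plain Python, with frozen edge sets standing in for dagron snapshots: push a snapshot before each mutation, pop to undo.

```python
class UndoableGraph:
    """Toy mutable edge set with snapshot-based undo (a sketch, not dagron's API)."""

    def __init__(self):
        self.edges = set()
        self._history = []  # stack of frozen snapshots

    def snapshot(self):
        return frozenset(self.edges)

    def add_edge(self, u, v):
        self._history.append(self.snapshot())  # record state before mutating
        self.edges.add((u, v))

    def undo(self):
        if self._history:
            self.edges = set(self._history.pop())

g = UndoableGraph()
g.add_edge("a", "b")
g.add_edge("b", "c")
g.undo()
print(sorted(g.edges))  # [('a', 'b')]
```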
### Diffing snapshots

Compare two snapshots to see what changed:

```python
snap1 = dag.snapshot()

# Mutate the original
dag.add_node("f")
dag.add_edge("e", "f")
snap2 = dag.snapshot()

diff = dag.diff(snap1, snap2)
print(diff)
# DagDiff(added_nodes=['f'], removed_nodes=[], added_edges=[('e', 'f')], removed_edges=[])
```
## compose()

The compose() function wires multiple DAGs together under namespaces:

```python
etl = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)
ml = (
    dagron.DAG.builder()
    .add_edge("train", "evaluate")
    .build()
)

combined = dagron.compose(
    dags={"etl": etl, "ml": ml},
    connections=[("etl/load", "ml/train")],
)

print(list(combined.nodes()))
# ['etl/extract', 'etl/transform', 'etl/load', 'ml/train', 'ml/evaluate']
print(combined.successors("etl/load"))  # ['ml/train']
```

Two sub-DAGs composed into a single pipeline.
### Composing many DAGs

```python
ingestion = dagron.DAG.builder().add_edge("fetch", "validate").build()
processing = dagron.DAG.builder().add_edge("clean", "aggregate").build()
reporting = dagron.DAG.builder().add_edge("render", "email").build()

full_pipeline = dagron.compose(
    dags={
        "ingest": ingestion,
        "process": processing,
        "report": reporting,
    },
    connections=[
        ("ingest/validate", "process/clean"),
        ("process/aggregate", "report/render"),
    ],
)

for level, nodes in enumerate(full_pipeline.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['ingest/fetch']
# Level 1: ['ingest/validate']
# Level 2: ['process/clean']
# Level 3: ['process/aggregate']
# Level 4: ['report/render']
# Level 5: ['report/email']
```

Three composed DAGs forming a full data pipeline.
## Chaining transforms

Transforms return new DAG instances, so you can chain them:

```python
result = (
    dag
    .filter(lambda n: n != "c")
    .transitive_reduction()
    .reverse()
)
print(list(result.nodes()))  # ['e', 'd', 'b', 'a']
```

Since each transform produces a new DAG, the original is never modified.
## Partitioning

dagron includes two partitioning strategies for splitting a DAG into independent sub-DAGs.

### Level-based partitioning

Splits the DAG at topological level boundaries:

```python
partitions = dag.partition_level_based(num_partitions=2)
for i, part in enumerate(partitions):
    print(f"Partition {i}: {list(part.nodes())}")
```

### Balanced partitioning

Tries to balance the number of nodes across partitions:

```python
partitions = dag.partition_balanced(num_partitions=3)
for i, part in enumerate(partitions):
    print(f"Partition {i}: {list(part.nodes())}")
```

Partitioning is useful for distributed execution where each partition is sent to a different worker.
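A sketch of that dispatch pattern using only the standard library: each partition's node list goes to its own worker via concurrent.futures. Here `run_node` is a hypothetical stand-in for real task execution, and plain lists stand in for dagron partition objects.

```python
from concurrent.futures import ThreadPoolExecutor

def run_node(name):
    """Hypothetical stand-in for executing one task."""
    return f"{name}: done"

def execute_partition(node_names):
    """Run one partition's nodes in order on a single worker."""
    return [run_node(n) for n in node_names]

# Plain lists standing in for the node sets of two dagron partitions.
partitions = [["a", "b"], ["c", "d", "e"]]

with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    results = list(pool.map(execute_partition, partitions))

print(results)
# [['a: done', 'b: done'], ['c: done', 'd: done', 'e: done']]
```

In practice cross-partition edges still impose ordering, so a real scheduler would wait for upstream partitions to finish before dispatching the partitions that depend on them.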
## Practical example: simplifying a graph for stakeholders

```python
import dagron

# A complex internal pipeline
pipeline = (
    dagron.DAG.builder()
    .add_nodes([
        "fetch_api", "fetch_db", "fetch_s3",
        "validate_api", "validate_db", "validate_s3",
        "merge_sources", "feature_eng", "train_xgb",
        "train_nn", "ensemble", "evaluate", "deploy",
    ])
    .add_edges([
        ("fetch_api", "validate_api"), ("fetch_db", "validate_db"),
        ("fetch_s3", "validate_s3"),
        ("validate_api", "merge_sources"), ("validate_db", "merge_sources"),
        ("validate_s3", "merge_sources"),
        ("merge_sources", "feature_eng"),
        ("feature_eng", "train_xgb"), ("feature_eng", "train_nn"),
        ("train_xgb", "ensemble"), ("train_nn", "ensemble"),
        ("ensemble", "evaluate"), ("evaluate", "deploy"),
    ])
    .build()
)

# Collapse ingestion into one node for the executive summary
simplified = pipeline.collapse(
    nodes=["fetch_api", "fetch_db", "fetch_s3",
           "validate_api", "validate_db", "validate_s3",
           "merge_sources"],
    into="data_ingestion",
)
simplified = simplified.collapse(
    nodes=["train_xgb", "train_nn", "ensemble"],
    into="model_training",
)

print(list(simplified.nodes()))
# ['data_ingestion', 'feature_eng', 'model_training', 'evaluate', 'deploy']
print(simplified.to_mermaid())
```

Simplified stakeholder view after collapsing internal details.
## API reference

| Method | Docs |
|---|---|
| `dag.reverse()` | DAG |
| `dag.filter()` | DAG |
| `dag.merge()` | DAG |
| `dag.collapse()` | DAG |
| `dag.transitive_reduction()` | DAG |
| `dag.transitive_closure()` | DAG |
| `dag.subgraph()` | DAG |
| `dag.subgraph_by_depth()` | DAG |
| `dag.snapshot()` | DAG |
| `dag.diff()` | DAG |
| `dagron.compose()` | DAG |
| `dag.partition_level_based()` | DAG |
| `dag.partition_balanced()` | DAG |
## Next steps
- Serialization — persist DAGs to JSON, binary, DOT, and Mermaid.
- Inspecting Graphs — analyze structure, critical paths, and queries.
- Incremental Execution — use transforms to understand dirty propagation.