
Graph Transforms

dagron provides a rich set of structural transformations that produce new DAGs from existing ones. Transforms are non-destructive — the original DAG is never mutated. This guide covers every built-in transform with before/after diagrams.

Transform overview

Transform                 What it does
reverse()                 Flip every edge
filter()                  Keep nodes matching a predicate
merge()                   Combine two DAGs into one
collapse()                Replace a set of nodes with a single node
transitive_reduction()    Remove redundant edges
transitive_closure()      Add all implied edges
subgraph()                Extract a subgraph by node set
subgraph_by_depth()       Extract nodes within N hops
snapshot()                Immutable frozen copy
compose()                 Namespace and wire multiple DAGs

Sample DAG

We will use this graph for most examples:

import dagron

dag = (
    dagron.DAG.builder()
    .add_nodes(["a", "b", "c", "d", "e"])
    .add_edges([
        ("a", "b"), ("a", "c"),
        ("b", "d"), ("c", "d"),
        ("d", "e"),
    ])
    .build()
)

Original DAG used throughout this guide.

reverse()

Flipping every edge is useful when you want to reason about upstream dependencies as downstream propagation (e.g., "what gets affected if this node changes?").

rev = dag.reverse()

print(rev.roots()) # ['e'] (was a leaf)
print(rev.leaves()) # ['a'] (was a root)
print(rev.successors("e")) # ['d']
print(rev.predecessors("a")) # ['b', 'c']

Reversed DAG — edges point upstream.

Reversing is an O(V + E) operation implemented in Rust.
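For intuition, reversal amounts to flipping the endpoints of every edge. A minimal plain-Python sketch of the semantics (illustrative only; it is not dagron's Rust implementation):

```python
def reverse_edges(edges):
    """Return a new edge list with every (u, v) flipped to (v, u)."""
    return [(v, u) for (u, v) in edges]

# The sample DAG's edge list
edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")]
print(reverse_edges(edges))
# [('b', 'a'), ('c', 'a'), ('d', 'b'), ('d', 'c'), ('e', 'd')]
```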

filter()

Keep only the nodes that satisfy a predicate. Edges between remaining nodes are preserved; edges to/from removed nodes are dropped.

# Keep only nodes whose names are NOT 'c'
filtered = dag.filter(lambda name: name != "c")

print(list(filtered.nodes())) # ['a', 'b', 'd', 'e']
print(filtered.edge_count()) # 3

After filtering out node 'c'.
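The semantics can be sketched in plain Python: compute the surviving node set, then keep only edges whose endpoints both survive (illustrative only, not the library's code):

```python
def filter_graph(nodes, edges, pred):
    """Keep nodes matching pred; drop edges touching removed nodes."""
    kept = {n for n in nodes if pred(n)}
    kept_edges = [(u, v) for (u, v) in edges if u in kept and v in kept]
    return kept, kept_edges

nodes = ["a", "b", "c", "d", "e"]
edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")]
kept, kept_edges = filter_graph(nodes, edges, lambda n: n != "c")
print(sorted(kept))      # ['a', 'b', 'd', 'e']
print(len(kept_edges))   # 3
```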

Filtering with payloads

If you attached payloads, you can filter based on them:

dag2 = dagron.DAG()
dag2.add_node("gpu_train", payload={"gpu": True})
dag2.add_node("cpu_prep", payload={"gpu": False})
dag2.add_node("cpu_eval", payload={"gpu": False})
dag2.add_edge("cpu_prep", "gpu_train")
dag2.add_edge("gpu_train", "cpu_eval")

# Keep only GPU nodes
gpu_only = dag2.filter(lambda name: dag2.get_payload(name).get("gpu", False))
print(list(gpu_only.nodes())) # ['gpu_train']

merge()

Combine two DAGs into a single graph. Nodes with the same name are unified; edges from both graphs are included.

dag_a = (
    dagron.DAG.builder()
    .add_edge("x", "y")
    .add_edge("y", "z")
    .build()
)

dag_b = (
    dagron.DAG.builder()
    .add_edge("y", "w")
    .add_edge("w", "z")
    .build()
)

merged = dag_a.merge(dag_b)
print(list(merged.nodes())) # ['x', 'y', 'z', 'w']
print(merged.edge_count()) # 4

Merged DAG. Node 'y' and 'z' were shared between the two inputs.

caution

If merging two DAGs would introduce a cycle, merge() raises a CycleError.
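Conceptually, a merge is the union of the two node and edge sets. A minimal sketch over edge lists (illustrative only; a full implementation would also reject a result containing a cycle):

```python
def merge_edges(edges_a, edges_b):
    """Union of two edge sets; nodes with the same name are unified."""
    return sorted(set(edges_a) | set(edges_b))

a = [("x", "y"), ("y", "z")]
b = [("y", "w"), ("w", "z")]
print(merge_edges(a, b))
# [('w', 'z'), ('x', 'y'), ('y', 'w'), ('y', 'z')]
```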

collapse()

Replace a set of nodes with a single representative node. All incoming edges to the set become incoming edges to the representative; all outgoing edges from the set become outgoing edges from the representative.

# Collapse the feature extraction branch into one node
collapsed = dag.collapse(
    nodes=["b", "c"],
    into="features",
)

print(list(collapsed.nodes())) # ['a', 'features', 'd', 'e']
print(collapsed.successors("a")) # ['features']
print(collapsed.predecessors("d")) # ['features']

After collapsing nodes b and c into 'features'.

Collapsing is useful for:

  • Simplifying large graphs for visualization.
  • Creating summary views for stakeholders.
  • Reducing overhead when scheduling tightly coupled tasks.
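The rewiring rule can be sketched in a few lines of plain Python: map every member of the set to the representative, drop edges that become internal to it, and deduplicate (illustrative only):

```python
def collapse(edges, group, rep):
    """Rewire edges so every node in `group` becomes `rep`,
    dropping internal edges and duplicates."""
    remap = lambda n: rep if n in group else n
    out = []
    for u, v in edges:
        u2, v2 = remap(u), remap(v)
        if u2 != v2 and (u2, v2) not in out:
            out.append((u2, v2))
    return out

edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")]
print(collapse(edges, {"b", "c"}, "features"))
# [('a', 'features'), ('features', 'd'), ('d', 'e')]
```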

transitive_reduction()

Remove edges that are implied by other paths. The transitive reduction has the same reachability as the original graph but with the minimum number of edges.

# Add a redundant shortcut edge
dag_with_shortcut = (
    dagron.DAG.builder()
    .add_edge("a", "b")
    .add_edge("b", "c")
    .add_edge("a", "c")  # redundant: a->b->c already implies a can reach c
    .build()
)

reduced = dag_with_shortcut.transitive_reduction()
print(reduced.edge_count()) # 2 (shortcut removed)
print(reduced.has_edge("a", "c")) # False
print(reduced.has_edge("a", "b")) # True
print(reduced.has_edge("b", "c")) # True

Before (with shortcut)

Dashed edge is redundant.

After (transitive reduction)

Minimum edges preserving reachability.

This is especially useful for cleaning up graphs generated from broad dependency specifications.
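A simple way to state the rule: an edge (u, v) is redundant exactly when v is still reachable from u after removing it. The sketch below applies that test edge by edge (illustrative only; this O(E·(V+E)) approach is fine for small graphs but is not the library's algorithm):

```python
def transitive_reduction(edges):
    """Drop any edge (u, v) when v remains reachable from u
    through the remaining edges (simple DFS check)."""
    def reachable(adj, src, dst):
        stack, seen = [src], set()
        while stack:
            n = stack.pop()
            if n == dst:
                return True
            if n not in seen:
                seen.add(n)
                stack.extend(adj.get(n, []))
        return False

    kept = list(edges)
    for u, v in list(edges):
        trial = [e for e in kept if e != (u, v)]
        adj = {}
        for a, b in trial:
            adj.setdefault(a, []).append(b)
        if reachable(adj, u, v):
            kept = trial
    return kept

print(transitive_reduction([("a", "b"), ("b", "c"), ("a", "c")]))
# [('a', 'b'), ('b', 'c')]
```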

transitive_closure()

The opposite of reduction: add an edge for every pair of nodes (u, v) where v is reachable from u.

closure = dag.transitive_closure()

# a can reach e (a->b->d->e), so there is now a direct edge:
print(closure.has_edge("a", "e")) # True
print(closure.has_edge("a", "d")) # True
print(closure.has_edge("b", "e")) # True

print(f"Original edges: {dag.edge_count()}")  # 5
print(f"Closure edges: {closure.edge_count()}")  # 9

The transitive closure is useful for pre-computing reachability queries.
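Equivalently, the closure is the set of all (ancestor, descendant) pairs. A plain-Python sketch via a DFS from each node (illustrative only, not the library's code):

```python
def transitive_closure_edges(edges):
    """All (u, v) pairs where v is reachable from u."""
    adj = {}
    for u, v in edges:
        adj.setdefault(u, []).append(v)

    def descendants(src):
        stack, seen = list(adj.get(src, [])), set()
        while stack:
            n = stack.pop()
            if n not in seen:
                seen.add(n)
                stack.extend(adj.get(n, []))
        return seen

    return {(u, v) for u in adj for v in descendants(u)}

edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")]
closure = transitive_closure_edges(edges)
print(len(closure))           # 9
print(("a", "e") in closure)  # True
```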

subgraph()

Extract a subgraph containing only the specified nodes and the edges between them:

sub = dag.subgraph(["a", "b", "d"])

print(list(sub.nodes())) # ['a', 'b', 'd']
print(sub.edge_count()) # 2 (a->b, b->d)

Subgraph of nodes a, b, d.

Extracting ancestors or descendants

A common pattern is extracting the full upstream or downstream of a node:

# Everything upstream of 'd' (including 'd' itself)
upstream_nodes = set(dag.ancestors("d")) | {"d"}
upstream = dag.subgraph(list(upstream_nodes))
print(list(upstream.nodes())) # ['a', 'b', 'c', 'd']

subgraph_by_depth()

Extract nodes within a certain number of hops from a starting node:

# All nodes within 1 hop of 'd'
nearby = dag.subgraph_by_depth("d", depth=1)
print(list(nearby.nodes())) # ['b', 'c', 'd', 'e']

Subgraph within 1 hop of node 'd' (both directions).

# Within 2 hops
wider = dag.subgraph_by_depth("d", depth=2)
print(list(wider.nodes())) # ['a', 'b', 'c', 'd', 'e']
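As the examples show, depth is measured in both directions. The selection rule can be sketched as an undirected BFS over the DAG's edges, stopping at the hop limit (illustrative only):

```python
from collections import deque

def within_depth(edges, start, depth):
    """Nodes within `depth` hops of `start`, following edges in
    either direction (undirected BFS over the edge list)."""
    nbrs = {}
    for u, v in edges:
        nbrs.setdefault(u, set()).add(v)
        nbrs.setdefault(v, set()).add(u)
    seen = {start}
    frontier = deque([(start, 0)])
    while frontier:
        node, d = frontier.popleft()
        if d == depth:
            continue
        for n in nbrs.get(node, ()):
            if n not in seen:
                seen.add(n)
                frontier.append((n, d + 1))
    return seen

edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")]
print(sorted(within_depth(edges, "d", 1)))  # ['b', 'c', 'd', 'e']
print(sorted(within_depth(edges, "d", 2)))  # ['a', 'b', 'c', 'd', 'e']
```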

snapshot()

Create an immutable, frozen copy of the DAG:

snap = dag.snapshot()

Snapshots are useful for:

  • Recording the state of a graph before mutations.
  • Passing a read-only view to analysis functions.
  • Implementing undo/redo.

See Serialization for persisting snapshots to disk.

Diffing snapshots

Compare two snapshots to see what changed:

snap1 = dag.snapshot()

# Mutate the original
dag.add_node("f")
dag.add_edge("e", "f")

snap2 = dag.snapshot()

diff = dag.diff(snap1, snap2)
print(diff)
# DagDiff(added_nodes=['f'], removed_nodes=[], added_edges=[('e', 'f')], removed_edges=[])
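A diff of two snapshots boils down to four set differences over their node and edge sets. A sketch over plain lists (illustrative; the field names mirror DagDiff, but the function itself is hypothetical):

```python
def diff_graphs(nodes1, edges1, nodes2, edges2):
    """Set-difference view of what changed between two snapshots."""
    return {
        "added_nodes": sorted(set(nodes2) - set(nodes1)),
        "removed_nodes": sorted(set(nodes1) - set(nodes2)),
        "added_edges": sorted(set(edges2) - set(edges1)),
        "removed_edges": sorted(set(edges1) - set(edges2)),
    }

before = (["a", "b"], [("a", "b")])
after = (["a", "b", "f"], [("a", "b"), ("b", "f")])
print(diff_graphs(*before, *after))
# {'added_nodes': ['f'], 'removed_nodes': [], 'added_edges': [('b', 'f')], 'removed_edges': []}
```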

compose()

The compose() function wires multiple DAGs together under namespaces:

etl = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

ml = (
    dagron.DAG.builder()
    .add_edge("train", "evaluate")
    .build()
)

combined = dagron.compose(
    dags={"etl": etl, "ml": ml},
    connections=[("etl/load", "ml/train")],
)

print(list(combined.nodes()))
# ['etl/extract', 'etl/transform', 'etl/load', 'ml/train', 'ml/evaluate']

print(combined.successors("etl/load")) # ['ml/train']

Two sub-DAGs composed into a single pipeline.
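Namespacing can be pictured as prefixing every node with its DAG's key before taking the union, then adding the connection edges verbatim. A sketch over edge lists (illustrative only, not dagron's code):

```python
def compose_edges(dags, connections):
    """Prefix each DAG's edges with its namespace, then add bridges."""
    edges = []
    for ns, dag_edges in dags.items():
        edges += [(f"{ns}/{u}", f"{ns}/{v}") for u, v in dag_edges]
    return edges + list(connections)

combined = compose_edges(
    {"etl": [("extract", "transform"), ("transform", "load")],
     "ml": [("train", "evaluate")]},
    [("etl/load", "ml/train")],
)
print(combined)
# [('etl/extract', 'etl/transform'), ('etl/transform', 'etl/load'),
#  ('ml/train', 'ml/evaluate'), ('etl/load', 'ml/train')]
```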

Composing many DAGs

ingestion = dagron.DAG.builder().add_edge("fetch", "validate").build()
processing = dagron.DAG.builder().add_edge("clean", "aggregate").build()
reporting = dagron.DAG.builder().add_edge("render", "email").build()

full_pipeline = dagron.compose(
    dags={
        "ingest": ingestion,
        "process": processing,
        "report": reporting,
    },
    connections=[
        ("ingest/validate", "process/clean"),
        ("process/aggregate", "report/render"),
    ],
)

for level, nodes in enumerate(full_pipeline.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['ingest/fetch']
# Level 1: ['ingest/validate']
# Level 2: ['process/clean']
# Level 3: ['process/aggregate']
# Level 4: ['report/render']
# Level 5: ['report/email']

Three composed DAGs forming a full data pipeline.

Chaining transforms

Transforms return new DAG instances, so you can chain them:

result = (
    dag
    .filter(lambda n: n != "c")
    .transitive_reduction()
    .reverse()
)

print(list(result.nodes())) # ['e', 'd', 'b', 'a']

Since each transform produces a new DAG, the original is never modified.

Partitioning

dagron includes two partitioning strategies for splitting a DAG into independent sub-DAGs:

Level-based partitioning

Splits the DAG at topological level boundaries:

partitions = dag.partition_level_based(num_partitions=2)
for i, part in enumerate(partitions):
    print(f"Partition {i}: {list(part.nodes())}")
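Level boundaries come from the topological levels themselves: repeatedly peel off the nodes with no remaining predecessors. A plain-Python sketch of that grouping (illustrative only; dagron exposes this natively via topological_levels()):

```python
def topological_levels(nodes, edges):
    """Group nodes by level: each round takes every node whose
    predecessors have all been emitted (Kahn-style peeling)."""
    preds = {n: set() for n in nodes}
    succs = {n: set() for n in nodes}
    for u, v in edges:
        preds[v].add(u)
        succs[u].add(v)
    levels, remaining = [], dict(preds)
    while remaining:
        ready = sorted(n for n, p in remaining.items() if not p)
        levels.append(ready)
        for n in ready:
            del remaining[n]
            for s in succs[n]:
                if s in remaining:
                    remaining[s].discard(n)
    return levels

nodes = ["a", "b", "c", "d", "e"]
edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")]
print(topological_levels(nodes, edges))
# [['a'], ['b', 'c'], ['d'], ['e']]
```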

Balanced partitioning

Tries to balance the number of nodes across partitions:

partitions = dag.partition_balanced(num_partitions=3)
for i, part in enumerate(partitions):
    print(f"Partition {i}: {list(part.nodes())}")

Partitioning is useful for distributed execution where each partition is sent to a different worker.

Practical example: simplifying a graph for stakeholders

import dagron

# A complex internal pipeline
pipeline = (
    dagron.DAG.builder()
    .add_nodes([
        "fetch_api", "fetch_db", "fetch_s3",
        "validate_api", "validate_db", "validate_s3",
        "merge_sources", "feature_eng", "train_xgb",
        "train_nn", "ensemble", "evaluate", "deploy",
    ])
    .add_edges([
        ("fetch_api", "validate_api"), ("fetch_db", "validate_db"),
        ("fetch_s3", "validate_s3"),
        ("validate_api", "merge_sources"), ("validate_db", "merge_sources"),
        ("validate_s3", "merge_sources"),
        ("merge_sources", "feature_eng"),
        ("feature_eng", "train_xgb"), ("feature_eng", "train_nn"),
        ("train_xgb", "ensemble"), ("train_nn", "ensemble"),
        ("ensemble", "evaluate"), ("evaluate", "deploy"),
    ])
    .build()
)

# Collapse ingestion into one node for the executive summary
simplified = pipeline.collapse(
    nodes=[
        "fetch_api", "fetch_db", "fetch_s3",
        "validate_api", "validate_db", "validate_s3",
        "merge_sources",
    ],
    into="data_ingestion",
)

simplified = simplified.collapse(
    nodes=["train_xgb", "train_nn", "ensemble"],
    into="model_training",
)

print(list(simplified.nodes()))
# ['data_ingestion', 'feature_eng', 'model_training', 'evaluate', 'deploy']

print(simplified.to_mermaid())

Simplified stakeholder view after collapsing internal details.

API reference

Method                        Docs
dag.reverse()                 DAG
dag.filter()                  DAG
dag.merge()                   DAG
dag.collapse()                DAG
dag.transitive_reduction()    DAG
dag.transitive_closure()      DAG
dag.subgraph()                DAG
dag.subgraph_by_depth()       DAG
dag.snapshot()                DAG
dag.diff()                    DAG
dagron.compose()              DAG
dag.partition_level_based()   DAG
dag.partition_balanced()      DAG

Next steps