Inspecting Graphs
dagron gives you deep introspection into your DAG's structure. This guide covers every analysis tool — from basic traversals to critical-path analysis, what-if exploration, linting, and the query DSL.
Building a sample graph
We will use this graph throughout the guide:
import dagron
dag = (
dagron.DAG.builder()
.add_nodes(["raw", "clean", "features_a", "features_b", "train", "evaluate", "deploy"])
.add_edges([
("raw", "clean"),
("clean", "features_a"),
("clean", "features_b"),
("features_a", "train"),
("features_b", "train"),
("train", "evaluate"),
("evaluate", "deploy"),
])
.build()
)
ML pipeline used throughout this guide.
Basic structure
Node and edge counts
print(dag.node_count()) # 7
print(dag.edge_count()) # 7
Listing nodes
print(list(dag.nodes()))
# ['raw', 'clean', 'features_a', 'features_b', 'train', 'evaluate', 'deploy']
Roots and leaves
Roots are nodes with no incoming edges — the entry points of your pipeline. Leaves are nodes with no outgoing edges — the terminal outputs.
print(dag.roots()) # ['raw']
print(dag.leaves()) # ['deploy']
Membership checks
print(dag.has_node("train")) # True
print(dag.has_edge("clean", "features_a")) # True
print(dag.has_node("nonexistent")) # False
Neighbourhood queries
Direct neighbours
# Parents (nodes that point TO this node)
print(dag.predecessors("train")) # ['features_a', 'features_b']
# Children (nodes this node points TO)
print(dag.successors("clean")) # ['features_a', 'features_b']
Degree
print(dag.in_degree("train")) # 2
print(dag.out_degree("clean")) # 2
print(dag.in_degree("raw")) # 0 (root)
print(dag.out_degree("deploy")) # 0 (leaf)
Transitive closure queries
# All upstream nodes (recursive predecessors)
print(dag.ancestors("train"))
# ['raw', 'clean', 'features_a', 'features_b']
# All downstream nodes (recursive successors)
print(dag.descendants("clean"))
# ['features_a', 'features_b', 'train', 'evaluate', 'deploy']
Ancestors of 'train' (blue) and the target node (red).
Graph statistics
The stats() method returns a comprehensive summary:
s = dag.stats()
print(s)
# DAGStats(nodes=7, edges=7, roots=1, leaves=1, depth=5, width=2, density=0.167)
| Field | Meaning |
|---|---|
nodes | Total node count |
edges | Total edge count |
roots | Number of root nodes |
leaves | Number of leaf nodes |
depth | Length of the longest path (in edges) |
width | Maximum number of nodes at any single level |
density | Edge count / maximum possible edges |
Topological ordering
Topological sort
A flat ordering where every node appears after all its dependencies:
print(dag.topological_sort())
# ['raw', 'clean', 'features_a', 'features_b', 'train', 'evaluate', 'deploy']
Topological levels
Groups nodes that can execute in parallel — this is what the executor uses internally:
for level, nodes in enumerate(dag.topological_levels()):
print(f"Level {level}: {nodes}")
# Level 0: ['raw']
# Level 1: ['clean']
# Level 2: ['features_a', 'features_b']
# Level 3: ['train']
# Level 4: ['evaluate']
# Level 5: ['deploy']
Level 2 shows that features_a and features_b can run concurrently.
Execution plan
The execution plan is a richer version of topological levels that includes scheduling metadata:
plan = dag.execution_plan()
for step in plan:
print(step)
Critical path
The critical path is the longest dependency chain through the DAG. It determines the minimum wall-clock time, assuming unlimited parallelism.
path = dag.critical_path()
print(path)
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']
With cost hints, the critical path accounts for task durations:
costs = {
"raw": 1.0,
"clean": 2.0,
"features_a": 5.0,
"features_b": 3.0,
"train": 10.0,
"evaluate": 2.0,
"deploy": 1.0,
}
path = dag.critical_path(costs=costs)
print(path)
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']
# Total cost: 21.0
Critical path highlighted in red. Optimising these nodes reduces total pipeline time.
Understanding the critical path is key to performance tuning — see Tracing & Profiling for post-execution analysis.
Shortest and longest paths
Find the shortest or longest path between any two nodes:
print(dag.shortest_path("raw", "deploy"))
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']
print(dag.longest_path("raw", "deploy"))
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']
All paths
Enumerate every path between two nodes:
paths = dag.all_paths("clean", "train")
for p in paths:
print(p)
# ['clean', 'features_a', 'train']
# ['clean', 'features_b', 'train']
Pattern matching
Find nodes by name using glob or regex patterns:
# Glob-style
print(dag.nodes_matching_glob("features_*"))
# ['features_a', 'features_b']
# Regex
print(dag.nodes_matching_regex(r"^(train|evaluate|deploy)$"))
# ['train', 'evaluate', 'deploy']
This is especially useful in large graphs with naming conventions.
explain()
The explain() function gives a human-readable summary
of a single node's role in the graph:
from dagron import explain
info = explain(dag, "train")
print(info)
Output:
Node: train
In-degree: 2
Out-degree: 1
Predecessors: features_a, features_b
Successors: evaluate
Ancestors: raw, clean, features_a, features_b
Descendants: evaluate, deploy
Level: 3
Is root: False
Is leaf: False
On critical path: Yes
what_if()
The what_if() function lets you explore hypothetical
changes without modifying the DAG:
from dagron import what_if
report = what_if(dag, remove_nodes=["features_b"])
print(report)
Output:
What-if: remove nodes ['features_b']
Nodes removed: 1
Edges removed: 2
New roots: ['raw']
New leaves: ['deploy']
Disconnected: False
Affected downstream: ['train', 'evaluate', 'deploy']
This is invaluable for understanding the impact of removing a step or a dependency before making the change.
# What if we remove an edge instead?
report = what_if(dag, remove_edges=[("clean", "features_b")])
print(report)
lint()
The lint() function checks for common structural
issues:
from dagron import lint
warnings = lint(dag)
for w in warnings:
print(w)
Possible warnings include:
| Warning | Meaning |
|---|---|
| Isolated node | Node with no edges (in or out) |
| Single-child bottleneck | Node with high in-degree feeding a single successor |
| Redundant edge | Edge that is implied by transitivity |
| Wide fan-out | Node with many successors (may be a design smell) |
Linting is especially useful in CI pipelines to enforce graph hygiene.
query()
The query() function provides a mini DSL for selecting
nodes:
from dagron import query
# Find all root nodes
roots = query(dag, "roots")
print(roots) # ['raw']
# Find all leaves
leaves = query(dag, "leaves")
print(leaves) # ['deploy']
# Glob on node names
features = query(dag, "name:features_*")
print(features) # ['features_a', 'features_b']
# Combine with set operators
result = query(dag, "roots & name:raw*")
print(result) # ['raw']
Query syntax reference
| Expression | Meaning |
|---|---|
roots | Nodes with in-degree 0 |
leaves | Nodes with out-degree 0 |
name:pattern | Glob match on node names |
A & B | Intersection |
A | B | Union |
!A | Complement |
ancestors(node) | All ancestors of a node |
descendants(node) | All descendants of a node |
# All non-leaf nodes
non_leaves = query(dag, "!leaves")
print(non_leaves)
# ['raw', 'clean', 'features_a', 'features_b', 'train', 'evaluate']
# Ancestors of train that match a glob
query(dag, "ancestors(train) & name:feature*")
# ['features_a', 'features_b']
Reachability index
For large graphs where you repeatedly query ancestors/descendants, building a reachability index dramatically speeds up lookups:
dag.build_reachability_index()
# Now these calls use the precomputed index:
print(dag.ancestors("deploy"))
print(dag.descendants("raw"))
The index is invalidated when the graph is mutated and can be rebuilt at any time.
Dominator tree
The dominator tree reveals which nodes are mandatory bottlenecks — a node d dominates node n if every path from any root to n passes through d.
dom_tree = dag.dominator_tree()
print(dom_tree)
# {'clean': 'raw', 'features_a': 'clean', 'features_b': 'clean',
# 'train': 'clean', 'evaluate': 'train', 'deploy': 'evaluate'}
This tells you, for example, that clean dominates everything downstream of
it, making it a critical bottleneck.
Validation
At any time you can verify the DAG is structurally sound:
dag.validate() # Raises if the graph contains a cycle or is otherwise invalid
This is called automatically by the builder at .build() time, but you may
want to call it after manual mutations.
Practical example: debugging a slow pipeline
Suppose your pipeline is slower than expected. Here is a systematic inspection workflow:
import dagron
from dagron import explain, what_if, lint, query
# 1. Check graph stats
s = dag.stats()
print(f"Nodes: {s.nodes}, Depth: {s.depth}, Width: {s.width}")
# 2. Find the critical path
cp = dag.critical_path()
print(f"Critical path ({len(cp)} nodes): {' -> '.join(cp)}")
# 3. Explain the bottleneck node
explain(dag, cp[len(cp) // 2])
# 4. What if we parallelise the bottleneck?
what_if(dag, remove_nodes=[cp[len(cp) // 2]])
# 5. Lint for structural issues
for w in lint(dag):
print(f"Warning: {w}")
# 6. Query for specific patterns
heavy_nodes = query(dag, "name:train* | name:feature*")
print(f"Compute-heavy nodes: {heavy_nodes}")
API reference
| Function / Method | Docs |
|---|---|
dag.stats() | DAG |
dag.topological_sort() | DAG |
dag.topological_levels() | DAG |
dag.critical_path() | DAG |
dag.execution_plan() | DAG |
dag.shortest_path() | DAG |
dag.longest_path() | DAG |
dag.all_paths() | DAG |
dag.dominator_tree() | DAG |
explain() | Analysis |
what_if() | Analysis |
lint() | Analysis |
query() | Analysis |
Next steps
- Graph Transforms — reshape your DAG with filter, merge, reverse, and more.
- Tracing & Profiling — post-execution analysis with Chrome traces.
- Serialization — export your DAG to JSON, DOT, Mermaid, or binary.