Skip to main content

Inspecting Graphs

dagron gives you deep introspection into your DAG's structure. This guide covers every analysis tool — from basic traversals to critical-path analysis, what-if exploration, linting, and the query DSL.

Building a sample graph

We will use this graph throughout the guide:

import dagron

dag = (
dagron.DAG.builder()
.add_nodes(["raw", "clean", "features_a", "features_b", "train", "evaluate", "deploy"])
.add_edges([
("raw", "clean"),
("clean", "features_a"),
("clean", "features_b"),
("features_a", "train"),
("features_b", "train"),
("train", "evaluate"),
("evaluate", "deploy"),
])
.build()
)

ML pipeline used throughout this guide.

Basic structure

Node and edge counts

print(dag.node_count())  # 7
print(dag.edge_count()) # 7

Listing nodes

print(list(dag.nodes()))
# ['raw', 'clean', 'features_a', 'features_b', 'train', 'evaluate', 'deploy']

Roots and leaves

Roots are nodes with no incoming edges — the entry points of your pipeline. Leaves are nodes with no outgoing edges — the terminal outputs.

print(dag.roots())   # ['raw']
print(dag.leaves()) # ['deploy']

Membership checks

print(dag.has_node("train"))           # True
print(dag.has_edge("clean", "features_a")) # True
print(dag.has_node("nonexistent")) # False

Neighbourhood queries

Direct neighbours

# Parents (nodes that point TO this node)
print(dag.predecessors("train")) # ['features_a', 'features_b']

# Children (nodes this node points TO)
print(dag.successors("clean")) # ['features_a', 'features_b']

Degree

print(dag.in_degree("train"))   # 2
print(dag.out_degree("clean")) # 2
print(dag.in_degree("raw")) # 0 (root)
print(dag.out_degree("deploy")) # 0 (leaf)

Transitive closure queries

# All upstream nodes (recursive predecessors)
print(dag.ancestors("train"))
# ['raw', 'clean', 'features_a', 'features_b']

# All downstream nodes (recursive successors)
print(dag.descendants("clean"))
# ['features_a', 'features_b', 'train', 'evaluate', 'deploy']

Ancestors of 'train' (blue) and the target node (red).

Graph statistics

The stats() method returns a comprehensive summary:

s = dag.stats()
print(s)
# DAGStats(nodes=7, edges=7, roots=1, leaves=1, depth=5, width=2, density=0.167)
FieldMeaning
nodesTotal node count
edgesTotal edge count
rootsNumber of root nodes
leavesNumber of leaf nodes
depthLength of the longest path (in edges)
widthMaximum number of nodes at any single level
densityEdge count / maximum possible edges

Topological ordering

Topological sort

A flat ordering where every node appears after all its dependencies:

print(dag.topological_sort())
# ['raw', 'clean', 'features_a', 'features_b', 'train', 'evaluate', 'deploy']

Topological levels

Groups nodes that can execute in parallel — this is what the executor uses internally:

for level, nodes in enumerate(dag.topological_levels()):
print(f"Level {level}: {nodes}")
# Level 0: ['raw']
# Level 1: ['clean']
# Level 2: ['features_a', 'features_b']
# Level 3: ['train']
# Level 4: ['evaluate']
# Level 5: ['deploy']

Level 2 shows that features_a and features_b can run concurrently.

Execution plan

The execution plan is a richer version of topological levels that includes scheduling metadata:

plan = dag.execution_plan()
for step in plan:
print(step)

Critical path

The critical path is the longest dependency chain through the DAG. It determines the minimum wall-clock time, assuming unlimited parallelism.

path = dag.critical_path()
print(path)
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']

With cost hints, the critical path accounts for task durations:

costs = {
"raw": 1.0,
"clean": 2.0,
"features_a": 5.0,
"features_b": 3.0,
"train": 10.0,
"evaluate": 2.0,
"deploy": 1.0,
}

path = dag.critical_path(costs=costs)
print(path)
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']
# Total cost: 21.0

Critical path highlighted in red. Optimising these nodes reduces total pipeline time.

Understanding the critical path is key to performance tuning — see Tracing & Profiling for post-execution analysis.

Shortest and longest paths

Find the shortest or longest path between any two nodes:

print(dag.shortest_path("raw", "deploy"))
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']

print(dag.longest_path("raw", "deploy"))
# ['raw', 'clean', 'features_a', 'train', 'evaluate', 'deploy']

All paths

Enumerate every path between two nodes:

paths = dag.all_paths("clean", "train")
for p in paths:
print(p)
# ['clean', 'features_a', 'train']
# ['clean', 'features_b', 'train']

Pattern matching

Find nodes by name using glob or regex patterns:

# Glob-style
print(dag.nodes_matching_glob("features_*"))
# ['features_a', 'features_b']

# Regex
print(dag.nodes_matching_regex(r"^(train|evaluate|deploy)$"))
# ['train', 'evaluate', 'deploy']

This is especially useful in large graphs with naming conventions.

explain()

The explain() function gives a human-readable summary of a single node's role in the graph:

from dagron import explain

info = explain(dag, "train")
print(info)

Output:

Node: train
In-degree: 2
Out-degree: 1
Predecessors: features_a, features_b
Successors: evaluate
Ancestors: raw, clean, features_a, features_b
Descendants: evaluate, deploy
Level: 3
Is root: False
Is leaf: False
On critical path: Yes

what_if()

The what_if() function lets you explore hypothetical changes without modifying the DAG:

from dagron import what_if

report = what_if(dag, remove_nodes=["features_b"])
print(report)

Output:

What-if: remove nodes ['features_b']
Nodes removed: 1
Edges removed: 2
New roots: ['raw']
New leaves: ['deploy']
Disconnected: False
Affected downstream: ['train', 'evaluate', 'deploy']

This is invaluable for understanding the impact of removing a step or a dependency before making the change.

# What if we remove an edge instead?
report = what_if(dag, remove_edges=[("clean", "features_b")])
print(report)

lint()

The lint() function checks for common structural issues:

from dagron import lint

warnings = lint(dag)
for w in warnings:
print(w)

Possible warnings include:

WarningMeaning
Isolated nodeNode with no edges (in or out)
Single-child bottleneckNode with high in-degree feeding a single successor
Redundant edgeEdge that is implied by transitivity
Wide fan-outNode with many successors (may be a design smell)

Linting is especially useful in CI pipelines to enforce graph hygiene.

query()

The query() function provides a mini DSL for selecting nodes:

from dagron import query

# Find all root nodes
roots = query(dag, "roots")
print(roots) # ['raw']

# Find all leaves
leaves = query(dag, "leaves")
print(leaves) # ['deploy']

# Glob on node names
features = query(dag, "name:features_*")
print(features) # ['features_a', 'features_b']

# Combine with set operators
result = query(dag, "roots & name:raw*")
print(result) # ['raw']

Query syntax reference

ExpressionMeaning
rootsNodes with in-degree 0
leavesNodes with out-degree 0
name:patternGlob match on node names
A & BIntersection
A | BUnion
!AComplement
ancestors(node)All ancestors of a node
descendants(node)All descendants of a node
# All non-leaf nodes
non_leaves = query(dag, "!leaves")
print(non_leaves)
# ['raw', 'clean', 'features_a', 'features_b', 'train', 'evaluate']

# Ancestors of train that match a glob
query(dag, "ancestors(train) & name:feature*")
# ['features_a', 'features_b']

Reachability index

For large graphs where you repeatedly query ancestors/descendants, building a reachability index dramatically speeds up lookups:

dag.build_reachability_index()

# Now these calls use the precomputed index:
print(dag.ancestors("deploy"))
print(dag.descendants("raw"))

The index is invalidated when the graph is mutated and can be rebuilt at any time.

Dominator tree

The dominator tree reveals which nodes are mandatory bottlenecks — a node d dominates node n if every path from any root to n passes through d.

dom_tree = dag.dominator_tree()
print(dom_tree)
# {'clean': 'raw', 'features_a': 'clean', 'features_b': 'clean',
# 'train': 'clean', 'evaluate': 'train', 'deploy': 'evaluate'}

This tells you, for example, that clean dominates everything downstream of it, making it a critical bottleneck.

Validation

At any time you can verify the DAG is structurally sound:

dag.validate()  # Raises if the graph contains a cycle or is otherwise invalid

This is called automatically by the builder at .build() time, but you may want to call it after manual mutations.

Practical example: debugging a slow pipeline

Suppose your pipeline is slower than expected. Here is a systematic inspection workflow:

import dagron
from dagron import explain, what_if, lint, query

# 1. Check graph stats
s = dag.stats()
print(f"Nodes: {s.nodes}, Depth: {s.depth}, Width: {s.width}")

# 2. Find the critical path
cp = dag.critical_path()
print(f"Critical path ({len(cp)} nodes): {' -> '.join(cp)}")

# 3. Explain the bottleneck node
explain(dag, cp[len(cp) // 2])

# 4. What if we parallelise the bottleneck?
what_if(dag, remove_nodes=[cp[len(cp) // 2]])

# 5. Lint for structural issues
for w in lint(dag):
print(f"Warning: {w}")

# 6. Query for specific patterns
heavy_nodes = query(dag, "name:train* | name:feature*")
print(f"Compute-heavy nodes: {heavy_nodes}")

API reference

Function / MethodDocs
dag.stats()DAG
dag.topological_sort()DAG
dag.topological_levels()DAG
dag.critical_path()DAG
dag.execution_plan()DAG
dag.shortest_path()DAG
dag.longest_path()DAG
dag.all_paths()DAG
dag.dominator_tree()DAG
explain()Analysis
what_if()Analysis
lint()Analysis
query()Analysis

Next steps