# Tracing & Profiling
When your DAG pipeline is slow, you need data — not guesswork. dagron provides two complementary observability tools:
- **Tracing** records a timestamped event log for every node during execution.
- **Profiling** analyses the trace to find the critical path, detect bottlenecks, and measure parallelism efficiency.
This guide shows you how to enable tracing, explore traces, export to Chrome's trace viewer, and use the profiling API to optimise your pipelines.
## Enabling tracing

Pass `enable_tracing=True` to any executor:
```python
import time

import dagron

dag = (
    dagron.DAG.builder()
    .add_nodes(["extract", "transform_a", "transform_b", "merge", "load"])
    .add_edges([
        ("extract", "transform_a"),
        ("extract", "transform_b"),
        ("transform_a", "merge"),
        ("transform_b", "merge"),
        ("merge", "load"),
    ])
    .build()
)

tasks = {
    "extract": lambda: time.sleep(0.1) or "data",
    "transform_a": lambda: time.sleep(0.3) or "a_done",
    "transform_b": lambda: time.sleep(0.2) or "b_done",
    "merge": lambda: time.sleep(0.05) or "merged",
    "load": lambda: time.sleep(0.1) or "loaded",
}

result = dagron.DAGExecutor(
    dag,
    max_workers=4,
    enable_tracing=True,
).execute(tasks)
```
*Pipeline with parallel transform branches — we will trace this execution.*
## ExecutionTrace

When tracing is enabled, `result.trace` contains an `ExecutionTrace` object:
```python
trace = result.trace

# Quick summary
print(trace.summary())
```
Output:

```text
Execution Trace Summary
=======================
Total duration: 0.552s
Nodes executed: 5
  COMPLETED: 5

Timeline:
  extract      [0.000s - 0.102s] (0.102s) COMPLETED
  transform_a  [0.102s - 0.401s] (0.299s) COMPLETED
  transform_b  [0.102s - 0.305s] (0.203s) COMPLETED
  merge        [0.401s - 0.452s] (0.051s) COMPLETED
  load         [0.452s - 0.552s] (0.100s) COMPLETED
```
### Trace events

Each node's lifecycle is recorded as a series of `TraceEvent` objects:
```python
for event in trace.events:
    print(f"{event.timestamp:.3f}s {event.node_name:20s} {event.event_type}")
```
Output:

```text
0.000s extract              STARTED
0.102s extract              COMPLETED
0.102s transform_a          STARTED
0.102s transform_b          STARTED
0.305s transform_b          COMPLETED
0.401s transform_a          COMPLETED
0.401s merge                STARTED
0.452s merge                COMPLETED
0.452s load                 STARTED
0.552s load                 COMPLETED
```
### Events for a specific node
```python
events = trace.events_for_node("transform_a")
for e in events:
    print(f"{e.event_type}: {e.timestamp:.3f}s")
# STARTED: 0.102s
# COMPLETED: 0.401s
```
## Chrome trace format

dagron can export traces in Chrome's Trace Event Format, which you can visualize in `chrome://tracing` or Perfetto.
### Exporting

```python
chrome_json = trace.to_chrome_trace()
with open("trace.json", "w") as f:
    f.write(chrome_json)
```
### Viewing

1. Open Chrome and navigate to `chrome://tracing`.
2. Click **Load** and select `trace.json`.
3. You will see a timeline with each node as a horizontal bar.

Alternatively, open the Perfetto UI and drag the file onto the page.
The Chrome trace view shows:
- Parallel lanes for concurrent tasks
- Gap analysis between sequential tasks
- Duration bars proportional to wall-clock time
- Zoom and pan for exploring long traces
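For reference, the exported file itself is simple: a JSON object with a `traceEvents` array of per-node events. The sketch below hand-builds a minimal trace in that format (field names follow Chrome's public Trace Event Format spec; dagron's actual output may differ in detail):

```python
import json

# Minimal hand-built trace in Chrome's Trace Event Format: one "complete"
# event ("ph": "X") per node, with "ts" (start) and "dur" in microseconds.
# Field names follow the public format spec; dagron's actual output may
# differ in detail.
events = [
    {"name": "extract", "ph": "X", "ts": 0, "dur": 102000, "pid": 0, "tid": 0},
    {"name": "transform_a", "ph": "X", "ts": 102000, "dur": 299000, "pid": 0, "tid": 1},
]
print(json.dumps({"traceEvents": events}, indent=2))
```

Saving this output to a `.json` file and loading it in `chrome://tracing` renders the same kind of timeline as a real dagron export.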
## JSON trace format

```python
json_str = trace.to_json()
```

This exports the raw trace data as JSON (dagron's own format, not Chrome's). Useful for custom analysis or storage.
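If you post-process the JSON yourself, something like the following works, assuming the export is an array of event records mirroring the `TraceEvent` fields shown earlier (`node_name`, `event_type`, `timestamp`); that schema is an assumption, so inspect a real export first. The sketch uses an inline sample instead of a real trace:

```python
import json

# Hypothetical trace JSON mirroring the TraceEvent fields shown earlier;
# the real schema may differ, so check an actual export before relying on it.
json_str = json.dumps([
    {"node_name": "extract", "event_type": "STARTED", "timestamp": 0.000},
    {"node_name": "extract", "event_type": "COMPLETED", "timestamp": 0.102},
])

# Pair STARTED/COMPLETED events to recover per-node durations.
starts, durations = {}, {}
for event in json.loads(json_str):
    if event["event_type"] == "STARTED":
        starts[event["node_name"]] = event["timestamp"]
    elif event["event_type"] == "COMPLETED":
        durations[event["node_name"]] = event["timestamp"] - starts[event["node_name"]]

print(durations)  # {'extract': 0.102}
```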
## Profiling with profile_execution()

The `profile_execution()` function takes a DAG and an execution result, and produces a `ProfileReport` with actionable insights:
```python
from dagron import profile_execution

report = profile_execution(dag, result)
```
### Critical path analysis

The critical path is the sequence of nodes that determined the total wall-clock time:
```python
print("Critical path:")
for node in report.critical_path:
    print(f"  {node}")
# extract
# transform_a  <-- bottleneck (longest task)
# merge
# load
```
*Critical path highlighted in red; `transform_a` is the bottleneck.*
The critical path tells you exactly which nodes to optimise for maximum speedup.
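Because the wall-clock time equals the sum of durations along the critical path, you can estimate the payoff of an optimisation before writing any code. A back-of-the-envelope sketch using the durations from the trace above:

```python
# Durations along the critical path, taken from the trace above.
critical_path = {"extract": 0.102, "transform_a": 0.299, "merge": 0.051, "load": 0.100}

current = sum(critical_path.values())
print(f"wall-clock: {current:.3f}s")  # 0.552s, matching the trace summary

# If transform_a were sped up to 0.203s (matching transform_b), both branches
# would finish together and that stage would cost 0.203s instead of 0.299s.
projected = current - critical_path["transform_a"] + 0.203
print(f"projected:  {projected:.3f}s")  # 0.456s
```

Optimising any node off the critical path (for example `transform_b`) would change nothing, which is why the report surfaces the path explicitly.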
### Bottleneck detection

```python
print("Bottlenecks:")
for b in report.bottlenecks:
    print(f"  {b.node}: {b.duration_seconds:.3f}s ({b.percentage:.1f}% of total)")
# transform_a: 0.299s (54.2% of total)
# transform_b: 0.203s (36.8% of total)
```
Bottlenecks are nodes that consume a disproportionate share of the total execution time.
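You can reproduce these percentages by hand: they match each node's duration divided by the wall-clock total (a relationship inferred from the numbers above, not a documented formula):

```python
wall_clock = 0.552
durations = {
    "extract": 0.102, "transform_a": 0.299, "transform_b": 0.203,
    "merge": 0.051, "load": 0.100,
}

# Each node's share of the total wall-clock time, largest first.
ranked = sorted(durations.items(), key=lambda kv: kv[1], reverse=True)
for name, duration in ranked[:2]:
    print(f"{name}: {duration / wall_clock:.1%} of total")
# transform_a: 54.2% of total
# transform_b: 36.8% of total
```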
### Parallelism efficiency

```python
print(f"Parallelism efficiency: {report.parallelism_efficiency:.1%}")
# Parallelism efficiency: 85.3%
```
Parallelism efficiency is the ratio of the sequential sum of all task durations to the product of the wall-clock time and the number of workers. A value of 100% means every worker was busy for the entire run; low values indicate scheduling gaps or sequential bottlenecks.
```python
# Detailed breakdown
print(f"Sequential sum: {report.sequential_sum:.3f}s")
print(f"Wall-clock: {report.wall_clock:.3f}s")
print(f"Speedup: {report.sequential_sum / report.wall_clock:.2f}x")
```
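Plugging hypothetical numbers into that definition makes it concrete:

```python
# Hypothetical run: 2.4s of total task work completed in 0.9s on 4 workers.
sequential_sum = 2.4
wall_clock = 0.9
workers = 4

efficiency = sequential_sum / (wall_clock * workers)
print(f"Parallelism efficiency: {efficiency:.1%}")  # 66.7%
```

Here the four workers collectively had 3.6 worker-seconds available but only did 2.4 seconds of useful work, so a third of the capacity went to waiting.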
## Complete profiling workflow

Here is a full example that builds, executes, traces, and profiles a pipeline:
```python
import time

import dagron
from dagron import profile_execution

# 1. Build the DAG
dag = (
    dagron.DAG.builder()
    .add_nodes(["fetch", "validate", "feature_a", "feature_b", "feature_c",
                "train", "evaluate", "deploy"])
    .add_edges([
        ("fetch", "validate"),
        ("validate", "feature_a"),
        ("validate", "feature_b"),
        ("validate", "feature_c"),
        ("feature_a", "train"),
        ("feature_b", "train"),
        ("feature_c", "train"),
        ("train", "evaluate"),
        ("evaluate", "deploy"),
    ])
    .build()
)

# 2. Define tasks with realistic durations
tasks = {
    "fetch": lambda: time.sleep(0.5) or "fetched",
    "validate": lambda: time.sleep(0.1) or "valid",
    "feature_a": lambda: time.sleep(1.0) or "features_a",
    "feature_b": lambda: time.sleep(0.8) or "features_b",
    "feature_c": lambda: time.sleep(0.3) or "features_c",
    "train": lambda: time.sleep(2.0) or "model",
    "evaluate": lambda: time.sleep(0.5) or "metrics",
    "deploy": lambda: time.sleep(0.2) or "deployed",
}

# 3. Execute with tracing
result = dagron.DAGExecutor(
    dag,
    max_workers=4,
    enable_tracing=True,
).execute(tasks)

# 4. Profile
report = profile_execution(dag, result)

# 5. Print report
print("=" * 60)
print("PROFILING REPORT")
print("=" * 60)
print(f"Total wall-clock time: {result.total_duration_seconds:.3f}s")
print(f"Parallelism efficiency: {report.parallelism_efficiency:.1%}")
print()

print("Critical path:")
for node in report.critical_path:
    nr = result.node_results[node]
    print(f"  {node:20s} {nr.duration_seconds:.3f}s")
print()

print("Top bottlenecks:")
for b in report.bottlenecks[:3]:
    print(f"  {b.node:20s} {b.duration_seconds:.3f}s ({b.percentage:.1f}%)")
print()

# 6. Export trace for Chrome
with open("pipeline_trace.json", "w") as f:
    f.write(result.trace.to_chrome_trace())
print("Trace exported to pipeline_trace.json")
```
*ML pipeline with three parallel feature extraction branches.*
## Tracing with other executors

Tracing works with all executor types:
### AsyncDAGExecutor

```python
import asyncio

async def main():
    executor = dagron.AsyncDAGExecutor(dag, enable_tracing=True)
    result = await executor.execute(tasks)
    print(result.trace.summary())

asyncio.run(main())
```
### IncrementalExecutor

```python
executor = dagron.IncrementalExecutor(dag, enable_tracing=True)
result = executor.execute(tasks, changed_nodes=["fetch"])

# Trace shows which nodes were recomputed vs reused
print(result.trace.summary())
```
### CheckpointExecutor

```python
executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/checkpoints")

# Note: CheckpointExecutor uses tracing internally for resume support
result = executor.execute(tasks)
```
## Interpreting traces

### Identifying scheduling gaps
Look for periods where no node is running. These indicate:

- Sequential bottlenecks — a node with high in-degree that must wait for all predecessors.
- Under-utilisation — `max_workers` is too low, or the graph is too sequential.
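A rough way to find such gaps programmatically is to merge each node's running interval and report the uncovered stretches. The sketch below uses hypothetical `(start, end)` pairs rather than a real trace; in practice you would reconstruct them from the STARTED/COMPLETED events:

```python
# Hypothetical (start, end) running intervals per node, e.g. reconstructed
# from STARTED/COMPLETED trace events.
intervals = [(0.0, 0.5), (0.5, 1.5), (0.5, 1.3), (1.8, 2.0)]

# Sweep the intervals in start order and record stretches where nothing ran.
gaps = []
running_until = 0.0
for start, end in sorted(intervals):
    if start > running_until:
        gaps.append((running_until, start))
    running_until = max(running_until, end)

print(gaps)  # [(1.5, 1.8)]
```

A non-empty gap list is a strong hint that some node sat waiting on a slow predecessor during that window.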
### Identifying stragglers

If one branch takes much longer than its siblings, the join node waits for the straggler. In the example above, `feature_a` (1.0s) is a straggler compared to `feature_c` (0.3s).
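The cost of a straggler is the idle time it imposes on its finished siblings. A quick sketch with the feature branch durations from the example (assuming no other work is available to fill the wait):

```python
# Branch durations from the example pipeline above.
branches = {"feature_a": 1.0, "feature_b": 0.8, "feature_c": 0.3}

# The join ("train") starts only when the slowest branch finishes, so each
# faster branch's worker sits idle for the difference.
straggler = max(branches, key=branches.get)
for name, duration in branches.items():
    if name != straggler:
        print(f"{name} idles for {branches[straggler] - duration:.1f}s waiting on {straggler}")
# feature_b idles for 0.2s waiting on feature_a
# feature_c idles for 0.7s waiting on feature_a
```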
### Measuring overhead

Compare the sequential sum to the parallel execution time:

```python
sequential = sum(nr.duration_seconds for nr in result.node_results.values())
parallel = result.total_duration_seconds
overhead = parallel - (sequential / 4)  # with 4 workers

print(f"Sequential sum: {sequential:.3f}s")
print(f"Parallel time: {parallel:.3f}s")
print(f"Overhead: {overhead:.3f}s")
```
## Tracing in production

For production pipelines, consider these patterns:
### Conditional tracing

```python
import os

enable = os.environ.get("DAGRON_TRACING", "false").lower() == "true"
result = dagron.DAGExecutor(dag, enable_tracing=enable).execute(tasks)
```
### Trace sampling

```python
import random
import time

# Trace 10% of executions
enable = random.random() < 0.1
result = dagron.DAGExecutor(dag, enable_tracing=enable).execute(tasks)
if result.trace:
    with open(f"trace_{int(time.time())}.json", "w") as f:
        f.write(result.trace.to_chrome_trace())
```
### Monitoring integration

```python
from dagron import profile_execution

result = dagron.DAGExecutor(dag, enable_tracing=True).execute(tasks)
report = profile_execution(dag, result)

# Send metrics to your monitoring system
metrics = {
    "pipeline.duration": result.total_duration_seconds,
    "pipeline.parallelism_efficiency": report.parallelism_efficiency,
    "pipeline.critical_path_length": len(report.critical_path),
    "pipeline.nodes_succeeded": result.succeeded,
    "pipeline.nodes_failed": result.failed,
}
# send_to_datadog(metrics)  # or Prometheus, Grafana, etc.
```
## API reference

| Class / Function | Docs |
|---|---|
| `ExecutionTrace` | Tracing |
| `TraceEvent` | Tracing |
| `profile_execution()` | Tracing |
| `ProfileReport` | Tracing |
## Next steps
- Inspecting Graphs — pre-execution analysis and critical path estimation.
- Incremental Execution — combine tracing with incremental runs.
- Checkpointing — persist progress and resume after failures.