
Tracing & Profiling

When your DAG pipeline is slow, you need data — not guesswork. dagron provides two complementary observability tools:

  • Tracing records a timestamped event log for every node during execution.
  • Profiling analyses the trace to find the critical path, detect bottlenecks, and measure parallelism efficiency.

This guide shows you how to enable tracing, explore traces, export to Chrome's trace viewer, and use the profiling API to optimise your pipelines.

Enabling tracing

Pass enable_tracing=True to any executor:

import time

import dagron

dag = (
    dagron.DAG.builder()
    .add_nodes(["extract", "transform_a", "transform_b", "merge", "load"])
    .add_edges([
        ("extract", "transform_a"),
        ("extract", "transform_b"),
        ("transform_a", "merge"),
        ("transform_b", "merge"),
        ("merge", "load"),
    ])
    .build()
)

tasks = {
    "extract": lambda: time.sleep(0.1) or "data",
    "transform_a": lambda: time.sleep(0.3) or "a_done",
    "transform_b": lambda: time.sleep(0.2) or "b_done",
    "merge": lambda: time.sleep(0.05) or "merged",
    "load": lambda: time.sleep(0.1) or "loaded",
}

result = dagron.DAGExecutor(
    dag,
    max_workers=4,
    enable_tracing=True,
).execute(tasks)

Pipeline with parallel transform branches — we will trace this execution.

ExecutionTrace

When tracing is enabled, result.trace contains an ExecutionTrace object:

trace = result.trace

# Quick summary
print(trace.summary())

Output:

Execution Trace Summary
=======================
Total duration: 0.552s
Nodes executed: 5
COMPLETED: 5

Timeline:
  extract      [0.000s - 0.102s] (0.102s) COMPLETED
  transform_a  [0.102s - 0.401s] (0.299s) COMPLETED
  transform_b  [0.102s - 0.305s] (0.203s) COMPLETED
  merge        [0.401s - 0.452s] (0.051s) COMPLETED
  load         [0.452s - 0.552s] (0.100s) COMPLETED

Trace events

Each node's lifecycle is recorded as a series of TraceEvent objects:

for event in trace.events:
    print(f"{event.timestamp:.3f}s {event.node_name:20s} {event.event_type}")

Output:

0.000s extract              STARTED
0.102s extract              COMPLETED
0.102s transform_a          STARTED
0.102s transform_b          STARTED
0.305s transform_b          COMPLETED
0.401s transform_a          COMPLETED
0.401s merge                STARTED
0.452s merge                COMPLETED
0.452s load                 STARTED
0.552s load                 COMPLETED

Events for a specific node

events = trace.events_for_node("transform_a")
for e in events:
    print(f"{e.event_type}: {e.timestamp:.3f}s")
# STARTED: 0.102s
# COMPLETED: 0.401s

Chrome trace format

dagron can export traces in Chrome's Trace Event Format, which you can visualize in chrome://tracing or Perfetto.

Exporting

chrome_json = trace.to_chrome_trace()

with open("trace.json", "w") as f:
    f.write(chrome_json)

Viewing

  1. Open Chrome and navigate to chrome://tracing
  2. Click Load and select trace.json
  3. You will see a timeline with each node as a horizontal bar

Alternatively, open Perfetto UI and drag the file onto the page.

The Chrome trace view shows:

  • Parallel lanes for concurrent tasks
  • Gap analysis between sequential tasks
  • Duration bars proportional to wall-clock time
  • Zoom and pan for exploring long traces

JSON trace format

json_str = trace.to_json()

This exports the raw trace data as JSON (dagron's own format, not Chrome's). Useful for custom analysis or storage.

Profiling with profile_execution()

The profile_execution() function takes a DAG and an execution result, and produces a ProfileReport with actionable insights:

from dagron import profile_execution

report = profile_execution(dag, result)

Critical path analysis

The critical path is the sequence of nodes that determined the total wall-clock time:

print("Critical path:")
for node in report.critical_path:
    print(f"  {node}")
# extract
# transform_a   <-- bottleneck (longest task)
# merge
# load

Critical path highlighted in red. transform_a is the bottleneck.

The critical path tells you exactly which nodes to optimise for maximum speedup.
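To gauge how much a critical-path optimisation could buy, sum the critical-path durations and recompute with the candidate change. Here is a back-of-the-envelope sketch using the durations from the example trace (plain Python, not a dagron API; it also ignores the possibility that a sibling branch such as transform_b becomes the new critical path):

```python
# Durations taken from the example trace; in a real run you would read them
# from result.node_results[node].duration_seconds.
critical_path = {
    "extract": 0.102,
    "transform_a": 0.299,
    "merge": 0.051,
    "load": 0.100,
}

wall_clock = sum(critical_path.values())  # lower bound on total runtime

# Suppose we manage to halve transform_a, the longest critical-path task:
optimised = dict(critical_path, transform_a=critical_path["transform_a"] / 2)
new_wall_clock = sum(optimised.values())

print(f"Current critical path: {wall_clock:.3f}s")
print(f"After optimisation:    {new_wall_clock:.3f}s")
print(f"Best-case speedup:     {wall_clock / new_wall_clock:.2f}x")
```

Optimising a node that is *not* on the critical path yields no speedup at all, which is why the critical path is the first thing to check.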

Bottleneck detection

print("Bottlenecks:")
for b in report.bottlenecks:
    print(f"  {b.node}: {b.duration_seconds:.3f}s ({b.percentage:.1f}% of total)")
# transform_a: 0.299s (54.2% of total)
# transform_b: 0.203s (36.8% of total)

Bottlenecks are nodes that consume a disproportionate share of the total execution time.

Parallelism efficiency

print(f"Parallelism efficiency: {report.parallelism_efficiency:.1%}")
# Parallelism efficiency: 85.3%

Parallelism efficiency is the sum of all task durations divided by the wall-clock time multiplied by the number of workers — in other words, the fraction of total worker capacity that was actually spent running tasks. A value of 100% means every worker was busy for the entire run; low values indicate scheduling gaps or sequential bottlenecks.

# Detailed breakdown
print(f"Sequential sum: {report.sequential_sum:.3f}s")
print(f"Wall-clock: {report.wall_clock:.3f}s")
print(f"Speedup: {report.sequential_sum / report.wall_clock:.2f}x")
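As a sketch of the arithmetic behind these numbers (not the library's internal implementation), the efficiency formula can be written as:

```python
def parallelism_efficiency(durations, wall_clock, num_workers):
    """Fraction of total worker capacity that was spent running tasks."""
    sequential_sum = sum(durations)
    return sequential_sum / (wall_clock * num_workers)

# Two 1-second tasks finishing together on 2 workers: every worker busy.
print(parallelism_efficiency([1.0, 1.0], wall_clock=1.0, num_workers=2))  # 1.0

# The same tasks on 4 workers: half the capacity sat idle.
print(parallelism_efficiency([1.0, 1.0], wall_clock=1.0, num_workers=4))  # 0.5
```

Note that efficiency drops either because workers wait on dependencies or simply because there are more workers than runnable tasks.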

Complete profiling workflow

Here is a full example that builds, executes, traces, and profiles a pipeline:

import dagron
from dagron import profile_execution
import time

# 1. Build the DAG
dag = (
    dagron.DAG.builder()
    .add_nodes(["fetch", "validate", "feature_a", "feature_b", "feature_c",
                "train", "evaluate", "deploy"])
    .add_edges([
        ("fetch", "validate"),
        ("validate", "feature_a"),
        ("validate", "feature_b"),
        ("validate", "feature_c"),
        ("feature_a", "train"),
        ("feature_b", "train"),
        ("feature_c", "train"),
        ("train", "evaluate"),
        ("evaluate", "deploy"),
    ])
    .build()
)

# 2. Define tasks with realistic durations
tasks = {
    "fetch": lambda: time.sleep(0.5) or "fetched",
    "validate": lambda: time.sleep(0.1) or "valid",
    "feature_a": lambda: time.sleep(1.0) or "features_a",
    "feature_b": lambda: time.sleep(0.8) or "features_b",
    "feature_c": lambda: time.sleep(0.3) or "features_c",
    "train": lambda: time.sleep(2.0) or "model",
    "evaluate": lambda: time.sleep(0.5) or "metrics",
    "deploy": lambda: time.sleep(0.2) or "deployed",
}

# 3. Execute with tracing
result = dagron.DAGExecutor(
    dag,
    max_workers=4,
    enable_tracing=True,
).execute(tasks)

# 4. Profile
report = profile_execution(dag, result)

# 5. Print report
print("=" * 60)
print("PROFILING REPORT")
print("=" * 60)
print(f"Total wall-clock time: {result.total_duration_seconds:.3f}s")
print(f"Parallelism efficiency: {report.parallelism_efficiency:.1%}")
print()

print("Critical path:")
for node in report.critical_path:
    nr = result.node_results[node]
    print(f"  {node:20s} {nr.duration_seconds:.3f}s")
print()

print("Top bottlenecks:")
for b in report.bottlenecks[:3]:
    print(f"  {b.node:20s} {b.duration_seconds:.3f}s ({b.percentage:.1f}%)")
print()

# 6. Export trace for Chrome
with open("pipeline_trace.json", "w") as f:
    f.write(result.trace.to_chrome_trace())
print("Trace exported to pipeline_trace.json")

ML pipeline with three parallel feature extraction branches.

Tracing with other executors

Tracing works with all executor types:

AsyncDAGExecutor

import asyncio

async def main():
    executor = dagron.AsyncDAGExecutor(dag, enable_tracing=True)
    result = await executor.execute(tasks)
    print(result.trace.summary())

asyncio.run(main())

IncrementalExecutor

executor = dagron.IncrementalExecutor(dag, enable_tracing=True)
result = executor.execute(tasks, changed_nodes=["fetch"])

# Trace shows which nodes were recomputed vs reused
print(result.trace.summary())

CheckpointExecutor

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/checkpoints")
# Note: CheckpointExecutor uses tracing internally for resume support
result = executor.execute(tasks)

Interpreting traces

Identifying scheduling gaps

Look for periods where no node is running. These indicate:

  • Sequential bottlenecks — a node with high in-degree that must wait for all predecessors.
  • Under-utilisation — max_workers is too low, or the graph is too sequential.
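One way to surface such gaps from the raw events is to sweep the timeline and record the intervals where nothing is running. A sketch over plain (timestamp, node, event_type) tuples, not a dagron API:

```python
def find_idle_gaps(events):
    """Return (start, end) intervals during which no task was running.

    `events` is a list of (timestamp, node_name, event_type) tuples,
    where event_type is "STARTED" or "COMPLETED".
    """
    gaps, running, idle_since = [], 0, None
    for ts, _name, etype in sorted(events):
        if etype == "STARTED":
            if running == 0 and idle_since is not None and ts > idle_since:
                gaps.append((idle_since, ts))
            running += 1
        else:
            running -= 1
            if running == 0:
                idle_since = ts  # everything finished; idle until next start
    return gaps

events = [
    (0.000, "extract", "STARTED"),
    (0.102, "extract", "COMPLETED"),
    (0.150, "transform_a", "STARTED"),   # 48 ms scheduling gap before this
    (0.401, "transform_a", "COMPLETED"),
]
print(find_idle_gaps(events))  # [(0.102, 0.15)]
```

Feeding this the (timestamp, node_name, event_type) fields of trace.events gives you a quick list of dead zones to investigate.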

Identifying stragglers

If one branch takes much longer than its siblings, the join node waits for the straggler. In the example above, feature_a (1.0s) is a straggler compared to feature_c (0.3s).
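You can flag stragglers programmatically by comparing sibling branch durations against the fastest one. A sketch assuming you have collected per-branch durations from result.node_results (the stragglers helper below is hypothetical, not part of dagron):

```python
def stragglers(branch_durations, threshold=2.0):
    """Flag sibling branches slower than `threshold` x the fastest sibling.

    The join node cannot start before the slowest branch finishes, so any
    branch flagged here delays the whole join.
    """
    fastest = min(branch_durations.values())
    return {name: d for name, d in branch_durations.items()
            if d > threshold * fastest}

# Durations from the ML-pipeline example above.
siblings = {"feature_a": 1.0, "feature_b": 0.8, "feature_c": 0.3}
print(stragglers(siblings))  # {'feature_a': 1.0, 'feature_b': 0.8}
```

Rebalancing work between stragglers and their fast siblings (or splitting a straggler into parallel sub-tasks) shortens the wait at the join.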

Measuring overhead

Compare the sequential sum to the parallel execution time:

sequential = sum(nr.duration_seconds for nr in result.node_results.values())
parallel = result.total_duration_seconds
overhead = parallel - (sequential / 4) # with 4 workers

print(f"Sequential sum: {sequential:.3f}s")
print(f"Parallel time: {parallel:.3f}s")
print(f"Overhead: {overhead:.3f}s")

Tracing in production

For production pipelines, consider these patterns:

Conditional tracing

import os

enable = os.environ.get("DAGRON_TRACING", "false").lower() == "true"
result = dagron.DAGExecutor(dag, enable_tracing=enable).execute(tasks)

Trace sampling

import random
import time

# Trace 10% of executions
enable = random.random() < 0.1
result = dagron.DAGExecutor(dag, enable_tracing=enable).execute(tasks)

if result.trace:
    with open(f"trace_{int(time.time())}.json", "w") as f:
        f.write(result.trace.to_chrome_trace())

Monitoring integration

from dagron import profile_execution

result = dagron.DAGExecutor(dag, enable_tracing=True).execute(tasks)
report = profile_execution(dag, result)

# Send metrics to your monitoring system
metrics = {
    "pipeline.duration": result.total_duration_seconds,
    "pipeline.parallelism_efficiency": report.parallelism_efficiency,
    "pipeline.critical_path_length": len(report.critical_path),
    "pipeline.nodes_succeeded": result.succeeded,
    "pipeline.nodes_failed": result.failed,
}
# send_to_datadog(metrics) # or Prometheus, Grafana, etc.

API reference

Class / Function        Docs
ExecutionTrace          Tracing
TraceEvent              Tracing
profile_execution()     Tracing
ProfileReport           Tracing

Next steps