Tracing
The tracing module provides a structured timeline log of every event that occurs
during DAG execution. Traces capture node starts, completions, failures, gate
interactions, resource acquisitions, and cache events. You can export traces as
JSON or in Chrome Tracing format for visualization in chrome://tracing.
For a higher-level introduction, see the Tracing & Profiling guide.
from dagron.execution.tracing import ExecutionTrace, TraceEvent, TraceEventType
TraceEventType
class TraceEventType(Enum):
EXECUTION_STARTED = "execution_started"
STEP_STARTED = "step_started"
NODE_STARTED = "node_started"
NODE_COMPLETED = "node_completed"
NODE_FAILED = "node_failed"
NODE_SKIPPED = "node_skipped"
NODE_TIMED_OUT = "node_timed_out"
NODE_CANCELLED = "node_cancelled"
STEP_COMPLETED = "step_completed"
EXECUTION_COMPLETED = "execution_completed"
NODE_GATE_WAITING = "node_gate_waiting"
NODE_GATE_RESOLVED = "node_gate_resolved"
RESOURCE_ACQUIRED = "resource_acquired"
RESOURCE_RELEASED = "resource_released"
NODE_CACHE_HIT = "node_cache_hit"
NODE_CACHE_MISS = "node_cache_miss"
An enumeration of all event types that can be recorded during DAG execution. Each value corresponds to a distinct lifecycle moment.
Event categories
| Category | Events | Description |
|---|---|---|
| Execution | EXECUTION_STARTED, EXECUTION_COMPLETED | Overall execution boundaries. |
| Step | STEP_STARTED, STEP_COMPLETED | Topological level boundaries within execution. |
| Node lifecycle | NODE_STARTED, NODE_COMPLETED, NODE_FAILED, NODE_SKIPPED, NODE_TIMED_OUT, NODE_CANCELLED | Individual node state transitions. |
| Gates | NODE_GATE_WAITING, NODE_GATE_RESOLVED | Approval gate interactions. See Gates. |
| Resources | RESOURCE_ACQUIRED, RESOURCE_RELEASED | Resource pool acquisition and release. |
| Cache | NODE_CACHE_HIT, NODE_CACHE_MISS | Content-addressable cache interactions. |
# Check event type
event = trace.events[0]
if event.event_type == TraceEventType.NODE_FAILED:
print(f"Node {event.node_name} failed: {event.error}")
TraceEvent
@dataclass
class TraceEvent:
event_type: TraceEventType
timestamp: float
node_name: str | None = None
step_index: int | None = None
duration: float | None = None
error: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
A single trace event captured during execution. Each event has a type, a monotonic timestamp relative to the start of recording, and optional fields that vary by event type.
| Parameter | Type | Default | Description |
|---|---|---|---|
| event_type | TraceEventType | required | The kind of event that occurred. |
| timestamp | float | required | Seconds elapsed since the first event in this trace (monotonic clock). |
| node_name | str \| None | None | Name of the node this event relates to, if applicable. |
| step_index | int \| None | None | Topological level index for step events. |
| duration | float \| None | None | Duration in seconds for completion events. |
| error | str \| None | None | Error message for failure events. |
| metadata | dict[str, Any] | {} | Arbitrary metadata attached by hooks or plugins. |
for event in trace.events:
if event.node_name:
print(f"[{event.timestamp:.4f}s] {event.event_type.value}: {event.node_name}")
if event.duration:
print(f" Duration: {event.duration:.4f}s")
if event.error:
print(f" Error: {event.error}")
ExecutionTrace
class ExecutionTrace:
def __init__(self) -> None: ...
A structured timeline log that collects events during DAG execution. The
executor creates an ExecutionTrace automatically when tracing is enabled.
You can also create one manually for custom recording scenarios.
The trace uses time.monotonic() internally, so all timestamps are relative
to the first recorded event and monotonically increasing.
from dagron.execution.tracing import ExecutionTrace, TraceEventType
trace = ExecutionTrace()
trace.record(TraceEventType.EXECUTION_STARTED)
trace.record(TraceEventType.NODE_STARTED, node_name="extract")
trace.record(TraceEventType.NODE_COMPLETED, node_name="extract", duration=0.45)
trace.record(TraceEventType.EXECUTION_COMPLETED)
print(len(trace.events)) # 4
Methods
ExecutionTrace.record
def record(
self,
event_type: TraceEventType,
*,
node_name: str | None = None,
step_index: int | None = None,
duration: float | None = None,
error: str | None = None,
metadata: dict[str, Any] | None = None,
) -> None
Record a trace event. The timestamp is captured automatically using
time.monotonic() relative to the first recorded event.
| Parameter | Type | Default | Description |
|---|---|---|---|
| event_type | TraceEventType | required | The type of event to record. |
| node_name | str \| None | None | Name of the relevant node. |
| step_index | int \| None | None | Topological level index (for step events). |
| duration | float \| None | None | Duration in seconds (for completion events). |
| error | str \| None | None | Error message (for failure events). |
| metadata | dict[str, Any] \| None | None | Arbitrary metadata dict. Stored as an empty dict if None. |
trace.record(
TraceEventType.NODE_FAILED,
node_name="transform",
error="ValueError: missing column 'id'",
metadata={"retry_count": 2},
)
ExecutionTrace.events
@property
def events(self) -> list[TraceEvent]
Returns a copy of all recorded events in chronological order.
Returns: list[TraceEvent] -- All events recorded so far.
for event in trace.events:
print(f"{event.event_type.value} at {event.timestamp:.4f}s")
ExecutionTrace.events_for_node
def events_for_node(self, name: str) -> list[TraceEvent]
Filter events for a specific node. Returns only events where node_name
matches the given name.
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | The node name to filter by. |
Returns: list[TraceEvent] -- Events associated with the named node.
extract_events = trace.events_for_node("extract")
for e in extract_events:
print(f" {e.event_type.value}: {e.timestamp:.4f}s")
# NODE_STARTED: 0.0001s
# NODE_COMPLETED: 0.4502s
ExecutionTrace.to_json
def to_json(self) -> str
Export the trace as a JSON string. Each event becomes a JSON object with
event_type, timestamp, and any non-None optional fields.
Returns: str -- Pretty-printed JSON array of event objects.
import json
json_str = trace.to_json()
events = json.loads(json_str)
print(events[0])
# {"event_type": "execution_started", "timestamp": 0.0}
ExecutionTrace.to_chrome_trace
def to_chrome_trace(self) -> str
Export the trace in Chrome Tracing format.
The output is a JSON string compatible with chrome://tracing or
Perfetto.
Node executions become Duration events (B/E pairs). Each unique node
gets its own thread ID for visual separation. Timestamps are converted
to microseconds.
Returns: str -- Chrome Tracing JSON string.
# Write the trace to a file and open in chrome://tracing
chrome_json = trace.to_chrome_trace()
with open("trace.json", "w") as f:
f.write(chrome_json)
ExecutionTrace.summary
def summary(self) -> str
Return a human-readable summary of the trace, including total event count, unique node count, and counts by outcome (completed, failed, skipped, timed out, cancelled).
Returns: str -- Multi-line summary string.
print(trace.summary())
# Execution Trace Summary
# Total events: 14
# Unique nodes: 5
# Completed: 4
# Failed: 1
# Skipped: 0
# Timed out: 0
# Cancelled: 0
# Duration: 1.2345s
Complete example
import dagron
from dagron.execution.tracing import ExecutionTrace, TraceEventType
# Build a DAG
dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)
# Execute with tracing enabled
executor = dagron.DAGExecutor(dag, trace=True)
tasks = {
"extract": lambda: [1, 2, 3],
"transform": lambda: [2, 4, 6],
"load": lambda: "done",
}
result = executor.execute(tasks)
# Access the trace from the result
trace = result.trace
# Inspect events
print(trace.summary())
# Find slow nodes
for event in trace.events:
if event.event_type == TraceEventType.NODE_COMPLETED and event.duration:
if event.duration > 1.0:
print(f"Slow node: {event.node_name} ({event.duration:.2f}s)")
# Export for Chrome Tracing visualization
with open("pipeline_trace.json", "w") as f:
f.write(trace.to_chrome_trace())
# Export as plain JSON for custom analysis
with open("pipeline_events.json", "w") as f:
f.write(trace.to_json())
See also
- Profiling -- post-execution performance analysis built on trace data.
- Tracing & Profiling guide -- walkthrough of tracing and profiling workflows.
- Execution -- `DAGExecutor` and the `trace` parameter.