Skip to main content

Tracing

The tracing module provides a structured timeline log of every event that occurs during DAG execution. Traces capture node starts, completions, failures, gate interactions, resource acquisitions, and cache events. You can export traces as JSON or in Chrome Tracing format for visualization in chrome://tracing.

For a higher-level introduction, see the Tracing & Profiling guide.

from dagron.execution.tracing import ExecutionTrace, TraceEvent, TraceEventType

TraceEventType

TraceEventType
class TraceEventType(Enum):
EXECUTION_STARTED = "execution_started"
STEP_STARTED = "step_started"
NODE_STARTED = "node_started"
NODE_COMPLETED = "node_completed"
NODE_FAILED = "node_failed"
NODE_SKIPPED = "node_skipped"
NODE_TIMED_OUT = "node_timed_out"
NODE_CANCELLED = "node_cancelled"
STEP_COMPLETED = "step_completed"
EXECUTION_COMPLETED = "execution_completed"
NODE_GATE_WAITING = "node_gate_waiting"
NODE_GATE_RESOLVED = "node_gate_resolved"
RESOURCE_ACQUIRED = "resource_acquired"
RESOURCE_RELEASED = "resource_released"
NODE_CACHE_HIT = "node_cache_hit"
NODE_CACHE_MISS = "node_cache_miss"

An enumeration of all event types that can be recorded during DAG execution. Each value corresponds to a distinct lifecycle moment.

Event categories

| Category | Events | Description |
| --- | --- | --- |
| Execution | `EXECUTION_STARTED`, `EXECUTION_COMPLETED` | Overall execution boundaries. |
| Step | `STEP_STARTED`, `STEP_COMPLETED` | Topological level boundaries within execution. |
| Node lifecycle | `NODE_STARTED`, `NODE_COMPLETED`, `NODE_FAILED`, `NODE_SKIPPED`, `NODE_TIMED_OUT`, `NODE_CANCELLED` | Individual node state transitions. |
| Gates | `NODE_GATE_WAITING`, `NODE_GATE_RESOLVED` | Approval gate interactions. See Gates. |
| Resources | `RESOURCE_ACQUIRED`, `RESOURCE_RELEASED` | Resource pool acquisition and release. |
| Cache | `NODE_CACHE_HIT`, `NODE_CACHE_MISS` | Content-addressable cache interactions. |

# Check event type
event = trace.events[0]
if event.event_type == TraceEventType.NODE_FAILED:
print(f"Node {event.node_name} failed: {event.error}")

TraceEvent

TraceEvent
@dataclass
class TraceEvent:
event_type: TraceEventType
timestamp: float
node_name: str | None = None
step_index: int | None = None
duration: float | None = None
error: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)

A single trace event captured during execution. Each event has a type, a monotonic timestamp relative to the start of recording, and optional fields that vary by event type.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `event_type` | `TraceEventType` | required | The kind of event that occurred. |
| `timestamp` | `float` | required | Seconds elapsed since the first event in this trace (monotonic clock). |
| `node_name` | `str \| None` | `None` | Name of the node this event relates to, if applicable. |
| `step_index` | `int \| None` | `None` | Topological level index for step events. |
| `duration` | `float \| None` | `None` | Duration in seconds for completion events. |
| `error` | `str \| None` | `None` | Error message for failure events. |
| `metadata` | `dict[str, Any]` | `{}` | Arbitrary metadata attached by hooks or plugins. |

for event in trace.events:
if event.node_name:
print(f"[{event.timestamp:.4f}s] {event.event_type.value}: {event.node_name}")
if event.duration:
print(f" Duration: {event.duration:.4f}s")
if event.error:
print(f" Error: {event.error}")

ExecutionTrace

ExecutionTrace
class ExecutionTrace:
def __init__(self) -> None: ...

A structured timeline log that collects events during DAG execution. The executor creates an ExecutionTrace automatically when tracing is enabled. You can also create one manually for custom recording scenarios.

The trace uses time.monotonic() internally, so all timestamps are relative to the first recorded event and monotonically increasing.

from dagron.execution.tracing import ExecutionTrace, TraceEventType

trace = ExecutionTrace()
trace.record(TraceEventType.EXECUTION_STARTED)
trace.record(TraceEventType.NODE_STARTED, node_name="extract")
trace.record(TraceEventType.NODE_COMPLETED, node_name="extract", duration=0.45)
trace.record(TraceEventType.EXECUTION_COMPLETED)

print(len(trace.events)) # 4

Methods


ExecutionTrace.record

ExecutionTrace.record
def record(
self,
event_type: TraceEventType,
*,
node_name: str | None = None,
step_index: int | None = None,
duration: float | None = None,
error: str | None = None,
metadata: dict[str, Any] | None = None,
) -> None

Record a trace event. The timestamp is captured automatically using time.monotonic() relative to the first recorded event.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `event_type` | `TraceEventType` | required | The type of event to record. |
| `node_name` | `str \| None` | `None` | Name of the relevant node. |
| `step_index` | `int \| None` | `None` | Topological level index (for step events). |
| `duration` | `float \| None` | `None` | Duration in seconds (for completion events). |
| `error` | `str \| None` | `None` | Error message (for failure events). |
| `metadata` | `dict[str, Any] \| None` | `None` | Arbitrary metadata dict. Stored as an empty dict if `None`. |

trace.record(
TraceEventType.NODE_FAILED,
node_name="transform",
error="ValueError: missing column 'id'",
metadata={"retry_count": 2},
)

ExecutionTrace.events

ExecutionTrace.events
@property
def events(self) -> list[TraceEvent]

Returns a copy of all recorded events in chronological order.

Returns: list[TraceEvent] -- All events recorded so far.

for event in trace.events:
print(f"{event.event_type.value} at {event.timestamp:.4f}s")

ExecutionTrace.events_for_node

ExecutionTrace.events_for_node
def events_for_node(self, name: str) -> list[TraceEvent]

Filter events for a specific node. Returns only events where node_name matches the given name.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | required | The node name to filter by. |

Returns: list[TraceEvent] -- Events associated with the named node.

extract_events = trace.events_for_node("extract")
for e in extract_events:
print(f" {e.event_type.value}: {e.timestamp:.4f}s")
# node_started: 0.0001s
# node_completed: 0.4502s

ExecutionTrace.to_json

ExecutionTrace.to_json
def to_json(self) -> str

Export the trace as a JSON string. Each event becomes a JSON object with event_type, timestamp, and any non-None optional fields.

Returns: str -- Pretty-printed JSON array of event objects.

import json

json_str = trace.to_json()
events = json.loads(json_str)
print(events[0])
# {'event_type': 'execution_started', 'timestamp': 0.0}

ExecutionTrace.to_chrome_trace

ExecutionTrace.to_chrome_trace
def to_chrome_trace(self) -> str

Export the trace in Chrome Tracing format. The output is a JSON string compatible with chrome://tracing or Perfetto.

Node executions become Duration events (B/E pairs). Each unique node gets its own thread ID for visual separation. Timestamps are converted to microseconds.

Returns: str -- Chrome Tracing JSON string.

# Write the trace to a file and open in chrome://tracing
chrome_json = trace.to_chrome_trace()
with open("trace.json", "w") as f:
f.write(chrome_json)

ExecutionTrace.summary

ExecutionTrace.summary
def summary(self) -> str

Return a human-readable summary of the trace, including total event count, unique node count, counts by outcome (completed, failed, skipped, timed out, cancelled), and the total duration.

Returns: str -- Multi-line summary string.

print(trace.summary())
# Execution Trace Summary
# Total events: 14
# Unique nodes: 5
# Completed: 4
# Failed: 1
# Skipped: 0
# Timed out: 0
# Cancelled: 0
# Duration: 1.2345s

Complete example

import dagron
from dagron.execution.tracing import ExecutionTrace, TraceEventType

# Build a DAG
dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)

# Execute with tracing enabled
executor = dagron.DAGExecutor(dag, trace=True)
tasks = {
"extract": lambda: [1, 2, 3],
"transform": lambda: [2, 4, 6],
"load": lambda: "done",
}
result = executor.execute(tasks)

# Access the trace from the result
trace = result.trace

# Inspect events
print(trace.summary())

# Find slow nodes
for event in trace.events:
if event.event_type == TraceEventType.NODE_COMPLETED and event.duration:
if event.duration > 1.0:
print(f"Slow node: {event.node_name} ({event.duration:.2f}s)")

# Export for Chrome Tracing visualization
with open("pipeline_trace.json", "w") as f:
f.write(trace.to_chrome_trace())

# Export as plain JSON for custom analysis
with open("pipeline_events.json", "w") as f:
f.write(trace.to_json())

See also