
Profiling

The profiling module analyzes completed executions against the DAG structure to identify the critical path, compute slack for every node, detect bottlenecks, and measure parallelism efficiency. Unlike tracing (which records events in real time), profiling is a post-execution analysis step that requires both the DAG and a completed ExecutionResult.

For a guided walkthrough, see Tracing & Profiling.

```python
from dagron.execution.profiling import profile_execution, ProfileReport, NodeProfile
```

profile_execution

```python
def profile_execution(
    dag: DAG,
    result: ExecutionResult,
) -> ProfileReport
```

Analyze an execution result against the DAG structure. This function performs a forward and backward pass over the DAG to compute earliest start times, latest start times, slack, and critical path membership from actual recorded durations.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `dag` | `DAG` | required | The DAG that was executed. |
| `result` | `ExecutionResult` | required | The execution result containing per-node timings and statuses. |

Returns: ProfileReport -- A complete profiling report with per-node analysis, critical path, bottlenecks, and efficiency metrics.

The function performs the following analysis:

  1. Forward pass -- compute earliest start time for each node based on predecessor completion times.
  2. Backward pass -- compute latest start time from the makespan working backwards.
  3. Slack -- the difference between latest and earliest start times. Zero-slack nodes are on the critical path.
  4. Bottleneck scoring -- nodes ranked by duration * (1 + descendant_count).
  5. Parallelism efficiency -- ratio of total work to makespan, indicating how well the DAG exploits concurrency.
```python
import dagron
from dagron.execution.profiling import profile_execution

dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform_a")
    .add_edge("extract", "transform_b")
    .add_edge("transform_a", "merge")
    .add_edge("transform_b", "merge")
    .add_edge("merge", "load")
    .build()
)

executor = dagron.DAGExecutor(dag, max_workers=4)
result = executor.execute(tasks)  # tasks: mapping of node names to callables

report = profile_execution(dag, result)
print(report.summary())
```
> **Note:** Only nodes with `NodeStatus.COMPLETED` are included in the analysis. Failed, skipped, or cancelled nodes are excluded from critical path and slack computations.
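The forward/backward pass described above can be sketched with plain dicts. This is a minimal illustration, not dagron code: `durations`, `preds`, and the topological `order` stand in for what `profile_execution` reads from the DAG and the execution result.

```python
# Illustrative inputs: actual per-node durations and predecessor lists.
durations = {"extract": 0.5, "transform_a": 0.4, "transform_b": 0.2,
             "merge": 0.15, "load": 0.1}
preds = {"extract": [], "transform_a": ["extract"], "transform_b": ["extract"],
         "merge": ["transform_a", "transform_b"], "load": ["merge"]}
succs = {n: [m for m, ps in preds.items() if n in ps] for n in durations}
order = ["extract", "transform_a", "transform_b", "merge", "load"]  # topological

# Forward pass: earliest start = latest finish among predecessors.
earliest = {}
for n in order:
    earliest[n] = max((earliest[p] + durations[p] for p in preds[n]), default=0.0)

makespan = max(earliest[n] + durations[n] for n in order)

# Backward pass: latest start that still meets the makespan.
latest = {}
for n in reversed(order):
    latest[n] = min((latest[s] for s in succs[n]), default=makespan) - durations[n]

# Slack: zero-slack nodes form the critical path.
slack = {n: latest[n] - earliest[n] for n in order}
critical = [n for n in order if abs(slack[n]) < 1e-9]
```

With these durations, `transform_b` has 0.2s of slack and every other node is critical, so the critical path is extract -> transform_a -> merge -> load with a 1.15s makespan.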


ProfileReport

```python
@dataclass
class ProfileReport:
    node_profiles: dict[str, NodeProfile] = field(default_factory=dict)
    critical_path: list[str] = field(default_factory=list)
    critical_path_duration: float = 0.0
    bottlenecks: list[str] = field(default_factory=list)
    parallelism_efficiency: float = 0.0
    actual_max_parallelism: int = 0
```

Complete profiling report for a DAG execution. Contains per-node profiles, the critical path, bottleneck rankings, and overall efficiency metrics.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `node_profiles` | `dict[str, NodeProfile]` | `{}` | Mapping of node names to their individual profile data. |
| `critical_path` | `list[str]` | `[]` | Ordered list of node names on the critical path (longest weighted path). |
| `critical_path_duration` | `float` | `0.0` | Total duration of the critical path in seconds. |
| `bottlenecks` | `list[str]` | `[]` | Top 5 nodes ranked by bottleneck score (duration weighted by descendant count). |
| `parallelism_efficiency` | `float` | `0.0` | Ratio of total work to makespan. A value of 1.0 means fully sequential; higher values indicate more parallelism was exploited. |
| `actual_max_parallelism` | `int` | `0` | Maximum number of nodes that ran (or could run) in parallel at any topological level. |

Methods


ProfileReport.summary

```python
def summary(self) -> str
```

Return a human-readable summary of the profiling report.

Returns: str -- Multi-line summary including critical path, efficiency, and bottleneck list.

```python
print(report.summary())
# Profile Report
# Nodes profiled: 5
# Critical path: extract -> transform_a -> merge -> load
# Critical path duration: 2.3456s
# Parallelism efficiency: 1.80
# Max parallelism: 2
# Bottlenecks: extract, merge, transform_a, transform_b, load
```
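The bottleneck ranking follows the documented score, duration * (1 + descendant_count). A sketch of that ranking over plain data (the durations and descendant counts below are illustrative, with `blocked_descendants` standing in for the descendant count):

```python
# name: (duration in seconds, blocked_descendants)
profiles = {
    "extract": (0.5, 4),
    "transform_a": (0.4, 2),
    "transform_b": (0.2, 2),
    "merge": (0.15, 1),
    "load": (0.1, 0),
}

# Score each node and take the top 5, as the report does.
scores = {n: d * (1 + k) for n, (d, k) in profiles.items()}
bottlenecks = sorted(scores, key=scores.get, reverse=True)[:5]
```

The weighting means a moderately slow node that gates many descendants can outrank a slower node near the sink of the DAG.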

ProfileReport.to_dict

```python
def to_dict(self) -> dict[str, Any]
```

Convert the report to a plain dictionary suitable for JSON serialization or logging.

Returns: dict[str, Any] -- Dictionary representation of the report, including nested node profiles.

```python
import json

report_dict = report.to_dict()
print(json.dumps(report_dict, indent=2))

# Access specific node data
extract_profile = report_dict["node_profiles"]["extract"]
print(f"Extract slack: {extract_profile['slack']:.4f}s")
```

NodeProfile

```python
@dataclass
class NodeProfile:
    name: str
    duration: float
    earliest_start: float
    latest_start: float
    slack: float
    on_critical_path: bool
    blocked_descendants: int
```

Profile data for a single node, computed from the forward/backward pass over actual execution timings.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | required | The node name. |
| `duration` | `float` | required | Actual execution duration in seconds. |
| `earliest_start` | `float` | required | Earliest possible start time based on predecessor completion. |
| `latest_start` | `float` | required | Latest start time that would not delay the overall makespan. |
| `slack` | `float` | required | Scheduling flexibility (`latest_start - earliest_start`). Zero means the node is on the critical path. |
| `on_critical_path` | `bool` | required | Whether this node lies on the critical path. |
| `blocked_descendants` | `int` | required | Number of downstream nodes that depend (transitively) on this node completing. |
```python
for name, profile in report.node_profiles.items():
    status = "CRITICAL" if profile.on_critical_path else f"slack={profile.slack:.3f}s"
    print(f"{name}: {profile.duration:.3f}s ({status}), blocks {profile.blocked_descendants} nodes")
```

Understanding slack

Slack represents how much a node's start time can be delayed without affecting the overall pipeline completion time. Nodes with zero slack form the critical path -- any delay in these nodes directly delays the entire execution.

```python
# Find nodes with scheduling flexibility
flexible = [
    (name, p.slack)
    for name, p in report.node_profiles.items()
    if p.slack > 0
]
flexible.sort(key=lambda x: -x[1])

for name, slack in flexible:
    print(f"{name}: can be delayed by {slack:.3f}s without impact")
```

Complete example

```python
import json
import time

import dagron
from dagron.execution.profiling import profile_execution

# Build and execute a pipeline
dag = (
    dagron.DAG.builder()
    .add_edge("fetch_api", "parse_api")
    .add_edge("fetch_db", "parse_db")
    .add_edge("parse_api", "merge")
    .add_edge("parse_db", "merge")
    .add_edge("merge", "validate")
    .add_edge("validate", "store")
    .build()
)

tasks = {
    "fetch_api": lambda: time.sleep(0.5) or "api_data",
    "fetch_db": lambda: time.sleep(0.3) or "db_data",
    "parse_api": lambda: time.sleep(0.2) or "parsed_api",
    "parse_db": lambda: time.sleep(0.1) or "parsed_db",
    "merge": lambda: time.sleep(0.15) or "merged",
    "validate": lambda: time.sleep(0.05) or "valid",
    "store": lambda: time.sleep(0.1) or "stored",
}

executor = dagron.DAGExecutor(dag, max_workers=4)
result = executor.execute(tasks)

# Profile the execution
report = profile_execution(dag, result)

# Print the overall summary
print(report.summary())

# Identify the critical path
print("\nCritical path:")
for node_name in report.critical_path:
    p = report.node_profiles[node_name]
    print(f"  {node_name}: {p.duration:.3f}s")
print(f"  Total: {report.critical_path_duration:.3f}s")

# Find optimization opportunities
print("\nOptimization targets (nodes with most blocked descendants):")
ranked = sorted(
    report.node_profiles.values(),
    key=lambda p: p.blocked_descendants,
    reverse=True,
)
for p in ranked[:3]:
    print(f"  {p.name}: blocks {p.blocked_descendants} nodes, duration {p.duration:.3f}s")

# Export for logging
print(json.dumps(report.to_dict(), indent=2))
```
