# Profiling
The profiling module analyzes completed executions against the DAG structure
to identify the critical path, compute slack for every node, detect
bottlenecks, and measure parallelism efficiency. Unlike tracing (which
records events in real time), profiling is a post-execution analysis step
that requires both the DAG and a completed ExecutionResult.
For a guided walkthrough, see Tracing & Profiling.
```python
from dagron.execution.profiling import profile_execution, ProfileReport, NodeProfile
```
## profile_execution
```python
def profile_execution(
    dag: DAG,
    result: ExecutionResult,
) -> ProfileReport
```
Analyze an execution result against the DAG structure. This function performs a forward and backward pass over the DAG to compute earliest start times, latest start times, slack, and critical path membership from actual recorded durations.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `dag` | `DAG` | required | The DAG that was executed. |
| `result` | `ExecutionResult` | required | The execution result containing per-node timings and statuses. |
Returns: ProfileReport -- A complete profiling report with per-node
analysis, critical path, bottlenecks, and efficiency metrics.
The function performs the following analysis:
- Forward pass -- compute earliest start time for each node based on predecessor completion times.
- Backward pass -- compute latest start time from the makespan working backwards.
- Slack -- the difference between latest and earliest start times. Zero-slack nodes are on the critical path.
- Bottleneck scoring -- nodes ranked by `duration * (1 + descendant_count)`.
- Parallelism efficiency -- ratio of total work to makespan, indicating how well the DAG exploits concurrency.
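The forward/backward passes described above are the classic critical-path method. As a standalone illustration (not dagron's internal implementation), the sketch below runs both passes over a toy diamond DAG with made-up durations; with these numbers the critical path is extract -> transform_a -> merge -> load:

```python
# Illustrative critical-path-method sketch: forward pass for earliest start,
# backward pass for latest start, slack as the difference. Toy data only.
from graphlib import TopologicalSorter

edges = {  # node -> set of predecessors
    "extract": set(),
    "transform_a": {"extract"},
    "transform_b": {"extract"},
    "merge": {"transform_a", "transform_b"},
    "load": {"merge"},
}
duration = {"extract": 0.5, "transform_a": 0.4, "transform_b": 0.2,
            "merge": 0.3, "load": 0.1}

order = list(TopologicalSorter(edges).static_order())

# Forward pass: earliest start = max over predecessors of their finish time.
earliest = {}
for node in order:
    earliest[node] = max((earliest[p] + duration[p] for p in edges[node]),
                         default=0.0)

makespan = max(earliest[n] + duration[n] for n in order)

# Backward pass: a node must finish before the earliest "latest start" of
# any successor; with no successors it may finish as late as the makespan.
successors = {n: [m for m in edges if n in edges[m]] for n in edges}
latest = {}
for node in reversed(order):
    latest_finish = min((latest[s] for s in successors[node]),
                        default=makespan)
    latest[node] = latest_finish - duration[node]

for node in order:
    slack = latest[node] - earliest[node]
    print(f"{node}: slack={slack:.2f}", "(critical)" if slack < 1e-9 else "")
```

With these durations, transform_b is the only node with positive slack (0.2s); every other node is on the critical path.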
```python
import time

import dagron
from dagron.execution.profiling import profile_execution

dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform_a")
    .add_edge("extract", "transform_b")
    .add_edge("transform_a", "merge")
    .add_edge("transform_b", "merge")
    .add_edge("merge", "load")
    .build()
)

# Placeholder task callables so the example runs end to end
tasks = {
    name: (lambda: time.sleep(0.1))
    for name in ["extract", "transform_a", "transform_b", "merge", "load"]
}

executor = dagron.DAGExecutor(dag, max_workers=4)
result = executor.execute(tasks)

report = profile_execution(dag, result)
print(report.summary())
```
Only nodes with NodeStatus.COMPLETED are included in the analysis.
Failed, skipped, or cancelled nodes are excluded from critical path
and slack computations.
## ProfileReport
```python
@dataclass
class ProfileReport:
    node_profiles: dict[str, NodeProfile] = field(default_factory=dict)
    critical_path: list[str] = field(default_factory=list)
    critical_path_duration: float = 0.0
    bottlenecks: list[str] = field(default_factory=list)
    parallelism_efficiency: float = 0.0
    actual_max_parallelism: int = 0
```
Complete profiling report for a DAG execution. Contains per-node profiles, the critical path, bottleneck rankings, and overall efficiency metrics.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `node_profiles` | `dict[str, NodeProfile]` | `{}` | Mapping of node names to their individual profile data. |
| `critical_path` | `list[str]` | `[]` | Ordered list of node names on the critical path (longest weighted path). |
| `critical_path_duration` | `float` | `0.0` | Total duration of the critical path in seconds. |
| `bottlenecks` | `list[str]` | `[]` | Top 5 nodes ranked by bottleneck score (duration weighted by descendant count). |
| `parallelism_efficiency` | `float` | `0.0` | Ratio of total work to makespan. A value of 1.0 means fully sequential; higher values indicate more parallelism was exploited. |
| `actual_max_parallelism` | `int` | `0` | Maximum number of nodes that ran (or could run) in parallel at any topological level. |
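To make the efficiency metric concrete, here is the arithmetic worked through on the diamond DAG from the earlier example, with made-up durations (this is a sketch of the metric's definition, not dagron's code):

```python
# parallelism_efficiency = total work / makespan. Durations are illustrative.
durations = {"extract": 0.5, "transform_a": 0.4, "transform_b": 0.2,
             "merge": 0.3, "load": 0.1}
total_work = sum(durations.values())  # 1.5s of summed task time

# transform_a and transform_b overlap, so wall-clock time only pays for the
# longer branch: extract + transform_a + merge + load.
makespan = 0.5 + 0.4 + 0.3 + 0.1  # 1.3s critical-path duration

efficiency = total_work / makespan
print(f"parallelism_efficiency = {efficiency:.2f}")
```

An efficiency near 1.0 would mean the DAG ran essentially sequentially; here the overlap of the two transform branches pushes it above 1.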
### Methods
#### ProfileReport.summary
```python
def summary(self) -> str
```
Return a human-readable summary of the profiling report.
Returns: str -- Multi-line summary including critical path, efficiency, and bottleneck list.
```python
print(report.summary())
# Profile Report
# Nodes profiled: 5
# Critical path: extract -> transform_a -> merge -> load
# Critical path duration: 2.3456s
# Parallelism efficiency: 1.80
# Max parallelism: 2
# Bottlenecks: extract, merge, transform_a, transform_b, load
```
#### ProfileReport.to_dict
```python
def to_dict(self) -> dict[str, Any]
```
Convert the report to a plain dictionary suitable for JSON serialization or logging.
Returns: dict[str, Any] -- Dictionary representation of the report, including nested node profiles.
```python
import json

report_dict = report.to_dict()
print(json.dumps(report_dict, indent=2))

# Access specific node data
extract_profile = report_dict["node_profiles"]["extract"]
print(f"Extract slack: {extract_profile['slack']:.4f}s")
```
## NodeProfile
```python
@dataclass
class NodeProfile:
    name: str
    duration: float
    earliest_start: float
    latest_start: float
    slack: float
    on_critical_path: bool
    blocked_descendants: int
```
Profile data for a single node, computed from the forward/backward pass over actual execution timings.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | The node name. |
| `duration` | `float` | required | Actual execution duration in seconds. |
| `earliest_start` | `float` | required | Earliest possible start time based on predecessor completion. |
| `latest_start` | `float` | required | Latest start time that would not delay the overall makespan. |
| `slack` | `float` | required | Scheduling flexibility (`latest_start - earliest_start`). Zero means the node is on the critical path. |
| `on_critical_path` | `bool` | required | Whether this node lies on the critical path. |
| `blocked_descendants` | `int` | required | Number of downstream nodes that depend (transitively) on this node completing. |
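The descendant count is transitive, not just the immediate successors. As a standalone sketch of the idea (again not dagron's internals), it can be computed by traversing successor edges from each node:

```python
# Count transitive descendants in a toy DAG -- the notion behind
# blocked_descendants, reimplemented here purely for illustration.
successors = {
    "extract": ["transform_a", "transform_b"],
    "transform_a": ["merge"],
    "transform_b": ["merge"],
    "merge": ["load"],
    "load": [],
}

def descendants(node, graph):
    """Return the set of all nodes reachable downstream of `node`."""
    seen = set()
    stack = list(graph[node])
    while stack:
        n = stack.pop()
        if n not in seen:
            seen.add(n)
            stack.extend(graph[n])
    return seen

for node in successors:
    print(f"{node} blocks {len(descendants(node, successors))} nodes")
```

In this toy graph, extract blocks all four downstream nodes, which is why root fan-out nodes tend to dominate the bottleneck ranking.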
```python
for name, profile in report.node_profiles.items():
    status = "CRITICAL" if profile.on_critical_path else f"slack={profile.slack:.3f}s"
    print(f"{name}: {profile.duration:.3f}s ({status}), blocks {profile.blocked_descendants} nodes")
```
## Understanding slack
Slack represents how much a node's start time can be delayed without affecting the overall pipeline completion time. Nodes with zero slack form the critical path -- any delay in these nodes directly delays the entire execution.
```python
# Find nodes with scheduling flexibility
flexible = [
    (name, p.slack)
    for name, p in report.node_profiles.items()
    if p.slack > 0
]
flexible.sort(key=lambda x: -x[1])
for name, slack in flexible:
    print(f"{name}: can be delayed by {slack:.3f}s without impact")
```
## Complete example
```python
import json
import time

import dagron
from dagron.execution.profiling import profile_execution

# Build and execute a pipeline
dag = (
    dagron.DAG.builder()
    .add_edge("fetch_api", "parse_api")
    .add_edge("fetch_db", "parse_db")
    .add_edge("parse_api", "merge")
    .add_edge("parse_db", "merge")
    .add_edge("merge", "validate")
    .add_edge("validate", "store")
    .build()
)

tasks = {
    "fetch_api": lambda: time.sleep(0.5) or "api_data",
    "fetch_db": lambda: time.sleep(0.3) or "db_data",
    "parse_api": lambda: time.sleep(0.2) or "parsed_api",
    "parse_db": lambda: time.sleep(0.1) or "parsed_db",
    "merge": lambda: time.sleep(0.15) or "merged",
    "validate": lambda: time.sleep(0.05) or "valid",
    "store": lambda: time.sleep(0.1) or "stored",
}

executor = dagron.DAGExecutor(dag, max_workers=4)
result = executor.execute(tasks)

# Profile the execution
report = profile_execution(dag, result)

# Print the overall summary
print(report.summary())

# Identify the critical path
print("\nCritical path:")
for node_name in report.critical_path:
    p = report.node_profiles[node_name]
    print(f"  {node_name}: {p.duration:.3f}s")
print(f"  Total: {report.critical_path_duration:.3f}s")

# Find optimization opportunities
print("\nOptimization targets (nodes with most blocked descendants):")
ranked = sorted(
    report.node_profiles.values(),
    key=lambda p: p.blocked_descendants,
    reverse=True,
)
for p in ranked[:3]:
    print(f"  {p.name}: blocks {p.blocked_descendants} nodes, duration {p.duration:.3f}s")

# Export for logging
print(json.dumps(report.to_dict(), indent=2))
```
## See also
- Tracing -- the event recording system that feeds execution timings.
- Analysis -- structural analysis including `explain()` and `what_if()`.
- Tracing & Profiling guide -- end-to-end walkthrough.