Incremental Execution
The incremental execution module provides an executor that re-runs only the nodes affected by a set of changes. Unchanged nodes are reused from the previous run, dramatically reducing execution time for large DAGs where only a few inputs have changed.
dagron computes the "dirty set" from the changed nodes and their downstream descendants, then applies an early-cutoff optimization: if a recomputed node produces the same result as its cached value, its descendants are not recomputed, even if they were in the initial dirty set.
See the Incremental Execution guide for workflow patterns and best practices.
IncrementalExecutor
```python
class IncrementalExecutor(
    dag: DAG,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```
An executor that tracks previous results and only re-executes nodes in the dirty
set. On the first invocation, all nodes are executed. On subsequent invocations
with changed_nodes, only the affected subset is re-executed.
| Parameter | Type | Default | Description |
|---|---|---|---|
| dag | DAG | required | The DAG whose topology drives execution order. |
| callbacks | ExecutionCallbacks \| None | None | Optional lifecycle callbacks. |
| fail_fast | bool | True | If True, skip downstream nodes when any node fails. |
| enable_tracing | bool | False | If True, record a Chrome-compatible execution trace. |
execute
```python
def execute(
    tasks: dict[str, Callable],
    changed_nodes: list[str] | None = None,
) -> IncrementalResult
```
Execute tasks incrementally. On the first call (or when changed_nodes is
None), all nodes are executed. On subsequent calls, only nodes downstream of
the changed nodes are re-executed. The executor applies early cutoff: if a
recomputed node produces the same output as the cached value, its descendants
are skipped.
| Parameter | Type | Default | Description |
|---|---|---|---|
| tasks | dict[str, Callable] | required | Map of node names to callable tasks. |
| changed_nodes | list[str] \| None | None | Node names that have changed since the last execution. None means execute all. |
Returns: IncrementalResult
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("source_a")
    .add_node("source_b")
    .add_node("transform")
    .add_node("aggregate")
    .add_node("report")
    .add_edge("source_a", "transform")
    .add_edge("source_b", "transform")
    .add_edge("transform", "aggregate")
    .add_edge("aggregate", "report")
    .build()
)

tasks = {
    "source_a": lambda: [1, 2, 3],
    "source_b": lambda: [4, 5, 6],
    "transform": lambda: "transformed",
    "aggregate": lambda: "aggregated",
    "report": lambda: "report ready",
}

executor = dagron.IncrementalExecutor(dag)

# First run: execute everything
result = executor.execute(tasks)
print(result.recomputed)  # 5 (all nodes)
print(result.reused)      # 0

# Second run: only source_a changed
result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.recomputed)  # 4 (source_a, transform, aggregate, report)
print(result.reused)      # 1 (source_b)
```
IncrementalResult
```python
class IncrementalResult(
    node_results: dict[str, NodeResult],
    recomputed: int,
    early_cutoff: int,
    reused: int,
    provenance: dict[str, list[str]],
    total_duration_seconds: float,
    trace: Trace | None,
)
```
The result of an incremental execution. Extends the standard execution result with incremental-specific metrics.
| Property | Type | Description |
|---|---|---|
| node_results | dict[str, NodeResult] | Per-node results keyed by node name. Includes both recomputed and reused nodes. |
| recomputed | int | Number of nodes that were actually re-executed. |
| early_cutoff | int | Number of nodes in the dirty set that were skipped because an upstream recomputation produced the same result. |
| reused | int | Number of nodes that were reused from the previous execution without recomputation. |
| provenance | dict[str, list[str]] | For each recomputed node, the list of changed root nodes responsible for its invalidation. |
| total_duration_seconds | float | Wall-clock duration of this incremental execution. |
| trace | Trace \| None | Execution trace recorded when enable_tracing is True; otherwise None. |
```python
result = executor.execute(tasks, changed_nodes=["source_a"])
print(f"Recomputed: {result.recomputed}")
print(f"Early cutoff: {result.early_cutoff}")
print(f"Reused: {result.reused}")
print(f"Duration: {result.total_duration_seconds:.3f}s")

# Why was each node recomputed?
for node, causes in result.provenance.items():
    print(f"  {node} invalidated by: {causes}")
```
How Dirty Set Computation Works
The dirty set is the set of nodes that must be re-evaluated. It is computed as follows:
- Start with the explicitly changed_nodes.
- Add all downstream descendants (transitive successors) of each changed node.
- The union of these sets is the dirty set.
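Conceptually this is a breadth-first traversal over the DAG's forward edges. The sketch below illustrates the idea in plain Python; it is not dagron's internal implementation, and the hand-built `successors` adjacency map simply mirrors the edges of the example DAG above:

```python
from collections import deque

def dirty_set(successors: dict[str, list[str]], changed: list[str]) -> set[str]:
    """Return the changed nodes plus all transitive downstream descendants."""
    dirty = set(changed)
    queue = deque(changed)
    while queue:
        node = queue.popleft()
        for succ in successors.get(node, []):
            if succ not in dirty:
                dirty.add(succ)
                queue.append(succ)
    return dirty

# Forward edges of the example DAG
successors = {
    "source_a": ["transform"],
    "source_b": ["transform"],
    "transform": ["aggregate"],
    "aggregate": ["report"],
}

print(sorted(dirty_set(successors, ["source_a"])))
# ['aggregate', 'report', 'source_a', 'transform']
```

Note that source_b is absent: it is not downstream of source_a, so it stays clean and its cached result is reused.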
You can preview the dirty set without executing via DAG.dirty_set():
```python
dirty = dag.dirty_set(["source_a"])
print(dirty)  # ["source_a", "transform", "aggregate", "report"]
```
And the provenance (which change caused which recomputation) via
DAG.change_provenance():
```python
prov = dag.change_provenance(["source_a"])
print(prov)
# {
#     "source_a": ["source_a"],
#     "transform": ["source_a"],
#     "aggregate": ["source_a"],
#     "report": ["source_a"],
# }
```
Early Cutoff
Early cutoff is an optimization that stops propagation when a recomputed node
produces the same result as its cached value. The comparison uses Python's ==
operator.
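Because the check is value equality rather than identity, a task can return a freshly constructed object on every run and still trigger cutoff, as long as the new value compares equal to the cached one. Conversely, values whose type has no meaningful `__eq__` compare by identity and will never look "the same". A standalone illustration of the comparison semantics (plain Python, not dagron's internals):

```python
# Two distinct list objects: not identical, but equal by value.
cached = [1, 2, 3]
recomputed = [1, 2, 3]

print(recomputed is cached)  # False: a fresh object each run
print(recomputed == cached)  # True: equal, so cutoff can fire

# A type without __eq__ falls back to identity comparison,
# so "equal" results still look changed and cutoff never fires.
class Opaque:
    pass

print(Opaque() == Opaque())  # False
```

If your tasks return custom objects and you want early cutoff to apply, make sure those types define a value-based `__eq__`.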
For example, if transform is in the dirty set but produces the same output as
the previous run, then aggregate and report are cut off and reused from
cache, even though they were in the dirty set.
```python
counter = {"calls": 0}

def transform():
    counter["calls"] += 1
    return "always_same"  # same output regardless of input

# Wire the counting task into the task map and start from a fresh executor
tasks["transform"] = transform
executor = dagron.IncrementalExecutor(dag)

# First run
result1 = executor.execute(tasks)

# Second run: source_a changed, but transform produces same output
result2 = executor.execute(tasks, changed_nodes=["source_a"])
print(result2.early_cutoff)  # 2 (aggregate and report were cut off)
print(result2.recomputed)    # 2 (source_a and transform were recomputed)
print(result2.reused)        # 1 (source_b)
```
Complete Example
A build system that recompiles only changed source files:
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("parse_module_a")
    .add_node("parse_module_b")
    .add_node("type_check")
    .add_node("optimize")
    .add_node("codegen")
    .add_node("link")
    .add_edge("parse_module_a", "type_check")
    .add_edge("parse_module_b", "type_check")
    .add_edge("type_check", "optimize")
    .add_edge("optimize", "codegen")
    .add_edge("codegen", "link")
    .build()
)

tasks = {
    "parse_module_a": lambda: "ast_a",
    "parse_module_b": lambda: "ast_b",
    "type_check": lambda: "typed_ast",
    "optimize": lambda: "optimized_ir",
    "codegen": lambda: "machine_code",
    "link": lambda: "executable",
}

executor = dagron.IncrementalExecutor(dag, enable_tracing=True)

# Initial full build
result = executor.execute(tasks)
print(f"Full build: {result.recomputed} nodes in {result.total_duration_seconds:.3f}s")

# Developer edits module_a.py: incremental rebuild
result = executor.execute(tasks, changed_nodes=["parse_module_a"])
print(f"Incremental: {result.recomputed} recomputed, {result.reused} reused")
print(f"Reused {result.reused} of {result.recomputed + result.reused} nodes")
```
Related
- DAG.dirty_set — preview the dirty set without executing.
- DAG.change_provenance — understand why each node needs recomputation.
- Caching — content-addressable caching for persistent result storage.
- Checkpointing — save and resume execution state.
- Executing Tasks — general execution guide.