
Incremental Execution

The incremental execution module provides an executor that re-runs only the nodes affected by a set of changes. Unchanged nodes are reused from the previous run, dramatically reducing execution time for large DAGs where only a few inputs have changed.

dagron computes the "dirty set" from the changed nodes and their downstream descendants, then applies early cutoff optimization: if a recomputed node produces the same result as its cached value, its descendants are not recomputed even if they were in the initial dirty set.

See the Incremental Execution guide for workflow patterns and best practices.


IncrementalExecutor

IncrementalExecutor
class IncrementalExecutor(
    dag: DAG,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)

An executor that tracks previous results and only re-executes nodes in the dirty set. On the first invocation, all nodes are executed. On subsequent invocations with changed_nodes, only the affected subset is re-executed.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dag` | `DAG` | required | The DAG whose topology drives execution order. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream nodes when any node fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |

execute

IncrementalExecutor.execute
def execute(
    tasks: dict[str, Callable],
    changed_nodes: list[str] | None = None,
) -> IncrementalResult

Execute tasks incrementally. On the first call (or when changed_nodes is None), all nodes are executed. On subsequent calls, only nodes downstream of the changed nodes are re-executed. The executor applies early cutoff: if a recomputed node produces the same output as the cached value, its descendants are skipped.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `tasks` | `dict[str, Callable]` | required | Map of node names to callable tasks. |
| `changed_nodes` | `list[str] \| None` | `None` | Node names that have changed since the last execution. `None` means execute all. |

Returns: IncrementalResult

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("source_a")
    .add_node("source_b")
    .add_node("transform")
    .add_node("aggregate")
    .add_node("report")
    .add_edge("source_a", "transform")
    .add_edge("source_b", "transform")
    .add_edge("transform", "aggregate")
    .add_edge("aggregate", "report")
    .build()
)

tasks = {
    "source_a": lambda: [1, 2, 3],
    "source_b": lambda: [4, 5, 6],
    "transform": lambda: "transformed",
    "aggregate": lambda: "aggregated",
    "report": lambda: "report ready",
}

executor = dagron.IncrementalExecutor(dag)

# First run: execute everything
result = executor.execute(tasks)
print(result.recomputed) # 5 (all nodes)
print(result.reused) # 0

# Second run: only source_a changed
result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.recomputed) # 4 (source_a, transform, aggregate, report)
print(result.reused) # 1 (source_b)

IncrementalResult

IncrementalResult
class IncrementalResult(
    node_results: dict[str, NodeResult],
    recomputed: int,
    early_cutoff: int,
    reused: int,
    provenance: dict[str, list[str]],
    total_duration_seconds: float,
    trace: Trace | None,
)

The result of an incremental execution. Extends the standard execution result with incremental-specific metrics.

| Property | Type | Description |
|---|---|---|
| `node_results` | `dict[str, NodeResult]` | Per-node results keyed by node name. Includes both recomputed and reused nodes. |
| `recomputed` | `int` | Number of nodes that were actually re-executed. |
| `early_cutoff` | `int` | Number of nodes in the dirty set that were skipped because an upstream recomputation produced the same result. |
| `reused` | `int` | Number of nodes that were reused from the previous execution without recomputation. |
| `provenance` | `dict[str, list[str]]` | For each recomputed node, the list of changed root nodes responsible for its invalidation. |
| `total_duration_seconds` | `float` | Wall-clock duration of this incremental execution. |
| `trace` | `Trace \| None` | The recorded execution trace when tracing is enabled; otherwise `None`. |

result = executor.execute(tasks, changed_nodes=["source_a"])

print(f"Recomputed: {result.recomputed}")
print(f"Early cutoff: {result.early_cutoff}")
print(f"Reused: {result.reused}")
print(f"Duration: {result.total_duration_seconds:.3f}s")

# Why was each node recomputed?
for node, causes in result.provenance.items():
    print(f" {node} invalidated by: {causes}")

How Dirty Set Computation Works

The dirty set is the set of nodes that must be re-evaluated. It is computed as follows:

  1. Start with the explicitly changed_nodes.
  2. Add all downstream descendants (transitive successors) of each changed node.
  3. The union of these sets is the dirty set.
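For illustration, the traversal above can be sketched as a breadth-first walk over a plain adjacency list. This is a simplified stand-in, not dagron's internal representation; the function and variable names here are hypothetical:

```python
from collections import deque

def compute_dirty_set(successors: dict[str, list[str]], changed: list[str]) -> set[str]:
    """Changed nodes plus all of their transitive successors."""
    dirty = set(changed)
    queue = deque(changed)
    while queue:
        for succ in successors.get(queue.popleft(), []):
            if succ not in dirty:
                dirty.add(succ)
                queue.append(succ)
    return dirty

# Adjacency list mirroring the example DAG above
successors = {
    "source_a": ["transform"],
    "source_b": ["transform"],
    "transform": ["aggregate"],
    "aggregate": ["report"],
}

print(sorted(compute_dirty_set(successors, ["source_a"])))
# ['aggregate', 'report', 'source_a', 'transform']
```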

You can preview the dirty set without executing via DAG.dirty_set():

dirty = dag.dirty_set(["source_a"])
print(dirty) # ["source_a", "transform", "aggregate", "report"]

And the provenance (which change caused which recomputation) via DAG.change_provenance():

prov = dag.change_provenance(["source_a"])
print(prov)
# {
# "source_a": ["source_a"],
# "transform": ["source_a"],
# "aggregate": ["source_a"],
# "report": ["source_a"],
# }
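Conceptually, provenance is a reachability question: a node's provenance is the set of changed roots that can reach it through the DAG. A minimal sketch over an adjacency list (illustrative only; `reachable_from` and `change_provenance` here are hypothetical helpers, not dagron's implementation):

```python
from collections import deque

def reachable_from(successors: dict[str, list[str]], root: str) -> set[str]:
    """All nodes reachable from root, including root itself."""
    seen = {root}
    queue = deque([root])
    while queue:
        for succ in successors.get(queue.popleft(), []):
            if succ not in seen:
                seen.add(succ)
                queue.append(succ)
    return seen

def change_provenance(successors: dict[str, list[str]], changed: list[str]) -> dict[str, list[str]]:
    """Map each invalidated node to the changed roots responsible for it."""
    provenance: dict[str, list[str]] = {}
    for root in changed:
        for node in reachable_from(successors, root):
            provenance.setdefault(node, []).append(root)
    return provenance

successors = {
    "source_a": ["transform"],
    "source_b": ["transform"],
    "transform": ["aggregate"],
    "aggregate": ["report"],
}

prov = change_provenance(successors, ["source_a", "source_b"])
print(sorted(prov["transform"]))  # ['source_a', 'source_b']
```

Note that a node downstream of several changed roots accumulates all of them in its provenance list, matching the `dict[str, list[str]]` shape of `IncrementalResult.provenance`.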

Early Cutoff

Early cutoff is an optimization that stops propagation when a recomputed node produces the same result as its cached value. The comparison uses Python's == operator.

For example, if transform is in the dirty set but produces the same output as the previous run, then aggregate and report are cut off and reused from cache, even though they were in the dirty set.
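The propagation logic can be sketched as follows. This is an illustrative simplification, not dagron's actual implementation: it assumes explicitly changed nodes are always treated as stale, that nodes are visited in topological order, and that descendants compare their new output to the cache with `==`:

```python
from collections import deque

def incremental_run(order, predecessors, successors, tasks, cache, changed):
    """Illustrative early-cutoff sketch; `order` must be a topological order."""
    # Dirty set: changed nodes plus all transitive successors.
    dirty = set(changed)
    queue = deque(changed)
    while queue:
        for succ in successors.get(queue.popleft(), []):
            if succ not in dirty:
                dirty.add(succ)
                queue.append(succ)

    value_changed = set(changed)  # outputs presumed (changed) or observed to differ
    recomputed = []
    for node in order:
        if node not in dirty:
            continue  # clean: reused from cache
        if node not in changed and not any(
            p in value_changed for p in predecessors.get(node, [])
        ):
            continue  # early cutoff: every dirty parent reproduced its cached value
        new_value = tasks[node]()
        recomputed.append(node)
        if cache.get(node) != new_value:
            value_changed.add(node)
        cache[node] = new_value

    cut_off = [n for n in order if n in dirty and n not in recomputed]
    return recomputed, cut_off

order = ["source_a", "source_b", "transform", "aggregate", "report"]
predecessors = {"transform": ["source_a", "source_b"],
                "aggregate": ["transform"], "report": ["aggregate"]}
successors = {"source_a": ["transform"], "source_b": ["transform"],
              "transform": ["aggregate"], "aggregate": ["report"]}
tasks = {name: (lambda n=name: f"value_{n}") for name in order}
cache = {name: tasks[name]() for name in order}  # simulate a prior full run

recomputed, cut_off = incremental_run(
    order, predecessors, successors, tasks, cache, ["source_a"]
)
print(recomputed)  # ['source_a', 'transform']
print(cut_off)     # ['aggregate', 'report']
```

Because `transform` reproduces its cached value, the cutoff check fires at `aggregate`, and nothing downstream of it runs.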

counter = {"calls": 0}

def transform():
    counter["calls"] += 1
    return "always_same"  # same output on every call

# Swap the new implementation into the task map from the example above
tasks["transform"] = transform

# First run
result1 = executor.execute(tasks)

# Second run: source_a changed, but transform produces the same output
result2 = executor.execute(tasks, changed_nodes=["source_a"])
print(result2.early_cutoff)  # 2 (aggregate and report were cut off)
print(result2.recomputed)    # 2 (source_a and transform were recomputed)
print(result2.reused)        # 1 (source_b)

Complete Example

A build system that recompiles only changed source files:

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("parse_module_a")
    .add_node("parse_module_b")
    .add_node("type_check")
    .add_node("optimize")
    .add_node("codegen")
    .add_node("link")
    .add_edge("parse_module_a", "type_check")
    .add_edge("parse_module_b", "type_check")
    .add_edge("type_check", "optimize")
    .add_edge("optimize", "codegen")
    .add_edge("codegen", "link")
    .build()
)

tasks = {
    "parse_module_a": lambda: "ast_a",
    "parse_module_b": lambda: "ast_b",
    "type_check": lambda: "typed_ast",
    "optimize": lambda: "optimized_ir",
    "codegen": lambda: "machine_code",
    "link": lambda: "executable",
}

executor = dagron.IncrementalExecutor(dag, enable_tracing=True)

# Initial full build
result = executor.execute(tasks)
print(f"Full build: {result.recomputed} nodes in {result.total_duration_seconds:.3f}s")

# Developer edits module_a.py — incremental rebuild
result = executor.execute(tasks, changed_nodes=["parse_module_a"])
print(f"Incremental: {result.recomputed} recomputed, {result.reused} reused")
print(f"Speed improvement: {result.reused}/{result.recomputed + result.reused} nodes skipped")