# Incremental Execution
When a small part of your input changes, you should not have to re-run the entire pipeline. dagron's incremental execution engine tracks which nodes are affected by a change, re-executes only those nodes, and applies early cutoff to stop propagation when a recomputed node produces the same result as before.
This guide explains the concepts behind incremental execution, walks through the `IncrementalExecutor` API, and shows real-world patterns for using it effectively.
## Why incremental?
Consider a data pipeline with 20 nodes. If one source table changes, a naive executor re-runs all 20 tasks. With incremental execution:
- You declare which nodes changed (the dirty set).
- The executor computes the affected set — all downstream descendants of the dirty nodes.
- Only the affected set is re-executed, in topological order.
- If a recomputed node's output matches its previous output, the executor applies early cutoff and skips its descendants.
The result: you re-run 3 nodes instead of 20, saving minutes or hours on large pipelines.
## Concepts

### Dirty set

The dirty set is the set of nodes whose inputs have changed since the last run. You provide this set explicitly via the `changed_nodes` parameter.

### Affected set

The affected set is the transitive closure of the dirty set's descendants. dagron computes this automatically.
### Early cutoff

After re-executing a node, the executor compares its new result to the cached previous result. If they are equal, the node's descendants are not re-executed — even if they are in the affected set. This is called early cutoff and can dramatically reduce recomputation.

### Reused set

Nodes that are not in the affected set (or were cut off early) keep their previous results. These are the reused nodes.

*Figure: Incremental execution. A is dirty (red); B and C are recomputed (orange); D produces the same result as before, so early cutoff applies (green); E and F are reused (blue).*
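Conceptually, the affected set is just a forward traversal of the DAG starting from the dirty nodes. This standalone sketch illustrates the idea (it is not dagron's actual implementation):

```python
from collections import deque

def affected_set(edges, dirty):
    """Dirty nodes plus all of their transitive descendants."""
    children = {}
    for src, dst in edges:
        children.setdefault(src, []).append(dst)
    seen = set(dirty)
    queue = deque(dirty)
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen

edges = [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D"), ("D", "E")]
print(sorted(affected_set(edges, {"A"})))  # ['A', 'B', 'C', 'D', 'E']
print(sorted(affected_set(edges, {"D"})))  # ['D', 'E']
```

Early cutoff then prunes this set at run time: only nodes whose recomputed output actually differs keep propagating dirtiness to their children.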
## `IncrementalExecutor`

### Basic usage
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("source_a", "transform")
    .add_edge("source_b", "transform")
    .add_edge("transform", "aggregate")
    .add_edge("aggregate", "report")
    .build()
)

tasks = {
    "source_a": lambda: [1, 2, 3],
    "source_b": lambda: [4, 5, 6],
    "transform": lambda: [1, 2, 3, 4, 5, 6],
    "aggregate": lambda: {"sum": 21, "count": 6},
    "report": lambda: "Report: 6 items, sum=21",
}

executor = dagron.IncrementalExecutor(dag)

# First run — everything executes
result = executor.execute(tasks)
print(result.recomputed)   # ['source_a', 'source_b', 'transform', 'aggregate', 'report']
print(result.reused)       # []
print(result.early_cutoff) # []
```
### Subsequent run with changes
```python
# Only source_a changed
tasks["source_a"] = lambda: [10, 20, 30]

result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.recomputed)   # ['source_a', 'transform', 'aggregate', 'report']
print(result.reused)       # ['source_b']
print(result.early_cutoff) # []
```
Node `source_b` was not in the dirty set or the affected set, so its previous result is reused.

*Figure: Only `source_a` changed. `source_b` is reused; everything downstream of `source_a` is recomputed.*
### Early cutoff in action
Early cutoff activates when a recomputed node produces the same result as its previous run:
```python
# source_a changed, but transform produces the same result anyway
tasks["source_a"] = lambda: [1, 2, 3]            # same as original
tasks["transform"] = lambda: [1, 2, 3, 4, 5, 6]  # same output

result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.recomputed)   # ['source_a', 'transform']
print(result.early_cutoff) # ['transform']
print(result.reused)       # ['source_b', 'aggregate', 'report']
```
Even though `transform` is downstream of the dirty node, its output did not change, so `aggregate` and `report` are not re-executed.

*Figure: Early cutoff at `transform` (green) prevents recomputation of `aggregate` and `report`.*
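The cutoff logic itself is simple: walk the affected nodes in topological order, and only propagate dirtiness to a node's children when its new output differs from the cached one. A minimal sketch of the idea (dagron's internals may differ):

```python
def run_with_cutoff(order, children, tasks, cache, dirty):
    """Re-run affected nodes in topological order, applying early cutoff."""
    affected = set(dirty)
    recomputed, cutoff = [], []
    for node in order:  # `order` must be a topological ordering of the DAG
        if node not in affected:
            continue
        new_result = tasks[node]()
        recomputed.append(node)
        if cache.get(node) == new_result:
            cutoff.append(node)  # same output as before: descendants stay clean
        else:
            cache[node] = new_result
            affected.update(children.get(node, []))
    return recomputed, cutoff

order = ["source_a", "transform", "aggregate"]
children = {"source_a": ["transform"], "transform": ["aggregate"]}
cache = {"source_a": [1], "transform": [1, 2], "aggregate": 3}
tasks = {"source_a": lambda: [9], "transform": lambda: [1, 2], "aggregate": lambda: 3}
print(run_with_cutoff(order, children, tasks, cache, {"source_a"}))
# (['source_a', 'transform'], ['transform'])
```

Note that equality comparison is what makes cutoff work, which is why deterministic task outputs matter so much.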
### Constructor parameters
```python
dagron.IncrementalExecutor(
    dag,                   # The DAG
    callbacks=None,        # ExecutionCallbacks instance
    fail_fast=True,        # Stop on first failure?
    enable_tracing=False,  # Record execution trace?
)
```
The constructor is similar to `DAGExecutor` but without `max_workers` or `costs`, because incremental execution focuses on minimising work rather than parallelising it.
### `IncrementalResult`

The `.execute()` method returns an `IncrementalResult` that extends `ExecutionResult` with three additional fields:
| Field | Type | Description |
|---|---|---|
| `recomputed` | `list[str]` | Nodes that were actually re-executed |
| `early_cutoff` | `list[str]` | Nodes where early cutoff stopped propagation |
| `reused` | `list[str]` | Nodes whose previous results were kept |
```python
result = executor.execute(tasks, changed_nodes=["source_a"])

# Standard ExecutionResult fields still work
print(result.succeeded)
print(result.total_duration_seconds)
for name, nr in result.node_results.items():
    print(f"{name}: {nr.status}")

# Incremental-specific fields
print(f"Recomputed: {result.recomputed}")
print(f"Early cutoff: {result.early_cutoff}")
print(f"Reused: {result.reused}")
```
## Computing dirty sets

### Manual dirty set

The simplest approach: you know which inputs changed, so you list them explicitly:

```python
result = executor.execute(tasks, changed_nodes=["source_a"])
```
### Using the DAG's `dirty_set()`
If you track changes at a finer granularity (e.g., file modification times), dagron can compute the dirty set for you:
```python
# After changing node configurations or data sources,
# ask the DAG which nodes are dirty:
dirty = dag.dirty_set(
    changed=["source_a"],
)
print(dirty)  # ['source_a', 'transform', 'aggregate', 'report']
```
The `dirty_set()` method returns the full affected set, not just the immediate changes. This is the set of nodes that might need re-execution (before early cutoff).
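If each source node is backed by a file, you can derive the changed-node list from content hashes. The node-to-path mapping and helpers below are hypothetical glue code, not part of dagron:

```python
import hashlib

def fingerprint(path):
    """Content hash of a file, used for change detection."""
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

def detect_changes(node_to_path, previous_fingerprints):
    """Nodes whose backing file content changed since fingerprints were taken."""
    changed = []
    for node, path in node_to_path.items():
        if fingerprint(path) != previous_fingerprints.get(path):
            changed.append(node)
    return changed

# Hypothetical mapping from source nodes to the files they read
node_to_path = {"source_a": "data/a.csv", "source_b": "data/b.csv"}
```

The resulting list can be passed straight to `changed_nodes=` (or to `dag.dirty_set(changed=...)`). Content hashes are more robust than modification times, which can change without the content changing.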
### Using `change_provenance()`

For more sophisticated change tracking, use `change_provenance()` to understand why a node is dirty:
```python
provenance = dag.change_provenance(changed=["source_a"])
for node, reason in provenance.items():
    print(f"{node}: dirty because of {reason}")
# source_a: dirty because of direct change
# transform: dirty because of ancestor source_a
# aggregate: dirty because of ancestor source_a
# report: dirty because of ancestor source_a
```
## Callbacks with incremental execution

Callbacks work the same as with `DAGExecutor`, but you get additional events for skip/reuse:
```python
class IncrementalLogger:
    def on_node_start(self, name):
        print(f"  [RUN]   {name}")

    def on_node_complete(self, name, result):
        print(f"  [DONE]  {name}")

    def on_node_skip(self, name):
        print(f"  [REUSE] {name}")

executor = dagron.IncrementalExecutor(dag, callbacks=IncrementalLogger())
result = executor.execute(tasks, changed_nodes=["source_a"])
```
## Fail-fast behaviour

When `fail_fast=True` (the default), a failure in any recomputed node skips its descendants — just like the standard executor:
```python
tasks["transform"] = lambda: (_ for _ in ()).throw(ValueError("bad data"))

result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.failed)   # 1
print(result.skipped)  # 2 (aggregate, report)
```
| Node | Status |
|---|---|
| source_a | completed |
| source_b | completed (reused) |
| transform | failed |
| aggregate | skipped |
| report | skipped |
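The skip set on failure is just the failed node's transitive descendants, which can be sketched as:

```python
from collections import deque

def descendants(children, node):
    """All transitive descendants of a node (the skip set when it fails)."""
    seen, queue = set(), deque([node])
    while queue:
        for child in children.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen

children = {"source_a": ["transform"], "source_b": ["transform"],
            "transform": ["aggregate"], "aggregate": ["report"]}
print(sorted(descendants(children, "transform")))  # ['aggregate', 'report']
```

Nodes that are neither descendants of the failed node nor in the affected set keep their reused results, as the table above shows for `source_b`.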
## Real-world example: incremental ML training
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("load_data", "feature_eng")
    .add_edge("feature_eng", "train")
    .add_edge("train", "evaluate")
    .add_edge("evaluate", "report")
    .build()
)

# Simulate expensive tasks
def load_data():
    return {"rows": 10000, "checksum": "abc123"}

def feature_eng():
    return {"features": 50, "checksum": "def456"}

def train():
    import time
    time.sleep(2)  # expensive!
    return {"model": "xgb_v1", "accuracy": 0.95}

def evaluate():
    return {"accuracy": 0.95, "f1": 0.93}

def report():
    return "Model accuracy: 95%"

tasks = {
    "load_data": load_data,
    "feature_eng": feature_eng,
    "train": train,
    "evaluate": evaluate,
    "report": report,
}

executor = dagron.IncrementalExecutor(dag, enable_tracing=True)

# First run: everything executes
result = executor.execute(tasks)
print(f"First run: {result.total_duration_seconds:.1f}s, recomputed={len(result.recomputed)}")

# New data arrives, but only load_data changes
tasks["load_data"] = lambda: {"rows": 10001, "checksum": "abc124"}

# Second run: only affected nodes execute
result = executor.execute(tasks, changed_nodes=["load_data"])
print(f"Second run: {result.total_duration_seconds:.1f}s, recomputed={len(result.recomputed)}")
print(f"Reused: {result.reused}")
```
## Combining with tracing
Enable tracing to understand incremental execution performance:
```python
executor = dagron.IncrementalExecutor(dag, enable_tracing=True)
result = executor.execute(tasks, changed_nodes=["source_a"])

# The trace shows which nodes were recomputed vs reused
if result.trace:
    print(result.trace.summary())
```
See Tracing & Profiling for the full tracing guide.
## Tips for effective incremental execution
- **Keep tasks deterministic.** Early cutoff works best when the same inputs produce the same outputs. Non-deterministic tasks (e.g., those using random seeds or wall-clock time) defeat early cutoff.
- **Use fine-grained nodes.** The more granular your nodes, the more opportunities for early cutoff. A single monolithic "transform" node that changes its output on every run provides no cutoff benefit.
- **Track changes accurately.** Over-reporting dirty nodes wastes computation. Under-reporting produces stale results. Use checksums or file modification times for accurate change detection.
- **Combine with checkpointing.** For long-running pipelines, use `CheckpointExecutor` alongside incremental execution to resume from failures without losing incremental state.
- **Monitor cutoff rates.** Track `len(result.early_cutoff)` over time. If cutoff rates are low, your nodes may not be deterministic or your change detection may be too coarse.
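Monitoring cutoff rates can be as simple as a small helper computed from the `recomputed` and `early_cutoff` fields documented above:

```python
def cutoff_rate(result):
    """Fraction of recomputed nodes at which early cutoff applied."""
    if not result.recomputed:
        return 0.0  # nothing was re-executed, so there is no rate to measure
    return len(result.early_cutoff) / len(result.recomputed)
```

Logging this value after each run gives you a trend line; a persistently low rate is a hint that tasks are non-deterministic or that change detection is too coarse.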
## API reference
| Class / Method | Docs |
|---|---|
| `IncrementalExecutor` | Incremental |
| `IncrementalResult` | Incremental |
| `dag.dirty_set()` | DAG |
| `dag.change_provenance()` | DAG |
## Next steps
- Tracing & Profiling — combine tracing with incremental runs.
- Checkpointing — persist incremental state across restarts.
- Conditional Execution — combine conditions with incremental logic.