
Incremental Execution

When a small part of your input changes, you should not have to re-run the entire pipeline. dagron's incremental execution engine tracks which nodes are affected by a change, re-executes only those nodes, and applies early cutoff to stop propagation when a recomputed node produces the same result as before.

This guide explains the concepts behind incremental execution, walks through the IncrementalExecutor API, and shows real-world patterns for using it effectively.

Why incremental?

Consider a data pipeline with 20 nodes. If one source table changes, a naive executor re-runs all 20 tasks. With incremental execution:

  1. You declare which nodes changed (the dirty set).
  2. The executor computes the affected set — all downstream descendants of the dirty nodes.
  3. Only the affected set is re-executed, in topological order.
  4. If a recomputed node's output matches its previous output, the executor applies early cutoff and skips its descendants.

The result: you re-run 3 nodes instead of 20, saving minutes or hours on large pipelines.

Concepts

Dirty set

The dirty set is the set of nodes whose inputs have changed since the last run. You provide this set explicitly via the changed_nodes parameter.

Affected set

The affected set is the dirty set together with all of its transitive descendants. dagron computes this automatically.
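The affected set amounts to a graph traversal over outgoing edges. The sketch below is illustrative only, not dagron's implementation; the `edges` list mirrors the example DAG used later in this guide:

```python
from collections import deque

def affected_set(edges, dirty):
    """Return the dirty nodes plus all of their transitive descendants."""
    children = {}
    for parent, child in edges:
        children.setdefault(parent, []).append(child)
    seen = set(dirty)
    queue = deque(dirty)
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen

edges = [("source_a", "transform"), ("source_b", "transform"),
         ("transform", "aggregate"), ("aggregate", "report")]
print(sorted(affected_set(edges, {"source_a"})))
# ['aggregate', 'report', 'source_a', 'transform']
```

Note that `source_b` never appears: it is upstream of `transform` but not downstream of anything dirty, so it is reused.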

Early cutoff

After re-executing a node, the executor compares its new result to the cached previous result. If they are equal, the node's descendants are not re-executed — even if they are in the affected set. This is called early cutoff and can dramatically reduce recomputation.
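The cutoff rule can be sketched as a walk in topological order: a node re-runs only if it is dirty or a parent produced a new output, and an unchanged result stops propagation. This is a minimal illustration of the idea, not dagron's implementation:

```python
def run_incremental(order, parents, tasks, cache, dirty):
    """Walk nodes in topological order, re-running only where needed."""
    changed_output = set()
    recomputed, early_cutoff, reused = [], [], []
    for node in order:
        if node in dirty or any(p in changed_output for p in parents.get(node, ())):
            new = tasks[node]()
            recomputed.append(node)
            if cache.get(node) == new:
                early_cutoff.append(node)  # same result: descendants stay clean
            else:
                changed_output.add(node)
            cache[node] = new
        else:
            reused.append(node)
    return recomputed, early_cutoff, reused

order = ["source_a", "source_b", "transform", "aggregate", "report"]
parents = {"transform": ["source_a", "source_b"],
           "aggregate": ["transform"], "report": ["aggregate"]}
# Cached results from the previous run (source_a produced [10, 20, 30] then).
cache = {"source_a": [10, 20, 30], "source_b": [4, 5, 6],
         "transform": [1, 2, 3, 4, 5, 6], "aggregate": 21, "report": "ok"}
tasks = {"source_a": lambda: [1, 2, 3],
         "transform": lambda: [1, 2, 3, 4, 5, 6]}

print(run_incremental(order, parents, tasks, cache, {"source_a"}))
# (['source_a', 'transform'], ['transform'], ['source_b', 'aggregate', 'report'])
```

`transform` re-runs because its parent changed, but its output matches the cache, so `aggregate` and `report` are reused — the same behaviour the executor examples below demonstrate.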

Reused set

Nodes that are not in the affected set (or were cut off early) keep their previous results. These are the reused nodes.

Incremental execution. A is dirty (red). B and C are recomputed (orange). D produces the same result as before so early cutoff applies (green). E and F are reused (blue).

IncrementalExecutor

Basic usage

import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("source_a", "transform")
    .add_edge("source_b", "transform")
    .add_edge("transform", "aggregate")
    .add_edge("aggregate", "report")
    .build()
)

tasks = {
"source_a": lambda: [1, 2, 3],
"source_b": lambda: [4, 5, 6],
"transform": lambda: [1, 2, 3, 4, 5, 6],
"aggregate": lambda: {"sum": 21, "count": 6},
"report": lambda: "Report: 6 items, sum=21",
}

executor = dagron.IncrementalExecutor(dag)

# First run — everything executes
result = executor.execute(tasks)
print(result.recomputed) # ['source_a', 'source_b', 'transform', 'aggregate', 'report']
print(result.reused) # []
print(result.early_cutoff) # []

Subsequent run with changes

# Only source_a changed
tasks["source_a"] = lambda: [10, 20, 30]

result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.recomputed) # ['source_a', 'transform', 'aggregate', 'report']
print(result.reused) # ['source_b']
print(result.early_cutoff) # []

Node source_b was not in the dirty set or the affected set, so its previous result is reused.

Only source_a changed. source_b is reused; everything downstream of source_a is recomputed.

Early cutoff in action

Early cutoff activates when a recomputed node produces the same result as its previous run:

# source_a changed, but transform produces the same result anyway
tasks["source_a"] = lambda: [1, 2, 3] # same as original
tasks["transform"] = lambda: [1, 2, 3, 4, 5, 6] # same output

result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.recomputed) # ['source_a', 'transform']
print(result.early_cutoff) # ['transform']
print(result.reused) # ['source_b', 'aggregate', 'report']

Even though transform is downstream of the dirty node, its output did not change, so aggregate and report are not re-executed.

Early cutoff at transform (green) prevents recomputation of aggregate and report.

Constructor parameters

dagron.IncrementalExecutor(
    dag,                    # The DAG
    callbacks=None,         # ExecutionCallbacks instance
    fail_fast=True,         # Stop on first failure?
    enable_tracing=False,   # Record execution trace?
)

The constructor is similar to DAGExecutor but without max_workers or costs, because incremental execution focuses on minimising work rather than parallelising it.

IncrementalResult

The .execute() method returns an IncrementalResult that extends ExecutionResult with three additional fields:

Field          Type        Description
recomputed     list[str]   Nodes that were actually re-executed
early_cutoff   list[str]   Nodes where early cutoff stopped propagation
reused         list[str]   Nodes whose previous results were kept

result = executor.execute(tasks, changed_nodes=["source_a"])

# Standard ExecutionResult fields still work
print(result.succeeded)
print(result.total_duration_seconds)
for name, nr in result.node_results.items():
    print(f"{name}: {nr.status}")

# Incremental-specific fields
print(f"Recomputed: {result.recomputed}")
print(f"Early cutoff: {result.early_cutoff}")
print(f"Reused: {result.reused}")

Computing dirty sets

Manual dirty set

The simplest approach: you know which inputs changed, so you list them explicitly:

result = executor.execute(tasks, changed_nodes=["source_a"])

Using DAG's dirty_set()

If you track changes yourself (e.g., via file modification times), the DAG can expand the changed nodes into the full set that may need re-execution:

# After changing node configurations or data sources,
# ask the DAG which nodes are dirty:
dirty = dag.dirty_set(
    changed=["source_a"],
)
print(dirty) # ['source_a', 'transform', 'aggregate', 'report']

The dirty_set() method returns the full affected set, not just the immediate changes. This is the set of nodes that might need re-execution (before early cutoff).

Using change_provenance()

For more sophisticated change tracking, use change_provenance() to understand why a node is dirty:

provenance = dag.change_provenance(changed=["source_a"])
for node, reason in provenance.items():
    print(f"{node}: dirty because of {reason}")
# source_a: dirty because of direct change
# transform: dirty because of ancestor source_a
# aggregate: dirty because of ancestor source_a
# report: dirty because of ancestor source_a
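Provenance tracking is essentially the affected-set traversal with an extra record of which directly-changed ancestor reached each node. A sketch of the idea, assuming simple "direct change" / "ancestor X" reasons (this is illustrative, not dagron's internals):

```python
from collections import deque

def change_provenance(edges, changed):
    """Map each affected node to the reason it is dirty."""
    children = {}
    for parent, child in edges:
        children.setdefault(parent, []).append(child)
    reason = {node: "direct change" for node in changed}
    queue = deque(changed)
    while queue:
        node = queue.popleft()
        # Propagate the originating ancestor, not the immediate parent.
        origin = node if reason[node] == "direct change" else reason[node].split()[-1]
        for child in children.get(node, []):
            if child not in reason:
                reason[child] = f"ancestor {origin}"
                queue.append(child)
    return reason

edges = [("source_a", "transform"), ("source_b", "transform"),
         ("transform", "aggregate"), ("aggregate", "report")]
print(change_provenance(edges, ["source_a"]))
# {'source_a': 'direct change', 'transform': 'ancestor source_a',
#  'aggregate': 'ancestor source_a', 'report': 'ancestor source_a'}
```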

Callbacks with incremental execution

Callbacks work the same as with DAGExecutor, but you get additional events for skip/reuse:

class IncrementalLogger:
    def on_node_start(self, name):
        print(f"  [RUN] {name}")

    def on_node_complete(self, name, result):
        print(f"  [DONE] {name}")

    def on_node_skip(self, name):
        print(f"  [REUSE] {name}")

executor = dagron.IncrementalExecutor(dag, callbacks=IncrementalLogger())
result = executor.execute(tasks, changed_nodes=["source_a"])

Fail-fast behaviour

When fail_fast=True (the default), a failure in any recomputed node skips its descendants — just like the standard executor:

tasks["transform"] = lambda: (_ for _ in ()).throw(ValueError("bad data"))

result = executor.execute(tasks, changed_nodes=["source_a"])
print(result.failed) # 1
print(result.skipped) # 2 (aggregate, report)

Node        Status
source_a    completed
source_b    completed (reused)
transform   failed
aggregate   skipped
report      skipped

Real-world example: incremental ML training

import dagron
import hashlib
import json

dag = (
    dagron.DAG.builder()
    .add_edge("load_data", "feature_eng")
    .add_edge("feature_eng", "train")
    .add_edge("train", "evaluate")
    .add_edge("evaluate", "report")
    .build()
)

# Simulate expensive tasks
def load_data():
    return {"rows": 10000, "checksum": "abc123"}

def feature_eng():
    return {"features": 50, "checksum": "def456"}

def train():
    import time
    time.sleep(2)  # expensive!
    return {"model": "xgb_v1", "accuracy": 0.95}

def evaluate():
    return {"accuracy": 0.95, "f1": 0.93}

def report():
    return "Model accuracy: 95%"

tasks = {
"load_data": load_data,
"feature_eng": feature_eng,
"train": train,
"evaluate": evaluate,
"report": report,
}

executor = dagron.IncrementalExecutor(dag, enable_tracing=True)

# First run: everything executes
result = executor.execute(tasks)
print(f"First run: {result.total_duration_seconds:.1f}s, recomputed={len(result.recomputed)}")

# New data arrives, but only load_data changes
tasks["load_data"] = lambda: {"rows": 10001, "checksum": "abc124"}

# Second run: only affected nodes execute
result = executor.execute(tasks, changed_nodes=["load_data"])
print(f"Second run: {result.total_duration_seconds:.1f}s, recomputed={len(result.recomputed)}")
print(f"Reused: {result.reused}")

Combining with tracing

Enable tracing to understand incremental execution performance:

executor = dagron.IncrementalExecutor(dag, enable_tracing=True)
result = executor.execute(tasks, changed_nodes=["source_a"])

# The trace shows which nodes were recomputed vs reused
if result.trace:
    print(result.trace.summary())

See Tracing & Profiling for the full tracing guide.

Tips for effective incremental execution

  1. Keep tasks deterministic. Early cutoff works best when the same inputs produce the same outputs. Non-deterministic tasks (e.g., those using random seeds or wall-clock time) defeat early cutoff.

  2. Use fine-grained nodes. The more granular your nodes, the more opportunities for early cutoff. A single monolithic "transform" node that changes its output on every run provides no cutoff benefit.

  3. Track changes accurately. Over-reporting dirty nodes wastes computation. Under-reporting produces stale results. Use checksums or file modification times for accurate change detection.

  4. Combine with checkpointing. For long-running pipelines, use CheckpointExecutor alongside incremental execution to resume from failures without losing incremental state.

  5. Monitor cutoff rates. Track len(result.early_cutoff) over time. If cutoff rates are low, your nodes may not be deterministic or your change detection may be too coarse.
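The checksum-based change detection from tip 3 can be sketched with hashlib. The helper names and the idea of keeping a checksum snapshot between runs are illustrative assumptions, not part of dagron's API:

```python
import hashlib

def checksum(path):
    """Hash a file's contents so changes are detected by value, not timestamp."""
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

def dirty_nodes(node_inputs, previous):
    """Compare each node's input checksum against the last run's snapshot.

    Returns the nodes whose inputs changed, plus the new snapshot to persist.
    """
    dirty, current = [], {}
    for node, path in node_inputs.items():
        current[node] = checksum(path)
        if previous.get(node) != current[node]:
            dirty.append(node)
    return dirty, current
```

A dirty set computed this way can be passed straight to `executor.execute(tasks, changed_nodes=dirty)`, and the new snapshot saved for the next run.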

API reference

Class / Method             Docs
IncrementalExecutor        Incremental
IncrementalResult          Incremental
dag.dirty_set()            DAG
dag.change_provenance()    DAG

Next steps