# Checkpointing
The checkpointing module allows you to persist execution progress to disk and resume after failures. When a node completes, its result is saved to a checkpoint directory. If execution is interrupted (crash, timeout, manual stop), you can resume from where it left off without re-executing completed nodes.
See the Checkpointing guide for usage patterns and failure recovery strategies.
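The core pattern is straightforward to sketch outside of dagron: run nodes in dependency order, write each result to disk as it completes, and on a rerun load any result that is already present instead of re-executing its task. A minimal illustration of the idea (not dagron's implementation — it stores results as JSON for brevity, whereas dagron uses pickle):

```python
import json
import os

def run_with_checkpoints(order, tasks, ckpt_dir):
    """Run tasks in topological order, persisting each result to disk.

    On a rerun, nodes with an existing checkpoint file are skipped and
    their saved results are loaded instead of re-executing the task.
    """
    os.makedirs(ckpt_dir, exist_ok=True)
    results = {}
    for name in order:
        path = os.path.join(ckpt_dir, f"{name}.json")
        if os.path.exists(path):
            # Completed in an earlier run: load the saved result.
            with open(path) as f:
                results[name] = json.load(f)
            continue
        results[name] = tasks[name]()  # execute the node
        with open(path, "w") as f:     # checkpoint immediately
            json.dump(results[name], f)
    return results
```

If a task raises partway through, the files written so far survive, so the next call picks up at the first incomplete node — the same recovery model `CheckpointExecutor` provides.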
## CheckpointExecutor

```python
class CheckpointExecutor(
    dag: DAG,
    checkpoint_dir: str,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```
An executor that saves completed node results to a checkpoint directory. On
failure, call .resume() to pick up where execution left off.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `dag` | `DAG` | required | The DAG whose topology drives execution order. |
| `checkpoint_dir` | `str` | required | Directory for storing checkpoint files. Created if it does not exist. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream nodes when any node fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |
### execute

```python
def execute(
    tasks: dict[str, Callable],
) -> ExecutionResult
```
Execute all tasks, saving results to the checkpoint directory as each node completes. If a previous checkpoint exists, it is cleared and a fresh execution begins.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `tasks` | `dict[str, Callable]` | required | Map of node names to callable tasks. |
**Returns:** `ExecutionResult`
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("download").add_node("parse").add_node("validate")
    .add_node("transform").add_node("upload")
    .add_edge("download", "parse")
    .add_edge("parse", "validate")
    .add_edge("validate", "transform")
    .add_edge("transform", "upload")
    .build()
)

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="./checkpoints")
result = executor.execute({
    "download": lambda: "raw_data",
    "parse": lambda: "parsed",
    "validate": lambda: "valid",
    "transform": lambda: "transformed",
    "upload": lambda: "uploaded",
})
print(result.succeeded)  # 5
```
### resume

```python
def resume(
    tasks: dict[str, Callable],
) -> ExecutionResult
```
Resume execution from the last checkpoint. Nodes that completed successfully in a previous run are skipped (their saved results are loaded). Nodes that failed or were never started are re-executed.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `tasks` | `dict[str, Callable]` | required | Map of node names to callable tasks (must include all nodes, not just remaining ones). |
**Returns:** `ExecutionResult`
```python
# Suppose download and parse completed, but validate failed.
# Fix the issue and resume:
executor = dagron.CheckpointExecutor(dag, checkpoint_dir="./checkpoints")
result = executor.resume({
    "download": lambda: "raw_data",
    "parse": lambda: "parsed",
    "validate": lambda: "valid",  # fixed
    "transform": lambda: "transformed",
    "upload": lambda: "uploaded",
})
print(result.succeeded)  # 5
# download and parse were loaded from checkpoint;
# validate, transform, upload were re-executed
```
### checkpoint_info

```python
def checkpoint_info() -> CheckpointInfo | None
```
Return information about the current checkpoint state, or None if no
checkpoint exists.
**Returns:** `CheckpointInfo` or `None`
```python
info = executor.checkpoint_info()
if info is not None:
    print(f"Completed: {len(info.completed_nodes)}/{info.total_nodes}")
    print(f"Failed: {info.failed_nodes}")
    print(f"Timestamp: {info.timestamp}")
else:
    print("No checkpoint found.")
```
### clear_checkpoint

```python
def clear_checkpoint() -> None
```
Delete all checkpoint files from the checkpoint directory. Use this after a successful run to clean up, or to force a fresh execution on the next call.
```python
executor.clear_checkpoint()
assert executor.checkpoint_info() is None
```
## CheckpointInfo

```python
class CheckpointInfo(
    checkpoint_dir: str,
    completed_nodes: list[str],
    failed_nodes: list[str],
    total_nodes: int,
    timestamp: str,
)
```
Metadata about the current checkpoint state.
| Property | Type | Description |
|---|---|---|
| `checkpoint_dir` | `str` | The directory where checkpoint files are stored. |
| `completed_nodes` | `list[str]` | Names of nodes that completed successfully. |
| `failed_nodes` | `list[str]` | Names of nodes that failed. |
| `total_nodes` | `int` | Total number of nodes in the DAG. |
| `timestamp` | `str` | ISO-8601 timestamp of the last checkpoint write. |
```python
info = executor.checkpoint_info()
print(f"Progress: {len(info.completed_nodes)}/{info.total_nodes}")
print(f"Completed: {info.completed_nodes}")
print(f"Failed: {info.failed_nodes}")
print(f"Last updated: {info.timestamp}")
```
## Complete Example
A long-running data pipeline with checkpoint-and-resume:
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("fetch_users")
    .add_node("fetch_orders")
    .add_node("join")
    .add_node("enrich")
    .add_node("validate")
    .add_node("write_parquet")
    .add_node("upload_s3")
    .add_edge("fetch_users", "join")
    .add_edge("fetch_orders", "join")
    .add_edge("join", "enrich")
    .add_edge("enrich", "validate")
    .add_edge("validate", "write_parquet")
    .add_edge("write_parquet", "upload_s3")
    .build()
)

tasks = {
    "fetch_users": lambda: "1M users fetched",
    "fetch_orders": lambda: "5M orders fetched",
    "join": lambda: "joined dataset",
    "enrich": lambda: "enriched with geo data",
    "validate": lambda: "all checks passed",
    "write_parquet": lambda: "wrote 2GB parquet",
    "upload_s3": lambda: "uploaded to s3://bucket/output",
}

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/pipeline_ckpt")

# First attempt — may fail partway through
try:
    result = executor.execute(tasks)
except Exception:
    info = executor.checkpoint_info()
    print(f"Interrupted: {len(info.completed_nodes)}/{info.total_nodes} complete")

# Resume after fixing the issue
result = executor.resume(tasks)
print(f"All done: {result.succeeded} nodes succeeded")

# Clean up
executor.clear_checkpoint()
```
## Checkpoint File Layout
The checkpoint directory contains one file per completed node, plus a metadata file:
```
./checkpoints/
    _meta.json          # CheckpointInfo (completed/failed lists, timestamp)
    fetch_users.pkl     # Pickled result of fetch_users
    fetch_orders.pkl    # Pickled result of fetch_orders
    join.pkl            # Pickled result of join
    ...
```
Checkpoint files use Python's `pickle` module. Only resume from checkpoints you trust; do not load checkpoint files from untrusted sources.
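One way to stay on the safe side is to read only the JSON metadata when deciding whether a checkpoint is worth resuming, and never unpickle results from a directory you did not write yourself. A sketch, assuming `_meta.json` stores the completed and failed node lists under those key names (the exact schema is an assumption here, inferred from the layout above):

```python
import json
import os

def summarize_checkpoint(ckpt_dir):
    """Summarize checkpoint progress from _meta.json alone.

    Only JSON is parsed; no .pkl file is ever unpickled, so this is
    safe to run even on a checkpoint directory you do not trust.
    Assumes _meta.json holds "completed_nodes" and "failed_nodes"
    lists (hypothetical key names based on the layout above).
    """
    meta_path = os.path.join(ckpt_dir, "_meta.json")
    if not os.path.exists(meta_path):
        return None  # no checkpoint to resume from
    with open(meta_path) as f:
        meta = json.load(f)
    return {
        "completed": meta.get("completed_nodes", []),
        "failed": meta.get("failed_nodes", []),
    }
```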
## Related
- DAGExecutor — the base executor without checkpointing.
- Caching — content-addressable caching (complementary to checkpointing).
- Incremental Execution — re-execute only changed nodes.
- Checkpointing guide — usage patterns and recovery strategies.