
Checkpointing

The checkpointing module allows you to persist execution progress to disk and resume after failures. When a node completes, its result is saved to a checkpoint directory. If execution is interrupted (crash, timeout, manual stop), you can resume from where it left off without re-executing completed nodes.
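The mechanism is easiest to see in plain Python. The sketch below is a simplified illustration of the checkpoint-and-skip pattern, not dagron's actual implementation; the helper name and on-disk layout here are made up for the example.

```python
import pickle
from pathlib import Path

def run_with_checkpoints(tasks, order, ckpt_dir):
    """Run tasks in order, persisting each result to disk; skip any
    task whose result already exists from a previous run."""
    ckpt = Path(ckpt_dir)
    ckpt.mkdir(parents=True, exist_ok=True)
    results = {}
    for name in order:
        path = ckpt / f"{name}.pkl"
        if path.exists():
            # Completed in an earlier run: load the saved result.
            results[name] = pickle.loads(path.read_bytes())
            continue
        results[name] = tasks[name]()  # may raise; earlier results survive
        path.write_bytes(pickle.dumps(results[name]))
    return results
```

If a task raises partway through, rerunning with the same directory re-executes only the tasks that never finished.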

See the Checkpointing guide for usage patterns and failure recovery strategies.


CheckpointExecutor

```python
class CheckpointExecutor(
    dag: DAG,
    checkpoint_dir: str,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```

An executor that saves each completed node's result to a checkpoint directory. If a run fails or is interrupted, call `.resume()` to pick up where execution left off.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `dag` | `DAG` | required | The DAG whose topology drives execution order. |
| `checkpoint_dir` | `str` | required | Directory for storing checkpoint files. Created if it does not exist. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream nodes when any node fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |

execute

CheckpointExecutor.execute
```python
def execute(
    tasks: dict[str, Callable],
) -> ExecutionResult
```

Execute all tasks, saving results to the checkpoint directory as each node completes. If a previous checkpoint exists, it is cleared and a fresh execution begins.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `tasks` | `dict[str, Callable]` | required | Map of node names to callable tasks. |

Returns: ExecutionResult

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("download").add_node("parse").add_node("validate")
    .add_node("transform").add_node("upload")
    .add_edge("download", "parse")
    .add_edge("parse", "validate")
    .add_edge("validate", "transform")
    .add_edge("transform", "upload")
    .build()
)

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="./checkpoints")
result = executor.execute({
    "download": lambda: "raw_data",
    "parse": lambda: "parsed",
    "validate": lambda: "valid",
    "transform": lambda: "transformed",
    "upload": lambda: "uploaded",
})

print(result.succeeded)  # 5
```

resume

CheckpointExecutor.resume
```python
def resume(
    tasks: dict[str, Callable],
) -> ExecutionResult
```

Resume execution from the last checkpoint. Nodes that completed successfully in a previous run are skipped (their saved results are loaded). Nodes that failed or were never started are re-executed.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `tasks` | `dict[str, Callable]` | required | Map of node names to callable tasks (must include all nodes, not just remaining ones). |

Returns: ExecutionResult

```python
# Suppose download and parse completed, but validate failed.
# Fix the issue and resume:
executor = dagron.CheckpointExecutor(dag, checkpoint_dir="./checkpoints")
result = executor.resume({
    "download": lambda: "raw_data",
    "parse": lambda: "parsed",
    "validate": lambda: "valid",  # fixed
    "transform": lambda: "transformed",
    "upload": lambda: "uploaded",
})

print(result.succeeded)  # 5
# download and parse were loaded from checkpoint;
# validate, transform, and upload were re-executed
```

checkpoint_info

CheckpointExecutor.checkpoint_info
```python
def checkpoint_info() -> CheckpointInfo | None
```

Return information about the current checkpoint state, or None if no checkpoint exists.

Returns: CheckpointInfo or None

```python
info = executor.checkpoint_info()
if info is not None:
    print(f"Completed: {len(info.completed_nodes)}/{info.total_nodes}")
    print(f"Failed: {info.failed_nodes}")
    print(f"Timestamp: {info.timestamp}")
else:
    print("No checkpoint found.")
```

clear_checkpoint

CheckpointExecutor.clear_checkpoint
```python
def clear_checkpoint() -> None
```

Delete all checkpoint files from the checkpoint directory. Use this after a successful run to clean up, or to force a fresh execution on the next call.

```python
executor.clear_checkpoint()
assert executor.checkpoint_info() is None
```

CheckpointInfo

```python
class CheckpointInfo(
    checkpoint_dir: str,
    completed_nodes: list[str],
    failed_nodes: list[str],
    total_nodes: int,
    timestamp: str,
)
```

Metadata about the current checkpoint state.

| Property | Type | Description |
| --- | --- | --- |
| `checkpoint_dir` | `str` | The directory where checkpoint files are stored. |
| `completed_nodes` | `list[str]` | Names of nodes that completed successfully. |
| `failed_nodes` | `list[str]` | Names of nodes that failed. |
| `total_nodes` | `int` | Total number of nodes in the DAG. |
| `timestamp` | `str` | ISO-8601 timestamp of the last checkpoint write. |

```python
info = executor.checkpoint_info()
print(f"Progress: {len(info.completed_nodes)}/{info.total_nodes}")
print(f"Completed: {info.completed_nodes}")
print(f"Failed: {info.failed_nodes}")
print(f"Last updated: {info.timestamp}")
```

Complete Example

A long-running data pipeline with checkpoint-and-resume:

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("fetch_users")
    .add_node("fetch_orders")
    .add_node("join")
    .add_node("enrich")
    .add_node("validate")
    .add_node("write_parquet")
    .add_node("upload_s3")
    .add_edge("fetch_users", "join")
    .add_edge("fetch_orders", "join")
    .add_edge("join", "enrich")
    .add_edge("enrich", "validate")
    .add_edge("validate", "write_parquet")
    .add_edge("write_parquet", "upload_s3")
    .build()
)

tasks = {
    "fetch_users": lambda: "1M users fetched",
    "fetch_orders": lambda: "5M orders fetched",
    "join": lambda: "joined dataset",
    "enrich": lambda: "enriched with geo data",
    "validate": lambda: "all checks passed",
    "write_parquet": lambda: "wrote 2GB parquet",
    "upload_s3": lambda: "uploaded to s3://bucket/output",
}

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/pipeline_ckpt")

# First attempt: may fail partway through
try:
    result = executor.execute(tasks)
except Exception:
    info = executor.checkpoint_info()
    print(f"Interrupted: {len(info.completed_nodes)}/{info.total_nodes} complete")

# Resume after fixing the issue
result = executor.resume(tasks)
print(f"All done: {result.succeeded} nodes succeeded")

# Clean up
executor.clear_checkpoint()
```

Checkpoint File Layout

The checkpoint directory contains one file per completed node, plus a metadata file:

```text
./checkpoints/
    _meta.json         # CheckpointInfo (completed/failed lists, timestamp)
    fetch_users.pkl    # Pickled result of fetch_users
    fetch_orders.pkl   # Pickled result of fetch_orders
    join.pkl           # Pickled result of join
    ...
```
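Given this layout, a checkpoint directory can be inspected with the standard library alone. The sketch below writes a minimal checkpoint by hand and reads it back; the exact `_meta.json` schema shown is an assumption inferred from `CheckpointInfo`, not a documented format.

```python
import json
import pickle
from pathlib import Path

ckpt = Path("./checkpoints_demo")
ckpt.mkdir(exist_ok=True)

# Simulate what the executor writes: one pickled result per completed
# node, plus a metadata file. (Schema assumed from CheckpointInfo.)
(ckpt / "fetch_users.pkl").write_bytes(pickle.dumps("1M users fetched"))
(ckpt / "_meta.json").write_text(json.dumps({
    "completed_nodes": ["fetch_users"],
    "failed_nodes": [],
    "total_nodes": 7,
}))

# Read it back. Only unpickle checkpoints you trust.
meta = json.loads((ckpt / "_meta.json").read_text())
result = pickle.loads((ckpt / "fetch_users.pkl").read_bytes())
```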
**Caution:** Checkpoint files use Python's `pickle` module. Only resume from checkpoints you trust. Do not load checkpoint files from untrusted sources.