Checkpointing
Long-running pipelines fail. Networks drop, machines reboot, dependencies crash.
Without checkpointing, a failure at step 95 of 100 means re-running the 94
steps that already succeeded. dagron's CheckpointExecutor saves
progress to disk after each node completes, so you can resume from exactly where
you left off.
Concepts
Checkpoint directory
The CheckpointExecutor writes checkpoint files to a directory you specify. Each
file records the status and result of a completed node. On resume, the executor
reads these files, skips already-completed nodes, and picks up from the first
incomplete node.
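To make that skip logic concrete, here is a simplified, self-contained sketch in plain Python. It is not dagron's implementation: the one-file-per-node naming mirrors the checkpoint directory layout described later on this page, but the JSON payload is an assumption for illustration.

```python
import json
from pathlib import Path

def resume_plan(checkpoint_dir, order):
    """Given a topological order of nodes, return (restored, to_run).

    Simplified sketch: one file per completed node, named
    '<node>.checkpoint', each holding that node's saved result as JSON.
    """
    restored, to_run = {}, []
    for node in order:
        path = Path(checkpoint_dir) / f"{node}.checkpoint"
        if path.exists():
            # Already completed in a previous run: restore, don't re-execute
            restored[node] = json.loads(path.read_text())
        else:
            # First incomplete node onward must run
            to_run.append(node)
    return restored, to_run
```

The real executor also restores results so that downstream nodes can consume them; this sketch only shows the partitioning into restored and to-run sets.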
Execute / resume cycle
- First run: call .execute(tasks). If everything succeeds, checkpoint files are cleaned up automatically.
- Failure: some nodes fail. Checkpoint files for completed nodes remain on disk.
- Resume: call .resume(tasks). The executor reads the checkpoint, skips completed nodes, and retries the failed and remaining nodes.
Checkpoint info
The .checkpoint_info() method returns metadata about the current checkpoint
state: which nodes are completed, which failed, and what the last run time was.
Basic usage
```python
import dagron
import time

dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "validate")
    .add_edge("validate", "load")
    .add_edge("load", "notify")
    .build()
)
```
Five-step pipeline. We will simulate a failure at the 'load' step.
First run with a failure
```python
call_count = 0

def flaky_load():
    """Simulates a task that fails on the first attempt."""
    global call_count
    call_count += 1
    if call_count == 1:
        raise ConnectionError("Database connection lost")
    return "loaded 1000 rows"

tasks = {
    "extract": lambda: time.sleep(2) or "extracted 1000 rows",
    "transform": lambda: time.sleep(1) or "transformed",
    "validate": lambda: "all rows valid",
    "load": flaky_load,
    "notify": lambda: "email sent",
}

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/pipeline_checkpoint")
result = executor.execute(tasks)

print(result.succeeded)  # 3 (extract, transform, validate)
print(result.failed)     # 1 (load)
print(result.skipped)    # 1 (notify)
```
After this run, the checkpoint directory contains files for the three completed nodes:
| Node | Status | Checkpointed? |
|---|---|---|
| extract | completed | Yes |
| transform | completed | Yes |
| validate | completed | Yes |
| load | failed | No |
| notify | skipped | No |
Resume after fixing the issue
```python
# The flaky_load function will succeed on the second call (call_count is now 1)
result = executor.resume(tasks)

print(result.succeeded)  # 2 (load, notify; the rest were restored from checkpoint)
print(result.failed)     # 0
```
| Node | Status | Source |
|---|---|---|
| extract | completed | Restored from checkpoint |
| transform | completed | Restored from checkpoint |
| validate | completed | Restored from checkpoint |
| load | completed | Re-executed |
| notify | completed | Executed |
On resume, extract, transform, and validate are restored from the checkpoint; only load and notify actually execute.
The expensive extract (2 seconds) and transform (1 second) steps were not
re-run, saving 3 seconds on the resume.
Constructor
```python
dagron.CheckpointExecutor(
    dag,             # The DAG to execute
    checkpoint_dir,  # Path to the checkpoint directory (str or Path)
)
```
The directory is created automatically if it does not exist.
API methods
execute(tasks)
Runs the pipeline from scratch, checkpointing each completed node:
```python
result = executor.execute(tasks)
```
If the entire pipeline succeeds, checkpoint files are cleaned up. If any node fails, checkpoint files for completed nodes remain.
resume(tasks)
Reads the checkpoint and resumes from the last incomplete node:
```python
result = executor.resume(tasks)
```
Nodes that were previously completed are not re-executed — their results are restored from the checkpoint.
checkpoint_info()
Returns metadata about the current checkpoint:
```python
info = executor.checkpoint_info()
print(info)
```
Output:
```
CheckpointInfo(
    exists=True,
    completed_nodes=['extract', 'transform', 'validate'],
    failed_nodes=['load'],
    skipped_nodes=['notify'],
    last_run_time='2025-01-15T10:30:00',
    total_checkpointed=3,
)
```
Use this to build monitoring dashboards or decide whether to resume or start fresh.
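One way to act on this metadata is a resume-or-restart policy. The sketch below is illustrative, not part of dagron's API: Info is a stand-in for the CheckpointInfo object shown above, and the 25% threshold is an arbitrary example.

```python
from dataclasses import dataclass, field

@dataclass
class Info:
    # Stand-in for dagron's CheckpointInfo; fields mirror the output above.
    exists: bool
    completed_nodes: list = field(default_factory=list)
    failed_nodes: list = field(default_factory=list)

def should_resume(info, total_nodes, min_progress=0.25):
    """Resume only if a checkpoint exists and enough work is already saved.

    Below the threshold, a fresh run costs barely more than a resume,
    and starting clean avoids carrying forward stale state.
    """
    if not info.exists:
        return False
    return len(info.completed_nodes) / total_nodes >= min_progress
```

Any policy works here; the point is that checkpoint_info() gives you the inputs to make the decision programmatically.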
clear_checkpoint()
Remove all checkpoint files:
```python
executor.clear_checkpoint()

info = executor.checkpoint_info()
print(info.exists)  # False
```
This is useful when you want to force a full re-run.
Complete example: ETL with retry
Here is a production-style pattern with automatic retry:
```python
import dagron
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

dag = (
    dagron.DAG.builder()
    .add_nodes(["fetch_api", "fetch_db", "merge", "transform",
                "validate", "load", "update_dashboard"])
    .add_edges([
        ("fetch_api", "merge"),
        ("fetch_db", "merge"),
        ("merge", "transform"),
        ("transform", "validate"),
        ("validate", "load"),
        ("load", "update_dashboard"),
    ])
    .build()
)

tasks = {
    "fetch_api": lambda: time.sleep(5) or {"api_rows": 10000},
    "fetch_db": lambda: time.sleep(3) or {"db_rows": 50000},
    "merge": lambda: {"total_rows": 60000},
    "transform": lambda: time.sleep(10) or {"transformed": 60000},
    "validate": lambda: {"valid": True, "bad_rows": 0},
    "load": lambda: time.sleep(2) or "loaded",
    "update_dashboard": lambda: "dashboard updated",
}

CHECKPOINT_DIR = "/var/data/pipeline_checkpoints/etl_daily"

def run_with_retry(max_attempts=3):
    executor = dagron.CheckpointExecutor(dag, checkpoint_dir=CHECKPOINT_DIR)

    # Check if there is an existing checkpoint to resume
    info = executor.checkpoint_info()
    if info.exists:
        logger.info(f"Found checkpoint with {info.total_checkpointed} completed nodes")
        logger.info("Resuming from last failure...")
        result = executor.resume(tasks)
    else:
        logger.info("Starting fresh execution")
        result = executor.execute(tasks)

    # Retry loop for transient failures
    attempts = 1
    while result.failed > 0 and attempts < max_attempts:
        attempts += 1
        logger.warning(f"Attempt {attempts}: {result.failed} nodes failed, retrying...")
        time.sleep(5)  # back off before retry
        result = executor.resume(tasks)

    if result.failed > 0:
        logger.error(f"Pipeline failed after {attempts} attempts")
        # Leave checkpoint for manual inspection
    else:
        logger.info(f"Pipeline succeeded in {attempts} attempt(s)")
        # Checkpoint auto-cleaned on full success

    return result

result = run_with_retry()
```
ETL pipeline with automatic checkpoint-based retry.
Checkpointing with parallel branches
Checkpointing works correctly with fan-out / fan-in topologies. Each completed node is checkpointed independently:
```python
dag = (
    dagron.DAG.builder()
    .add_edge("source", "branch_a")
    .add_edge("source", "branch_b")
    .add_edge("source", "branch_c")
    .add_edge("branch_a", "join")
    .add_edge("branch_b", "join")
    .add_edge("branch_c", "join")
    .build()
)
```
If branch_b fails but source, branch_a, and branch_c succeed, the
checkpoint stores all three. On resume, only branch_b and join need to
run.
| Node | First run | Resume |
|---|---|---|
| source | completed | Restored |
| branch_a | completed | Restored |
| branch_b | failed | Re-executed |
| branch_c | completed | Restored |
| join | skipped | Executed |
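The resume set in the table can be derived from the DAG structure plus the set of checkpointed nodes. A minimal sketch using Python's standard-library graphlib (this is not dagron's internals, just the underlying idea):

```python
from graphlib import TopologicalSorter

def nodes_to_run(edges, completed):
    """Return, in a valid topological order, the nodes a resume must execute.

    A node runs iff it is not checkpointed as completed. Sketch only;
    the real executor also restores results so downstream nodes see them.
    """
    # Build a predecessor map for TopologicalSorter
    graph = {}
    for src, dst in edges:
        graph.setdefault(dst, set()).add(src)
        graph.setdefault(src, set())
    order = list(TopologicalSorter(graph).static_order())
    return [n for n in order if n not in completed]
```

With the fan-out DAG above and {source, branch_a, branch_c} checkpointed, this yields [branch_b, join], matching the table.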
Monitoring checkpoint state
Build observability around checkpoint state:
```python
def monitor_pipeline(executor):
    info = executor.checkpoint_info()
    if not info.exists:
        print("No checkpoint found — pipeline has not run or completed cleanly.")
        return

    total = dag.node_count()
    completed = info.total_checkpointed
    pct = (completed / total) * 100

    print(f"Pipeline progress: {completed}/{total} ({pct:.0f}%)")
    print(f"Completed: {info.completed_nodes}")
    print(f"Failed: {info.failed_nodes}")
    print(f"Remaining: {total - completed - len(info.failed_nodes)}")
    print(f"Last run: {info.last_run_time}")
```
Integrating with alerting
```python
def checkpoint_to_metrics(executor):
    """Export checkpoint state as metrics for Prometheus/DataDog."""
    info = executor.checkpoint_info()
    return {
        "pipeline.checkpoint.exists": 1 if info.exists else 0,
        "pipeline.checkpoint.completed": info.total_checkpointed,
        "pipeline.checkpoint.failed": len(info.failed_nodes),
        "pipeline.checkpoint.progress_pct": (
            info.total_checkpointed / dag.node_count() * 100
            if info.exists else 0
        ),
    }
```
Checkpoint directory structure
The checkpoint directory contains one file per completed node plus a metadata file:
```
/tmp/pipeline_checkpoint/
    _metadata.json          # Run metadata (start time, DAG hash, etc.)
    extract.checkpoint      # Serialized NodeResult for 'extract'
    transform.checkpoint    # Serialized NodeResult for 'transform'
    validate.checkpoint     # Serialized NodeResult for 'validate'
```
Do not modify checkpoint files manually. The executor validates file integrity on resume and will reject tampered checkpoints.
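The integrity check can be illustrated with a content hash. This is a sketch only; dagron's actual validation scheme is internal, and the assumption here is that a SHA-256 digest of each checkpoint file is recorded somewhere trusted (for example, in _metadata.json):

```python
import hashlib
from pathlib import Path

def verify_checkpoint(path, expected_sha256):
    """Reject a checkpoint file whose bytes no longer match the recorded hash.

    Illustrative: assumes the expected digest was stored at checkpoint
    time, e.g. alongside the run metadata.
    """
    digest = hashlib.sha256(Path(path).read_bytes()).hexdigest()
    return digest == expected_sha256
```

A resume path built on this idea would refuse to restore any node whose file fails verification, forcing that node to re-execute instead.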
When to clear checkpoints
Clear checkpoints when:
- The DAG structure changed. Adding or removing nodes invalidates the checkpoint. The executor detects this and raises an error on resume.
- Task logic changed. If you fixed a bug in a task that already checkpointed successfully, you need to re-run it.
- You want a clean start. For periodic batch pipelines, clear the checkpoint before each scheduled run.
```python
# Before a scheduled daily run
executor.clear_checkpoint()
result = executor.execute(tasks)
```
Combining with other features
Checkpointing + Incremental execution
Use checkpointing for fault tolerance and incremental execution for change-based optimization:
```python
# First: use IncrementalExecutor to determine what needs to run
inc_executor = dagron.IncrementalExecutor(dag)
inc_result = inc_executor.execute(tasks, changed_nodes=["source"])

# Then: use CheckpointExecutor for fault tolerance on the re-execution
cp_executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/cp")
cp_result = cp_executor.execute(tasks)
```
Checkpointing + Tracing
```python
executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/cp")
result = executor.execute(tasks)

# Note: tracing captures only the nodes that actually executed,
# not the ones restored from checkpoint.
```
Checkpointing + Conditional execution
Checkpoint files record the status of conditionally-skipped nodes, so resume correctly handles conditional branches:
```python
# A conditionally-skipped node is recorded as SKIPPED in the checkpoint
# and remains skipped on resume.
```
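The rule above can be sketched as a small status-to-action table in plain Python. The status strings and action names here are illustrative, not dagron's on-disk format:

```python
def resume_statuses(checkpoint, nodes):
    """Decide, per node, what a resume does with it.

    Sketch of the rule stated above: completed nodes are restored,
    conditionally-skipped nodes stay skipped, and everything else
    (failed or never attempted) executes.
    """
    plan = {}
    for node in nodes:
        status = checkpoint.get(node)
        if status == "completed":
            plan[node] = "restore"
        elif status == "skipped":
            plan[node] = "keep-skipped"
        else:
            plan[node] = "execute"
    return plan
```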
Best practices
- Use unique checkpoint directories. For concurrent pipeline runs, give each run its own directory (e.g., include a run ID or timestamp): checkpoint_dir = f"/tmp/checkpoints/run_{run_id}"
- Clean up old checkpoints. Implement a retention policy to avoid accumulating stale checkpoint directories.
- Monitor checkpoint size. Node results are serialized to disk. If tasks return large objects (dataframes, models), checkpoint files can grow large. Consider returning metadata references instead.
- Handle DAG changes gracefully. Before resuming, compare the current DAG hash against the checkpoint metadata. If they differ, clear and re-run.
- Test your resume path. Deliberately inject failures in tests and verify that resume produces the correct final results.
- Set appropriate file permissions. Checkpoint files may contain sensitive results. Ensure the checkpoint directory has restrictive permissions.
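The "handle DAG changes gracefully" practice can be sketched with a structural fingerprint. This is illustrative only; dagron records its own DAG hash in the checkpoint metadata, and the hashing scheme below is an assumption:

```python
import hashlib
import json

def dag_hash(edges):
    """Stable fingerprint of the DAG structure.

    Sorting the edge list makes the hash independent of insertion order,
    so only a real structural change produces a different digest.
    """
    canonical = json.dumps(sorted(edges), separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

def safe_resume_allowed(current_edges, metadata):
    """Resume only if the DAG is unchanged since the checkpoint was written."""
    return metadata.get("dag_hash") == dag_hash(current_edges)
```

A wrapper script could call safe_resume_allowed before resume() and fall back to clear_checkpoint() plus execute() when the structure has drifted.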
API reference
| Class / Method | Docs |
|---|---|
| CheckpointExecutor | Checkpoint |
| CheckpointExecutor.execute() | Checkpoint |
| CheckpointExecutor.resume() | Checkpoint |
| CheckpointExecutor.checkpoint_info() | Checkpoint |
| CheckpointExecutor.clear_checkpoint() | Checkpoint |
Next steps
- Incremental Execution — only re-run what changed.
- Tracing & Profiling — analyze resume execution performance.
- Conditional Execution — gate branches with runtime predicates.
- Getting Started — back to the basics.