
Checkpointing

Long-running pipelines fail. Networks drop, machines reboot, dependencies crash. Without checkpointing, a failure at step 95 of 100 means re-running the 94 steps that already succeeded. dagron's CheckpointExecutor saves progress to disk after each node completes, so you can resume from exactly where you left off.

Concepts

Checkpoint directory

The CheckpointExecutor writes checkpoint files to a directory you specify. Each file records the status and result of a completed node. On resume, the executor reads these files, skips already-completed nodes, and picks up from the first incomplete node.

Execute / resume cycle

  1. First run: call .execute(tasks). If everything succeeds, checkpoint files are cleaned up automatically.
  2. Failure: some nodes fail. Checkpoint files for completed nodes remain on disk.
  3. Resume: call .resume(tasks). The executor reads the checkpoint, skips completed nodes, and retries the failed and remaining nodes.
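The cycle can be pictured as a loop that skips any node whose checkpoint file already exists. This is a stdlib-only sketch of the idea, not dagron internals; the file names and layout here are illustrative:

```python
import json
import tempfile
from pathlib import Path

def run_with_checkpoints(order, tasks, checkpoint_dir):
    """Run tasks in order, skipping nodes whose checkpoint file exists."""
    cp = Path(checkpoint_dir)
    cp.mkdir(parents=True, exist_ok=True)
    results = {}
    for node in order:
        marker = cp / f"{node}.checkpoint"
        if marker.exists():
            # Restored from a previous run; the task is not re-executed.
            results[node] = json.loads(marker.read_text())
            continue
        results[node] = tasks[node]()  # may raise; earlier markers survive
        marker.write_text(json.dumps(results[node]))
    return results
```

A failure on the first call leaves the completed markers in place; calling the function again restores those results from disk and runs only the remaining nodes.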

Checkpoint info

The .checkpoint_info() method returns metadata about the current checkpoint state: which nodes are completed, which failed, and what the last run time was.

Basic usage

import dagron
import time

dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "validate")
    .add_edge("validate", "load")
    .add_edge("load", "notify")
    .build()
)

Five-step pipeline. We will simulate a failure at the 'load' step.

First run with a failure

call_count = 0

def flaky_load():
    """Simulates a task that fails on the first attempt."""
    global call_count
    call_count += 1
    if call_count == 1:
        raise ConnectionError("Database connection lost")
    return "loaded 1000 rows"

tasks = {
    "extract": lambda: time.sleep(2) or "extracted 1000 rows",
    "transform": lambda: time.sleep(1) or "transformed",
    "validate": lambda: "all rows valid",
    "load": flaky_load,
    "notify": lambda: "email sent",
}

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/pipeline_checkpoint")
result = executor.execute(tasks)

print(result.succeeded) # 3 (extract, transform, validate)
print(result.failed) # 1 (load)
print(result.skipped) # 1 (notify)

After this run, the checkpoint directory contains files for the three completed nodes:

Node       Status     Checkpointed?
extract    completed  Yes
transform  completed  Yes
validate   completed  Yes
load       failed     No
notify     skipped    No

Resume after fixing the issue

# The flaky_load function will succeed on the second call (call_count is now 1)
result = executor.resume(tasks)

print(result.succeeded) # 2 (load, notify — the rest were restored from checkpoint)
print(result.failed) # 0
Node       Status     Source
extract    completed  Restored from checkpoint
transform  completed  Restored from checkpoint
validate   completed  Restored from checkpoint
load       completed  Re-executed
notify     completed  Executed

On resume, extract, transform, and validate are restored from checkpoint; only load and notify actually execute.

The expensive extract (2 seconds) and transform (1 second) steps were not re-run, saving 3 seconds on the resume.

Constructor

dagron.CheckpointExecutor(
    dag,             # The DAG to execute
    checkpoint_dir,  # Path to the checkpoint directory (str or Path)
)

The directory is created automatically if it does not exist.
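That auto-creation behaves like a parents-included mkdir, so nested, not-yet-existing paths are safe to pass. A stdlib sketch of the equivalent call (the path below is illustrative):

```python
import tempfile
from pathlib import Path

# A nested path that does not exist yet; the executor would create it on
# construction, equivalent to mkdir with parents=True, exist_ok=True.
base = Path(tempfile.mkdtemp())
checkpoint_dir = base / "pipelines" / "etl_daily"
checkpoint_dir.mkdir(parents=True, exist_ok=True)
checkpoint_dir.mkdir(parents=True, exist_ok=True)  # idempotent: no error on re-run
```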

API methods

execute(tasks)

Runs the pipeline from scratch, checkpointing each completed node:

result = executor.execute(tasks)

If the entire pipeline succeeds, checkpoint files are cleaned up. If any node fails, checkpoint files for completed nodes remain.

resume(tasks)

Reads the checkpoint and resumes from the last incomplete node:

result = executor.resume(tasks)

Nodes that were previously completed are not re-executed — their results are restored from the checkpoint.

checkpoint_info()

Returns metadata about the current checkpoint:

info = executor.checkpoint_info()
print(info)

Output:

CheckpointInfo(
    exists=True,
    completed_nodes=['extract', 'transform', 'validate'],
    failed_nodes=['load'],
    skipped_nodes=['notify'],
    last_run_time='2025-01-15T10:30:00',
    total_checkpointed=3,
)

Use this to build monitoring dashboards or decide whether to resume or start fresh.

clear_checkpoint()

Remove all checkpoint files:

executor.clear_checkpoint()
info = executor.checkpoint_info()
print(info.exists) # False

This is useful when you want to force a full re-run.

Complete example: ETL with retry

Here is a production-style pattern with automatic retry:

import dagron
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

dag = (
    dagron.DAG.builder()
    .add_nodes(["fetch_api", "fetch_db", "merge", "transform",
                "validate", "load", "update_dashboard"])
    .add_edges([
        ("fetch_api", "merge"),
        ("fetch_db", "merge"),
        ("merge", "transform"),
        ("transform", "validate"),
        ("validate", "load"),
        ("load", "update_dashboard"),
    ])
    .build()
)

tasks = {
    "fetch_api": lambda: time.sleep(5) or {"api_rows": 10000},
    "fetch_db": lambda: time.sleep(3) or {"db_rows": 50000},
    "merge": lambda: {"total_rows": 60000},
    "transform": lambda: time.sleep(10) or {"transformed": 60000},
    "validate": lambda: {"valid": True, "bad_rows": 0},
    "load": lambda: time.sleep(2) or "loaded",
    "update_dashboard": lambda: "dashboard updated",
}

CHECKPOINT_DIR = "/var/data/pipeline_checkpoints/etl_daily"

def run_with_retry(max_attempts=3):
    executor = dagron.CheckpointExecutor(dag, checkpoint_dir=CHECKPOINT_DIR)

    # Check if there is an existing checkpoint to resume
    info = executor.checkpoint_info()
    if info.exists:
        logger.info(f"Found checkpoint with {info.total_checkpointed} completed nodes")
        logger.info("Resuming from last failure...")
        result = executor.resume(tasks)
    else:
        logger.info("Starting fresh execution")
        result = executor.execute(tasks)

    # Retry loop for transient failures
    attempts = 1
    while result.failed > 0 and attempts < max_attempts:
        attempts += 1
        logger.warning(f"Attempt {attempts}: {result.failed} nodes failed, retrying...")
        time.sleep(5)  # back off before retry
        result = executor.resume(tasks)

    if result.failed > 0:
        logger.error(f"Pipeline failed after {attempts} attempts")
        # Leave checkpoint for manual inspection
    else:
        logger.info(f"Pipeline succeeded in {attempts} attempt(s)")
        # Checkpoint auto-cleaned on full success

    return result

result = run_with_retry()

ETL pipeline with automatic checkpoint-based retry.

Checkpointing with parallel branches

Checkpointing works correctly with fan-out / fan-in topologies. Each completed node is checkpointed independently:

dag = (
    dagron.DAG.builder()
    .add_edge("source", "branch_a")
    .add_edge("source", "branch_b")
    .add_edge("source", "branch_c")
    .add_edge("branch_a", "join")
    .add_edge("branch_b", "join")
    .add_edge("branch_c", "join")
    .build()
)

If branch_b fails but source, branch_a, and branch_c succeed, the checkpoint stores all three. On resume, only branch_b and join need to run.

Node      First run  Resume
source    completed  Restored
branch_a  completed  Restored
branch_b  failed     Re-executed
branch_c  completed  Restored
join      skipped    Executed

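Which nodes run on resume follows from simple set arithmetic over the checkpoint; a conceptual sketch in plain Python, not dagron API:

```python
# Nodes in the fan-out DAG above, and what the first run checkpointed.
all_nodes = {"source", "branch_a", "branch_b", "branch_c", "join"}
checkpointed = {"source", "branch_a", "branch_c"}  # completed before the failure

# Resume restores the checkpointed set and executes only the rest.
to_run = all_nodes - checkpointed
print(sorted(to_run))  # ['branch_b', 'join']
```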

Monitoring checkpoint state

Build observability around checkpoint state:

def monitor_pipeline(executor):
    info = executor.checkpoint_info()

    if not info.exists:
        print("No checkpoint found — pipeline has not run or completed cleanly.")
        return

    total = dag.node_count()
    completed = info.total_checkpointed
    pct = (completed / total) * 100

    print(f"Pipeline progress: {completed}/{total} ({pct:.0f}%)")
    print(f"Completed: {info.completed_nodes}")
    print(f"Failed: {info.failed_nodes}")
    print(f"Remaining: {total - completed - len(info.failed_nodes)}")
    print(f"Last run: {info.last_run_time}")

Integrating with alerting

def checkpoint_to_metrics(executor):
    """Export checkpoint state as metrics for Prometheus/DataDog."""
    info = executor.checkpoint_info()
    return {
        "pipeline.checkpoint.exists": 1 if info.exists else 0,
        "pipeline.checkpoint.completed": info.total_checkpointed,
        "pipeline.checkpoint.failed": len(info.failed_nodes),
        "pipeline.checkpoint.progress_pct": (
            info.total_checkpointed / dag.node_count() * 100
            if info.exists else 0
        ),
    }

Checkpoint directory structure

The checkpoint directory contains one file per completed node plus a metadata file:

/tmp/pipeline_checkpoint/
    _metadata.json         # Run metadata (start time, DAG hash, etc.)
    extract.checkpoint     # Serialized NodeResult for 'extract'
    transform.checkpoint   # Serialized NodeResult for 'transform'
    validate.checkpoint    # Serialized NodeResult for 'validate'
caution

Do not modify checkpoint files manually. The executor validates file integrity on resume and will reject tampered checkpoints.
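The exact validation scheme is internal to dagron, but a common approach is to record a content digest alongside each file and compare it on resume. A minimal sketch of that idea, assuming nothing about dagron's actual format:

```python
import hashlib
import tempfile
from pathlib import Path

def digest(path):
    """Content hash used to detect a tampered checkpoint file."""
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()

# Record the digest when the checkpoint is written...
cp = Path(tempfile.mkdtemp()) / "extract.checkpoint"
cp.write_text("extracted 1000 rows")
recorded = digest(cp)

# ...and compare on resume. Any manual edit changes the digest.
cp.write_text("tampered")
tampered = digest(cp) != recorded
```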

When to clear checkpoints

Clear checkpoints when:

  • The DAG structure changed. Adding or removing nodes invalidates the checkpoint. The executor detects this and raises an error on resume.
  • Task logic changed. If you fixed a bug in a task that already checkpointed successfully, you need to re-run it.
  • You want a clean start. For periodic batch pipelines, clear the checkpoint before each scheduled run.
# Before a scheduled daily run
executor.clear_checkpoint()
result = executor.execute(tasks)

Combining with other features

Checkpointing + Incremental execution

Use checkpointing for fault tolerance and incremental execution for change-based optimization:

# First: use IncrementalExecutor to determine what needs to run
inc_executor = dagron.IncrementalExecutor(dag)
inc_result = inc_executor.execute(tasks, changed_nodes=["source"])

# Then: use CheckpointExecutor for fault tolerance on the re-execution
cp_executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/cp")
cp_result = cp_executor.execute(tasks)

Checkpointing + Tracing

executor = dagron.CheckpointExecutor(dag, checkpoint_dir="/tmp/cp")
result = executor.execute(tasks)

# Note: tracing captures only the nodes that actually executed,
# not the ones restored from checkpoint.

Checkpointing + Conditional execution

Checkpoint files record the status of conditionally-skipped nodes, so resume correctly handles conditional branches:

# A conditionally-skipped node is recorded as SKIPPED in the checkpoint
# and remains skipped on resume.

Best practices

  1. Use unique checkpoint directories. For concurrent pipeline runs, use unique directories (e.g., include a run ID or timestamp):

    checkpoint_dir = f"/tmp/checkpoints/run_{run_id}"
  2. Clean up old checkpoints. Implement a retention policy to avoid accumulating stale checkpoint directories.

  3. Monitor checkpoint size. Node results are serialized to disk. If tasks return large objects (dataframes, models), checkpoint files can grow large. Consider returning metadata references instead.

  4. Handle DAG changes gracefully. Before resuming, compare the current DAG hash against the checkpoint metadata. If they differ, clear and re-run.

  5. Test your resume path. Deliberately inject failures in tests and verify that resume produces the correct final results.

  6. Set appropriate file permissions. Checkpoint files may contain sensitive results. Ensure the checkpoint directory has restrictive permissions.
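Practice 2 can be a small sweep that deletes checkpoint directories older than a retention window. A stdlib sketch; the root path and cutoff are illustrative:

```python
import shutil
import time
from pathlib import Path

def prune_checkpoints(root, max_age_seconds):
    """Delete checkpoint subdirectories older than the retention window."""
    removed = []
    for d in Path(root).iterdir():
        if d.is_dir() and (time.time() - d.stat().st_mtime) > max_age_seconds:
            shutil.rmtree(d)
            removed.append(d.name)
    return removed
```

Run it from a cron job (or at the top of the scheduled pipeline) with, say, a 7-day window.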

API reference

Class / Method                         Docs
CheckpointExecutor                     Checkpoint
CheckpointExecutor.execute()           Checkpoint
CheckpointExecutor.resume()            Checkpoint
CheckpointExecutor.checkpoint_info()   Checkpoint
CheckpointExecutor.clear_checkpoint()  Checkpoint
