Error Handling
Pipelines fail. Data is missing, APIs time out, code has bugs. dagron provides a structured error hierarchy, clear fail-fast semantics, and patterns for graceful error recovery so you can build pipelines that handle failure predictably.
Error Hierarchy
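All errors raised by the graph engine inherit from DagronError, which itself inherits from Python's Exception. This gives you a single base class to catch every graph-engine error. Gate errors are defined in the execution layer and are not subclasses of DagronError, so they must be caught separately.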
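The effect of a shared base class can be sketched with stand-in classes. The definitions below only mirror names from this page; they are not dagron's real classes, and the direct parent of each error is an assumption.

```python
# Stand-in classes mirroring the documented names (illustrative only).
class DagronError(Exception):
    """Base class for graph-engine errors."""


class CycleError(DagronError):
    """Adding an edge would create a cycle."""


class NodeNotFoundError(DagronError):
    """A referenced node does not exist."""


def classify(exc: Exception) -> str:
    """Report which except clause would handle the exception."""
    try:
        raise exc
    except CycleError:
        return "specific"   # most specific handler listed first
    except DagronError:
        return "base"       # any other graph-engine error
    except Exception:
        return "other"      # unrelated errors are not DagronError


print(classify(CycleError("a -> b -> a")))     # specific
print(classify(NodeNotFoundError("missing")))  # base
print(classify(ValueError("unrelated")))       # other
```

Ordering the handlers from most to least specific matters: if the DagronError clause came first, it would also swallow CycleError.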
Core Errors
These are raised by the Rust-backed graph engine:
| Error | When it is raised |
|---|---|
| DagronError | Base class for all dagron errors. Never raised directly. |
| GraphError | General graph structure error (e.g., invalid operation on the graph). |
| CycleError | Adding an edge would create a cycle, violating the DAG property. |
| DuplicateNodeError | Adding a node with a name that already exists in the graph. |
| NodeNotFoundError | Referencing a node name that does not exist. |
| EdgeNotFoundError | Referencing an edge that does not exist (e.g., during removal). |
Execution Errors
These are raised during task execution:
| Error | When it is raised |
|---|---|
| GateRejectedError | An approval gate was rejected. |
| GateTimeoutError | An approval gate timed out before a decision was made. |
| TemplateError | A template parameter is invalid. |
Graph Construction Errors
CycleError
The most common graph error. Raised when adding an edge would create a cycle:
import dagron

dag = dagron.DAG()
dag.add_node("a")
dag.add_node("b")
dag.add_node("c")
dag.add_edge("a", "b")
dag.add_edge("b", "c")

try:
    dag.add_edge("c", "a")  # would create a -> b -> c -> a cycle
except dagron.CycleError as e:
    print(f"Cycle detected: {e}")
The edge c -> a is rejected because it would create a cycle.
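Why the edge is rejected can be illustrated with a reachability check: an edge src -> dst closes a cycle exactly when src is already reachable from dst. The toy class below is a sketch of that idea in plain Python. TinyDAG is a hypothetical stand-in, it raises ValueError rather than CycleError, and it says nothing about how dagron's Rust engine actually detects cycles.

```python
from collections import defaultdict


class TinyDAG:
    """Toy adjacency-list graph; a stand-in, not dagron.DAG."""

    def __init__(self):
        self.successors = defaultdict(set)

    def _reachable(self, start, goal):
        """Return True if goal can be reached from start along existing edges."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == goal:
                return True
            if node in seen:
                continue
            seen.add(node)
            stack.extend(self.successors[node])
        return False

    def add_edge(self, src, dst):
        # src -> dst closes a cycle exactly when src is already reachable
        # from dst (this also rejects the self-loop src == dst).
        if self._reachable(dst, src):
            raise ValueError(f"edge {src} -> {dst} would create a cycle")
        self.successors[src].add(dst)


g = TinyDAG()
g.add_edge("a", "b")
g.add_edge("b", "c")
try:
    g.add_edge("c", "a")
except ValueError as e:
    print(e)  # edge c -> a would create a cycle
```

Checking before inserting means a rejected edge leaves the graph unchanged, which matches the behaviour described above where the offending edge is simply not added.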
Use the builder pattern with allow_cycles=False (the default) to catch cycles at build time:
try:
    dag = (
        dagron.DAG.builder()
        .add_node("a").add_node("b").add_node("c")
        .add_edge("a", "b")
        .add_edge("b", "c")
        .add_edge("c", "a")  # CycleError raised here
        .build()
    )
except dagron.CycleError:
    print("Cannot build: graph contains a cycle")
DuplicateNodeError
Raised when you try to add a node with a name that already exists:
dag = dagron.DAG()
dag.add_node("extract")

try:
    dag.add_node("extract")  # already exists
except dagron.DuplicateNodeError as e:
    print(f"Duplicate: {e}")
NodeNotFoundError
Raised when referencing a node that does not exist:
dag = dagron.DAG()
dag.add_node("a")

try:
    dag.add_edge("a", "b")  # "b" does not exist
except dagron.NodeNotFoundError as e:
    print(f"Not found: {e}")
Also raised when trying to get the payload, metadata, or predecessors of a nonexistent node:
try:
    dag.get_payload("nonexistent")
except dagron.NodeNotFoundError:
    print("Node does not exist")
EdgeNotFoundError
Raised when trying to remove or reference an edge that does not exist:
dag = dagron.DAG()
dag.add_node("a")
dag.add_node("b")

try:
    dag.remove_edge("a", "b")  # no edge between a and b
except dagron.EdgeNotFoundError as e:
    print(f"Edge not found: {e}")
GraphError
A general graph error for operations that are not covered by the more specific errors:
try:
    # Placeholder: stands in for any invalid graph operation.
    dag.some_invalid_operation()
except dagron.GraphError as e:
    print(f"Graph error: {e}")
Catching All dagron Errors
Use DagronError as a catch-all:
import dagron

try:
    dag = (
        dagron.DAG.builder()
        .add_node("a")
        .add_edge("a", "nonexistent")
        .build()
    )
except dagron.DagronError as e:
    print(f"dagron error: {type(e).__name__}: {e}")
This catches CycleError, DuplicateNodeError, NodeNotFoundError, EdgeNotFoundError, and GraphError.
Fail-Fast Execution
During execution, dagron uses fail-fast semantics by default. When a node fails, all downstream nodes are immediately skipped:
dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_node("report")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .add_edge("load", "report")
    .build()
)

def failing_transform():
    raise ValueError("Bad data format")

executor = dagron.DAGExecutor(dag)
result = executor.execute({
    "extract": lambda: "raw data",
    "transform": failing_transform,
    "load": lambda: "loaded",
    "report": lambda: "report",
})

for name, nr in result.node_results.items():
    print(f"  {name}: {nr.status.value}")
Output:
extract: completed
transform: failed
load: skipped
report: skipped
When transform fails, load and report are skipped.
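Which nodes count as "downstream" can be worked out with a breadth-first walk over the edges. The helper below is a plain-Python sketch of that computation; downstream_of is a hypothetical name and is not part of dagron.

```python
from collections import deque


def downstream_of(edges, failed):
    """Return every node reachable from `failed`, excluding `failed` itself."""
    successors = {}
    for src, dst in edges:
        successors.setdefault(src, []).append(dst)

    skipped, queue = set(), deque([failed])
    while queue:
        node = queue.popleft()
        for nxt in successors.get(node, []):
            if nxt not in skipped:
                skipped.add(nxt)
                queue.append(nxt)
    return skipped


edges = [("extract", "transform"), ("transform", "load"), ("load", "report")]
print(sorted(downstream_of(edges, "transform")))  # ['load', 'report']
```

Note that extract is not in the result: nodes upstream of the failure have already completed and keep their results.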
Disabling Fail-Fast
Set fail_fast=False to let independent branches continue executing:
dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("path_a")
    .add_node("path_b")
    .add_node("merge")
    .add_edge("extract", "path_a")
    .add_edge("extract", "path_b")
    .add_edge("path_a", "merge")
    .add_edge("path_b", "merge")
    .build()
)

executor = dagron.DAGExecutor(dag, fail_fast=False)
result = executor.execute({
    "extract": lambda: "data",
    "path_a": lambda: 1 / 0,      # fails
    "path_b": lambda: "success",  # still runs
    "merge": lambda: "merged",    # still runs (has at least one completed dep)
})

for name, nr in result.node_results.items():
    print(f"  {name}: {nr.status.value}")
Output:
extract: completed
path_a: failed
path_b: completed
merge: completed
Inspecting Node Errors
Each NodeResult contains the original exception if the node failed:
for name, nr in result.node_results.items():
    if nr.status == dagron.NodeStatus.FAILED:
        print(f"Node '{name}' failed:")
        print(f"  Error type: {type(nr.error).__name__}")
        print(f"  Message: {nr.error}")
        print(f"  Duration: {nr.duration_seconds:.3f}s")
ExecutionResult Summary
The ExecutionResult provides aggregate counts:
result = executor.execute(tasks)

print(f"Succeeded: {result.succeeded}")
print(f"Failed: {result.failed}")
print(f"Skipped: {result.skipped}")
print(f"Timed out: {result.timed_out}")
print(f"Cancelled: {result.cancelled}")
print(f"Duration: {result.total_duration_seconds:.1f}s")

# Check if the entire execution succeeded
if result.failed == 0:
    print("All nodes completed successfully")
else:
    print(f"{result.failed} node(s) failed")
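A run with zero failures may still contain nodes that timed out or were cancelled, so a stricter check can be useful. The sketch below assumes the count attributes shown above are integers; run_is_clean is a hypothetical helper, and a SimpleNamespace stands in for a real ExecutionResult.

```python
from types import SimpleNamespace


def run_is_clean(result) -> bool:
    """True only if no node failed, timed out, or was cancelled."""
    return result.failed == 0 and result.timed_out == 0 and result.cancelled == 0


# Stand-in for an ExecutionResult carrying only the count attributes used here.
fake_result = SimpleNamespace(succeeded=3, failed=0, skipped=0, timed_out=1, cancelled=0)
print(run_is_clean(fake_result))  # False: one node timed out
```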
Error Recovery Patterns
Pattern: Retry with Backoff
Wrap tasks in a retry helper that returns a new callable:
import time

def retry(fn, max_retries=3, backoff=1.0):
    """Retry a task function with exponential backoff."""
    def wrapper():
        last_error = None
        for attempt in range(max_retries):
            try:
                return fn()
            except Exception as e:
                last_error = e
                # Sleep only between attempts, not after the final one.
                if attempt < max_retries - 1:
                    time.sleep(backoff * (2 ** attempt))
        raise last_error
    return wrapper

tasks = {
    "fetch_api": retry(lambda: call_flaky_api(), max_retries=3),
    "process": lambda: process_data(),
}
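The retry helper above retries on every exception, including ones that will never succeed on a second attempt. A possible variant, sketched here under the assumption that tasks are zero-argument callables as in the examples on this page, retries only the listed exception types and takes an injectable sleep function so the schedule can be tested. The names retry_on and backoff_delays are hypothetical.

```python
import time


def backoff_delays(max_retries=3, backoff=1.0):
    """Delays slept between attempts: backoff * 2**attempt, none after the last."""
    return [backoff * (2 ** attempt) for attempt in range(max_retries - 1)]


def retry_on(fn, exceptions=(Exception,), max_retries=3, backoff=1.0, sleep=time.sleep):
    """Retry fn only for the given exception types; other errors propagate at once."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def wrapper():
        for attempt in range(max_retries):
            try:
                return fn()
            except exceptions:
                if attempt == max_retries - 1:
                    raise  # out of attempts: re-raise the last error
                sleep(backoff * (2 ** attempt))

    return wrapper


print(backoff_delays(3, 1.0))  # [1.0, 2.0]
```

With max_retries=3 and backoff=1.0 the task is attempted three times with waits of 1 and 2 seconds in between, so the worst-case added latency is 3 seconds plus the task's own run time.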
Pattern: Fallback Values
Provide a fallback when a task fails:
def with_fallback(fn, default):
    """Return a default value if the task fails."""
    def wrapper():
        try:
            return fn()
        except Exception:
            return default
    return wrapper

tasks = {
    "fetch_cache": with_fallback(lambda: get_from_cache(), default=None),
    "fetch_api": with_fallback(lambda: get_from_api(), default={}),
}
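One caveat with a silent fallback is that the node reports success, so the failure never shows up in the execution result. A possible refinement, sketched below, logs what was swallowed and restricts the fallback to expected exception types. with_logged_fallback and the logger name are hypothetical, not part of dagron.

```python
import logging

logger = logging.getLogger("pipeline.fallback")


def with_logged_fallback(fn, default, exceptions=(Exception,)):
    """Return `default` on the listed exceptions, logging what was swallowed."""
    def wrapper():
        try:
            return fn()
        except exceptions as exc:
            logger.warning("task failed, using fallback %r: %s", default, exc)
            return default
    return wrapper


def broken_cache():
    raise KeyError("cache miss")


task = with_logged_fallback(broken_cache, default=None, exceptions=(KeyError,))
print(task())  # None
```

Exceptions outside the listed types still propagate, so a genuine bug fails the node instead of being masked by the default.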
Pattern: Error Callbacks
Use execution callbacks to log errors as they happen:
import logging

from dagron.execution._types import ExecutionCallbacks

log = logging.getLogger(__name__)

def on_failure(name, error):
    log.error(f"Node '{name}' failed: {error}")
    send_alert(f"Pipeline node '{name}' failed")  # send_alert: your own alerting helper

callbacks = ExecutionCallbacks(
    on_failure=on_failure,
)
executor = dagron.DAGExecutor(dag, callbacks=callbacks)
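When failures need to be reviewed after the run as well as logged, the callback can accumulate them. The class below is a minimal sketch: FailureCollector is a hypothetical name, and it assumes the callback is invoked as on_failure(name, error), as in the example above.

```python
class FailureCollector:
    """Callable that records (name, error) pairs as failures are reported."""

    def __init__(self):
        self.failures = []

    def __call__(self, name, error):
        self.failures.append((name, error))

    def summary(self):
        return [f"{name}: {type(err).__name__}: {err}" for name, err in self.failures]


collector = FailureCollector()
# Simulate the executor reporting a failure. In a real run the instance would
# be passed as on_failure, assuming it is called with (name, error).
collector("transform", ValueError("Bad data format"))
print(collector.summary())  # ['transform: ValueError: Bad data format']
```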
Pattern: Checkpoint and Resume
Use checkpointing to save progress and resume after fixing the failure:
from dagron.execution.checkpoint import CheckpointExecutor

executor = CheckpointExecutor(dag, checkpoint_dir="/tmp/checkpoints")
result = executor.execute(tasks)

if result.failed > 0:
    # Fix the failing task, then resume from the checkpoint
    result = executor.resume(tasks)
Pattern: Graceful Degradation
Design your DAG so that non-critical branches can fail without affecting the critical path:
dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")     # critical
    .add_node("load")          # critical
    .add_node("send_metrics")  # non-critical
    .add_node("send_slack")    # non-critical
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .add_edge("transform", "send_metrics")
    .add_edge("load", "send_slack")
    .build()
)

# With fail_fast=False, metrics/slack failures don't block load
executor = dagron.DAGExecutor(dag, fail_fast=False)
Non-critical branches can fail independently without affecting the critical path.
Gate Errors
Approval gates raise their own errors:
from dagron.execution.gates import GateRejectedError, GateTimeoutError

try:
    controller.wait_sync("deploy")
except GateRejectedError as e:
    print(f"Gate '{e.gate_name}' rejected: {e.reason}")
except GateTimeoutError as e:
    print(f"Gate '{e.gate_name}' timed out after {e.timeout}s")
These errors are not subclasses of DagronError since they originate from the execution layer, not the graph engine.
Template Errors
Template parameter validation raises TemplateError:
from dagron.template import DAGTemplate, TemplateError

template = DAGTemplate(params={"env": str})

try:
    template.render(env=42)  # wrong type
except TemplateError as e:
    print(f"Template error: {e}")
    # "Parameter 'env' expects str, got int"
Error Handling in Hooks
Plugin hooks catch and warn on callback errors to prevent them from breaking execution:
from dagron.plugins.hooks import HookRegistry, HookEvent, HookContext

hooks = HookRegistry()

def buggy_hook(ctx: HookContext):
    raise RuntimeError("hook crashed")

hooks.register(HookEvent.PRE_NODE, buggy_hook)

# This fires the hook but does NOT raise.
# Instead, a RuntimeWarning is issued.
hooks.fire(HookContext(event=HookEvent.PRE_NODE))
If you need to observe hook errors, use Python's warnings module:
import warnings

with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter("always")
    hooks.fire(HookContext(event=HookEvent.PRE_NODE))
    if w:
        print(f"Hook warning: {w[0].message}")
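The isolation behaviour described in this section can be illustrated with a toy registry that converts hook exceptions into warnings. This is a sketch of the general technique only: TinyHookRegistry is a hypothetical stand-in, and dagron's HookRegistry may be implemented differently.

```python
import warnings


class TinyHookRegistry:
    """Toy registry; a stand-in, not dagron's HookRegistry."""

    def __init__(self):
        self._hooks = []

    def register(self, hook):
        self._hooks.append(hook)

    def fire(self, ctx):
        for hook in self._hooks:
            try:
                hook(ctx)
            except Exception as exc:
                # Convert the failure into a warning so later hooks still run.
                warnings.warn(f"hook {hook.__name__} raised: {exc!r}", RuntimeWarning)


def buggy_hook(ctx):
    raise RuntimeError("hook crashed")


seen = []
registry = TinyHookRegistry()
registry.register(buggy_hook)
registry.register(seen.append)  # a well-behaved hook registered after the buggy one

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    registry.fire("pre_node")

print(len(caught), seen)  # 1 ['pre_node']
```

The second hook still receives the event even though the first one crashed, which is the point of isolating hook errors from execution.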
Defensive DAG Construction
Wrap builder operations in try/except so that invalid entries are collected and reported instead of aborting construction:
import dagron

def build_pipeline_safely(node_specs):
    """Build a DAG from specs, skipping invalid entries."""
    builder = dagron.DAG.builder()
    errors = []

    # First pass: register every node, recording duplicates.
    for spec in node_specs:
        try:
            builder.add_node(spec["name"])
        except dagron.DuplicateNodeError:
            errors.append(f"Duplicate node: {spec['name']}")

    # Second pass: add edges once all nodes are known.
    for spec in node_specs:
        for dep in spec.get("dependencies", []):
            try:
                builder.add_edge(dep, spec["name"])
            except dagron.NodeNotFoundError as e:
                errors.append(f"Missing dependency: {e}")
            except dagron.CycleError as e:
                errors.append(f"Would create cycle: {e}")

    if errors:
        print(f"Warnings during build ({len(errors)}):")
        for err in errors:
            print(f"  - {err}")

    return builder.build()
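Some problems can be found before any builder call is made. The helper below is a plain-Python sketch that checks specs in the same shape as above (a "name" key and an optional "dependencies" list) for duplicate names and unknown dependencies. validate_specs is a hypothetical name, and it does not attempt cycle detection.

```python
from collections import Counter


def validate_specs(node_specs):
    """Return a list of problems found in the specs, without building anything."""
    names = [spec["name"] for spec in node_specs]
    problems = [f"Duplicate node: {n}" for n, count in Counter(names).items() if count > 1]

    known = set(names)
    for spec in node_specs:
        for dep in spec.get("dependencies", []):
            if dep not in known:
                problems.append(f"Missing dependency: {dep} (needed by {spec['name']})")
    return problems


specs = [
    {"name": "extract"},
    {"name": "transform", "dependencies": ["extract"]},
    {"name": "load", "dependencies": ["transfrom"]},  # typo on purpose
    {"name": "extract"},                              # duplicate on purpose
]
for problem in validate_specs(specs):
    print(problem)
```

Running a check like this first keeps the error messages independent of the exception text produced by the graph engine.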
Error Hierarchy Summary
import dagron
from dagron.execution.gates import GateRejectedError, GateTimeoutError
from dagron.template import TemplateError

# Graph engine errors (from Rust)
dagron.DagronError         # base class
dagron.GraphError          # general graph error
dagron.CycleError          # edge would create a cycle
dagron.DuplicateNodeError  # node already exists
dagron.NodeNotFoundError   # node does not exist
dagron.EdgeNotFoundError   # edge does not exist

# Execution errors (from Python)
GateRejectedError          # gate was rejected
GateTimeoutError           # gate timed out
TemplateError              # template parameter invalid
Best Practices
- Catch specific errors. Use CycleError, NodeNotFoundError, etc. instead of the broad DagronError when you know what might go wrong.
- Use fail_fast=True by default. This prevents wasting compute on nodes that will fail anyway due to missing upstream data.
- Disable fail-fast for non-critical branches. If some branches are optional (metrics, notifications), use fail_fast=False so they do not block the critical path.
- Log errors via callbacks. Use ExecutionCallbacks.on_failure to capture errors as they happen, not just at the end.
- Inspect NodeResult.error. When a node fails, the original exception is preserved for debugging.
- Use retry wrappers for transient failures. Network errors, API rate limits, and temporary file locks benefit from retry logic.
- Checkpoint long-running pipelines. For pipelines that take hours, use checkpointing so you do not lose progress on failure.
Related
- API Reference: Errors -- full documentation for all error classes.
- Executing Tasks -- how fail-fast works in the executor.
- Approval Gates -- GateRejectedError and GateTimeoutError.
- Templates -- TemplateError.
- Checkpointing -- saving and resuming execution state.
- Plugins & Hooks -- error isolation in hooks.