# Executing Tasks
Once you have a DAG, you need an executor to run
it. dagron ships with several executor types — from a simple thread-pool executor to
async, pipeline, conditional, dynamic, and incremental variants. This guide covers
the two general-purpose executors (DAGExecutor and AsyncDAGExecutor), the
Pipeline convenience API, and all the options you can tune.
## Executor overview

| Executor | Runtime | Use case |
|---|---|---|
| `DAGExecutor` | Thread pool | CPU-bound tasks, synchronous code |
| `AsyncDAGExecutor` | asyncio | I/O-bound tasks, async/await code |
| `Pipeline` | Thread pool | Simple linear chains with `@task` |

Specialised executors are covered in their own guides: Incremental, Conditional, Dynamic, Checkpointing.
## DAGExecutor
The workhorse executor. It schedules tasks across a thread pool, respecting topological order and maximising parallelism.
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("a", "b")
    .add_edge("a", "c")
    .add_edge("b", "d")
    .add_edge("c", "d")
    .build()
)

tasks = {
    "a": lambda: "data",
    "b": lambda: "processed_b",
    "c": lambda: "processed_c",
    "d": lambda: "merged",
}

result = dagron.DAGExecutor(dag).execute(tasks)
```

*Diamond DAG: nodes `b` and `c` run in parallel after `a` completes.*
### Constructor parameters

```python
dagron.DAGExecutor(
    dag,                   # The DAG to execute
    max_workers=None,      # Thread pool size (default: CPU count)
    costs=None,            # Dict[str, float] -- cost hints for scheduling
    callbacks=None,        # ExecutionCallbacks instance
    fail_fast=True,        # Stop on first failure?
    enable_tracing=False,  # Record execution trace?
    hooks=None,            # Plugin hooks
)
```
### The `execute()` method

```python
result = executor.execute(
    tasks,              # Dict[str, Callable]
    timeout=None,       # Overall timeout in seconds
    cancel_event=None,  # threading.Event to signal cancellation
)
```
## Understanding ExecutionResult

Every `.execute()` call returns an `ExecutionResult`:

```python
result = dagron.DAGExecutor(dag).execute(tasks)

# Aggregate counts
print(result.succeeded)  # int
print(result.failed)     # int
print(result.skipped)    # int
print(result.timed_out)  # int
print(result.cancelled)  # int

# Wall-clock duration
print(result.total_duration_seconds)  # float

# Per-node details
for name, nr in result.node_results.items():
    print(name, nr.status, nr.result, nr.error, nr.duration_seconds)
```
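If you prefer exceptions over inspecting statuses, a small helper can re-raise the first recorded error. This helper is our own sketch, not a dagron API; dagron itself returns the result object rather than raising:

```python
# Our own helper (not part of dagron): turn a failed run back into
# an exception at the call site, chaining the original task error.
def raise_on_failure(result):
    """Re-raise the first recorded task error from an ExecutionResult."""
    for name, nr in result.node_results.items():
        if nr.error is not None:
            raise RuntimeError(f"task {name!r} failed") from nr.error

# Usage:
# raise_on_failure(dagron.DAGExecutor(dag).execute(tasks))
```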
### NodeResult

Each entry in `result.node_results` is a `NodeResult`:

| Field | Type | Description |
|---|---|---|
| `name` | `str` | Node name |
| `status` | `NodeStatus` | Final status |
| `result` | `Any` | Return value of the callable |
| `error` | `Exception \| None` | Exception if the task failed |
| `duration_seconds` | `float` | Wall-clock time for this node |
### NodeStatus

dagron defines eight possible statuses:

| Status | Badge | Meaning |
|---|---|---|
| `PENDING` | pending | Not yet scheduled |
| `RUNNING` | running | Currently executing |
| `COMPLETED` | completed | Finished successfully |
| `FAILED` | failed | Raised an exception |
| `SKIPPED` | skipped | Skipped (upstream failure or condition) |
| `TIMED_OUT` | timed-out | Exceeded timeout |
| `CANCELLED` | cancelled | Cancelled via cancel event |
| `CACHE_HIT` | cache-hit | Result retrieved from cache |

```python
from dagron import NodeStatus

if result.node_results["d"].status == NodeStatus.COMPLETED:
    print("All good!")
```
## Fail-fast vs. best-effort

### Fail-fast (default)

When `fail_fast=True`, the executor stops scheduling new tasks as soon as any
node fails. Nodes that depend on the failed node (directly or transitively)
are marked skipped.

```python
import time

def slow_a():
    time.sleep(1)
    return "ok"

def failing_b():
    raise RuntimeError("boom")

def depends_on_b():
    return "never reached"

dag = (
    dagron.DAG.builder()
    .add_edge("a", "c")
    .add_edge("b", "c")
    .build()
)

result = dagron.DAGExecutor(dag, fail_fast=True).execute({
    "a": slow_a,
    "b": failing_b,
    "c": depends_on_b,
})

print(result.node_results["b"].status)  # FAILED
print(result.node_results["c"].status)  # SKIPPED
```
### Best-effort

With `fail_fast=False`, independent branches continue executing even when one
branch fails. Only nodes downstream of the failed node (its direct and
transitive descendants) are skipped.

```python
dag = (
    dagron.DAG.builder()
    .add_edge("root", "branch_a")
    .add_edge("root", "branch_b")
    .add_edge("branch_a", "join")
    .add_edge("branch_b", "join")
    .build()
)

def ok():
    return "ok"

def fail():
    raise RuntimeError("oops")

result = dagron.DAGExecutor(dag, fail_fast=False).execute({
    "root": ok,
    "branch_a": fail,
    "branch_b": ok,
    "join": ok,  # skipped because branch_a failed
})

print(result.node_results["branch_b"].status)  # COMPLETED (still ran!)
print(result.node_results["join"].status)      # SKIPPED
```
## Timeouts

### Global timeout

Set a wall-clock deadline for the entire execution:

```python
result = dagron.DAGExecutor(dag).execute(tasks, timeout=30.0)

# Any node still running after 30 seconds is marked TIMED_OUT
for name, nr in result.node_results.items():
    if nr.status == dagron.NodeStatus.TIMED_OUT:
        print(f"{name} timed out!")
```

Timed-out nodes appear as `timed-out` in the results.
### Per-node timeouts

Per-node timeouts are supported through the cost-aware scheduling system. See
the Resource Scheduling API reference for details.
## Cancellation

You can cancel a running execution from another thread using a
`threading.Event`:

```python
import threading

cancel = threading.Event()

# In another thread (e.g., signal handler):
# cancel.set()

result = dagron.DAGExecutor(dag).execute(tasks, cancel_event=cancel)
```

When the event is set, the executor finishes any currently-running tasks but
does not schedule new ones. Unstarted nodes are marked cancelled.

```python
import signal
import threading

cancel = threading.Event()
signal.signal(signal.SIGINT, lambda *_: cancel.set())

result = dagron.DAGExecutor(dag).execute(tasks, cancel_event=cancel)
print(result.cancelled)  # number of cancelled nodes
```
## Callbacks

Callbacks let you react to execution events: logging, metrics, progress bars,
or custom logic.

```python
class MyCallbacks:
    def on_node_start(self, name):
        print(f"[START] {name}")

    def on_node_complete(self, name, result):
        print(f"[DONE] {name} -> {result}")

    def on_node_error(self, name, error):
        print(f"[FAIL] {name}: {error}")

    def on_node_skip(self, name):
        print(f"[SKIP] {name}")

result = dagron.DAGExecutor(dag, callbacks=MyCallbacks()).execute(tasks)
```

Callbacks are called synchronously on the executor thread that completed the
task. Keep them lightweight to avoid blocking the scheduler.
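If a callback must do slow work (say, pushing metrics over the network), one way to keep it lightweight is to enqueue events and drain them on a separate thread. The queue wiring below is our own sketch, not a dagron feature; only the `on_node_*` method names come from the callback protocol above:

```python
import queue
import threading

class QueuedCallbacks:
    """Forward executor events to a background thread so the
    scheduler is never blocked by slow callback work."""

    def __init__(self):
        self.events = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self):
        while True:
            event = self.events.get()
            if event is None:  # sentinel: shut down
                break
            kind, name, payload = event
            # Slow work (metrics push, remote logging, ...) goes here.
            print(f"{kind}: {name}")

    def close(self):
        """Flush remaining events and stop the worker thread."""
        self.events.put(None)
        self._worker.join()

    # Executor-facing hooks: each just enqueues and returns immediately.
    def on_node_start(self, name):
        self.events.put(("start", name, None))

    def on_node_complete(self, name, result):
        self.events.put(("complete", name, result))

    def on_node_error(self, name, error):
        self.events.put(("error", name, error))
```

You would then pass an instance as `callbacks=QueuedCallbacks()` and call `close()` after `execute()` returns.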
### Progress tracking example

```python
from dagron import DAGExecutor

class ProgressTracker:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def on_node_complete(self, name, result):
        self.done += 1
        pct = (self.done / self.total) * 100
        print(f"Progress: {self.done}/{self.total} ({pct:.0f}%)")

    def on_node_error(self, name, error):
        self.done += 1

    def on_node_skip(self, name):
        self.done += 1

tracker = ProgressTracker(dag.node_count())
result = DAGExecutor(dag, callbacks=tracker).execute(tasks)
```
## Cost-aware scheduling

If some tasks are more expensive than others, provide cost hints so the
executor can schedule them more intelligently:

```python
costs = {
    "train_model": 100.0,
    "evaluate": 10.0,
    "preprocess": 5.0,
}

result = dagron.DAGExecutor(dag, costs=costs).execute(tasks)
```

The executor uses costs when computing the critical path and when deciding
which ready node to schedule first.
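If you don't know costs up front, one option (a pattern of ours, not a built-in feature) is to recycle the measured durations from a previous run of the same DAG as the hints for the next run:

```python
# `result` is the ExecutionResult of an earlier run of the same DAG;
# each NodeResult's wall-clock duration becomes that node's cost hint.
def costs_from_previous_run(result, default=1.0):
    return {
        name: nr.duration_seconds if nr.duration_seconds > 0 else default
        for name, nr in result.node_results.items()
    }

# costs = costs_from_previous_run(previous_result)
# result = dagron.DAGExecutor(dag, costs=costs).execute(tasks)
```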
## AsyncDAGExecutor

For I/O-bound workloads (HTTP requests, database queries, file operations),
use the async executor:

```python
import asyncio
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("fetch_users", "enrich")
    .add_edge("fetch_orders", "enrich")
    .add_edge("enrich", "store")
    .build()
)

async def fetch_users():
    await asyncio.sleep(0.5)  # simulate HTTP call
    return [{"id": 1, "name": "Alice"}]

async def fetch_orders():
    await asyncio.sleep(0.3)
    return [{"id": 1, "item": "Widget"}]

async def enrich():
    return {"users": 1, "orders": 1}

async def store():
    return "stored"

async def main():
    executor = dagron.AsyncDAGExecutor(dag)
    result = await executor.execute({
        "fetch_users": fetch_users,
        "fetch_orders": fetch_orders,
        "enrich": enrich,
        "store": store,
    })
    print(result.succeeded)  # 4

asyncio.run(main())
```
`AsyncDAGExecutor` accepts the same constructor parameters as `DAGExecutor`
(`max_workers`, `callbacks`, `fail_fast`, `enable_tracing`, `hooks`), and its
`.execute()` method accepts the same `timeout` parameter.

*Async execution: both fetch tasks run concurrently on the event loop.*
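If one step of an otherwise-async workflow is still blocking (for example a synchronous database driver), a standard asyncio pattern (nothing dagron-specific) is to push the blocking call onto a worker thread so the event loop stays free:

```python
import asyncio
import time

def blocking_query():
    time.sleep(0.05)  # stands in for a blocking driver call
    return [42]

async def fetch_rows():
    # asyncio.to_thread (Python 3.9+) runs the blocking function in a
    # worker thread and awaits its result without blocking the loop.
    return await asyncio.to_thread(blocking_query)

# The wrapped coroutine can then be used as an async task:
# tasks = {"fetch_rows": fetch_rows, ...}
```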
## Pipeline and @task

For simple function chains where dependencies are inferred from parameter
names, the `Pipeline` API is the most concise approach:

```python
from dagron import Pipeline, task

@task
def download():
    return {"raw": [1, 2, 3]}

@task
def normalize(download):
    """Depends on 'download' because of the parameter name."""
    return [x * 10 for x in download["raw"]]

@task
def summarize(normalize):
    return {"count": len(normalize), "total": sum(normalize)}

pipeline = Pipeline(tasks=[download, normalize, summarize], name="etl")
result = pipeline.execute()

print(result.node_results["summarize"].result)
# {'count': 3, 'total': 60}
```
### How dependency inference works

The `Pipeline` inspects each function's parameter names. If a parameter
matches the name of another task in the pipeline, an edge is added:

```python
@task
def a():
    return 1

@task
def b():
    return 2

@task
def c(a, b):
    """Depends on both a and b."""
    return a + b

pipeline = Pipeline(tasks=[a, b, c])
# Internally builds: a -> c, b -> c
```
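The mechanism can be pictured with the standard library's `inspect` module. This is a simplified sketch of the idea, not dagron's actual implementation:

```python
import inspect

def infer_edges(fns):
    """Add an edge (dep -> fn) whenever one of fn's parameter names
    matches another function's name."""
    names = {fn.__name__ for fn in fns}
    edges = []
    for fn in fns:
        for param in inspect.signature(fn).parameters:
            if param in names:
                edges.append((param, fn.__name__))
    return edges

def a():
    return 1

def b():
    return 2

def c(a, b):
    return a + b

print(infer_edges([a, b, c]))  # [('a', 'c'), ('b', 'c')]
```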
### Async pipelines

```python
result = await pipeline.execute_async()
```
### When to use Pipeline vs. DAGExecutor
| Feature | Pipeline | DAGExecutor |
|---|---|---|
| Dependency declaration | Implicit (parameter names) | Explicit (edges) |
| Data passing | Automatic (return values injected) | Manual |
| Graph complexity | Linear / simple fan-in | Any DAG shape |
| Fine-grained control | Limited | Full |
Use Pipeline for quick scripts and prototypes. Switch to DAGExecutor when you
need explicit control over graph structure, payloads, or advanced executor features.
## Controlling parallelism

### max_workers

```python
# Use exactly 2 threads
result = dagron.DAGExecutor(dag, max_workers=2).execute(tasks)

# Use all available cores (default)
result = dagron.DAGExecutor(dag).execute(tasks)
```

Setting `max_workers=1` gives you sequential execution in topological order,
which is useful for debugging.
## Execution plan preview

Before executing, you can see the planned execution order:

```python
plan = dag.execution_plan()
for step in plan:
    print(step)

# ExecutionStep(level=0, nodes=['a'])
# ExecutionStep(level=1, nodes=['b', 'c'])
# ExecutionStep(level=2, nodes=['d'])
```

This tells you which nodes run in parallel at each level.
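The plan also gives a quick upper bound for `max_workers`: the widest level is the most parallelism the graph can ever use. This helper is our own heuristic, relying only on the `nodes` attribute shown in the output above:

```python
# The widest ExecutionStep bounds how many workers can be busy at once.
def max_parallelism(plan):
    return max(len(step.nodes) for step in plan)

# executor = dagron.DAGExecutor(
#     dag, max_workers=max_parallelism(dag.execution_plan()))
```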
## Putting it all together

Here is a complete example with all major features:

```python
import dagron
import threading
import time

# Build the DAG
dag = (
    dagron.DAG.builder()
    .add_nodes(["extract", "validate", "transform", "enrich", "load", "notify"])
    .add_edges([
        ("extract", "validate"),
        ("validate", "transform"),
        ("validate", "enrich"),
        ("transform", "load"),
        ("enrich", "load"),
        ("load", "notify"),
    ])
    .build()
)

# Define tasks
def extract():
    time.sleep(0.1)
    return {"rows": 1000}

def validate():
    return {"valid": True}

def transform():
    time.sleep(0.2)
    return {"transformed": 1000}

def enrich():
    time.sleep(0.15)
    return {"enriched": 1000}

def load():
    return {"loaded": 1000}

def notify():
    return "email sent"

tasks = {
    "extract": extract,
    "validate": validate,
    "transform": transform,
    "enrich": enrich,
    "load": load,
    "notify": notify,
}

# Callbacks
class Logger:
    def on_node_start(self, name):
        print(f"  -> {name}")

    def on_node_complete(self, name, result):
        print(f"  <- {name}: {result}")

    def on_node_error(self, name, error):
        print(f"  !! {name}: {error}")

# Cancellation support
cancel = threading.Event()

# Execute with all options
result = dagron.DAGExecutor(
    dag,
    max_workers=4,
    callbacks=Logger(),
    fail_fast=True,
    enable_tracing=True,
    costs={"transform": 2.0, "enrich": 1.5},
).execute(tasks, timeout=60.0, cancel_event=cancel)

# Inspect results
print(f"\nCompleted: {result.succeeded}/{dag.node_count()}")
print(f"Duration: {result.total_duration_seconds:.3f}s")
for name, nr in result.node_results.items():
    print(f"  {name}: {nr.status.name} ({nr.duration_seconds:.3f}s)")
```

*Complete ETL pipeline with parallel transform and enrich branches.*
## Next steps

- **Tracing & Profiling**: enable tracing and analyse bottlenecks.
- **Incremental Execution**: only re-run what changed.
- **Conditional Execution**: skip branches based on runtime predicates.
- **Checkpointing**: resume after failures.