
Executing Tasks

Once you have a DAG, you need an executor to run it. dagron ships with several executor types — from a simple thread-pool executor to async, pipeline, conditional, dynamic, and incremental variants. This guide covers the two general-purpose executors (DAGExecutor and AsyncDAGExecutor), the Pipeline convenience API, and all the options you can tune.

Executor overview

| Executor | Runtime | Use case |
| --- | --- | --- |
| DAGExecutor | Thread pool | CPU-bound tasks, synchronous code |
| AsyncDAGExecutor | asyncio | I/O-bound tasks, async/await code |
| Pipeline | Thread pool | Simple linear chains with @task |

Specialised executors are covered in their own guides: Incremental, Conditional, Dynamic, Checkpointing.

DAGExecutor

The workhorse executor. It schedules tasks across a thread pool, respecting topological order and maximising parallelism.

import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("a", "b")
    .add_edge("a", "c")
    .add_edge("b", "d")
    .add_edge("c", "d")
    .build()
)

tasks = {
    "a": lambda: "data",
    "b": lambda: "processed_b",
    "c": lambda: "processed_c",
    "d": lambda: "merged",
}

result = dagron.DAGExecutor(dag).execute(tasks)

Diamond DAG. Nodes b and c run in parallel after a completes.

Constructor parameters

dagron.DAGExecutor(
    dag,                   # The DAG to execute
    max_workers=None,      # Thread pool size (default: CPU count)
    costs=None,            # Dict[str, float] — cost hints for scheduling
    callbacks=None,        # ExecutionCallbacks instance
    fail_fast=True,        # Stop on first failure?
    enable_tracing=False,  # Record execution trace?
    hooks=None,            # Plugin hooks
)

The execute() method

result = executor.execute(
    tasks,              # Dict[str, Callable]
    timeout=None,       # Overall timeout in seconds
    cancel_event=None,  # threading.Event to signal cancellation
)

Understanding ExecutionResult

Every .execute() call returns an ExecutionResult:

result = dagron.DAGExecutor(dag).execute(tasks)

# Aggregate counts
print(result.succeeded) # int
print(result.failed) # int
print(result.skipped) # int
print(result.timed_out) # int
print(result.cancelled) # int

# Wall-clock duration
print(result.total_duration_seconds) # float

# Per-node details
for name, nr in result.node_results.items():
    print(name, nr.status, nr.result, nr.error, nr.duration_seconds)

NodeResult

Each entry in result.node_results is a NodeResult:

| Field | Type | Description |
| --- | --- | --- |
| name | str | Node name |
| status | NodeStatus | Final status |
| result | Any | Return value of the callable |
| error | Exception \| None | Exception if the task failed |
| duration_seconds | float | Wall-clock time for this node |
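When a caller just wants failures to propagate as exceptions, a small helper can re-raise the first recorded error. This is a sketch, not part of dagron's API; it relies only on the node_results and error fields documented above:

```python
def raise_on_failure(result):
    """Re-raise the first task failure recorded in an ExecutionResult.

    Duck-typed: works on anything with a .node_results mapping whose
    values expose an .error attribute.
    """
    for name, nr in result.node_results.items():
        if nr.error is not None:
            raise RuntimeError(f"task {name!r} failed") from nr.error
```

Calling raise_on_failure(executor.execute(tasks)) turns a result-object workflow back into exception-style error handling at the call site.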

NodeStatus

dagron defines eight possible statuses:

| Status | Meaning |
| --- | --- |
| PENDING | Not yet scheduled |
| RUNNING | Currently executing |
| COMPLETED | Finished successfully |
| FAILED | Raised an exception |
| SKIPPED | Skipped (upstream failure or condition) |
| TIMED_OUT | Exceeded timeout |
| CANCELLED | Cancelled via cancel event |
| CACHE_HIT | Result retrieved from cache |

from dagron import NodeStatus

if result.node_results["d"].status == NodeStatus.COMPLETED:
    print("All good!")

Fail-fast vs. best-effort

Fail-fast (default)

When fail_fast=True, the executor stops scheduling new tasks as soon as any node fails. Nodes that depend on the failed node (directly or transitively) are marked skipped.

import time

def slow_a():
    time.sleep(1)
    return "ok"

def failing_b():
    raise RuntimeError("boom")

def depends_on_b():
    return "never reached"

dag = dagron.DAG.builder() \
    .add_edge("a", "c") \
    .add_edge("b", "c") \
    .build()

result = dagron.DAGExecutor(dag, fail_fast=True).execute({
    "a": slow_a,
    "b": failing_b,
    "c": depends_on_b,
})

print(result.node_results["b"].status) # FAILED
print(result.node_results["c"].status) # SKIPPED

Best-effort

With fail_fast=False, independent branches continue executing even when one branch fails. Only descendants of the failed node (direct or transitive) are skipped.

dag = (
    dagron.DAG.builder()
    .add_edge("root", "branch_a")
    .add_edge("root", "branch_b")
    .add_edge("branch_a", "join")
    .add_edge("branch_b", "join")
    .build()
)

def ok():
    return "ok"

def fail():
    raise RuntimeError("oops")

result = dagron.DAGExecutor(dag, fail_fast=False).execute({
    "root": ok,
    "branch_a": fail,
    "branch_b": ok,
    "join": ok,  # skipped because branch_a failed
})

print(result.node_results["branch_b"].status) # COMPLETED (still ran!)
print(result.node_results["join"].status) # SKIPPED

Timeouts

Global timeout

Set a wall-clock deadline for the entire execution:

result = dagron.DAGExecutor(dag).execute(tasks, timeout=30.0)

# Any node still running after 30 seconds is marked TIMED_OUT
for name, nr in result.node_results.items():
    if nr.status == dagron.NodeStatus.TIMED_OUT:
        print(f"{name} timed out!")

Timed-out nodes appear as timed-out in the results.

Per-node timeouts

Per-node timeouts are supported through the cost-aware scheduling system. See the Resource Scheduling API reference for details.

Cancellation

You can cancel a running execution from another thread using a threading.Event:

import threading

cancel = threading.Event()

# In another thread (e.g., signal handler):
# cancel.set()

result = dagron.DAGExecutor(dag).execute(tasks, cancel_event=cancel)

When the event is set, the executor finishes any currently-running tasks but does not schedule new ones. Unstarted nodes are marked cancelled.
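One convenient pattern built on this mechanism is a soft deadline: an event that sets itself after a fixed delay. This is a sketch using only the standard library; deadline_event is a hypothetical helper name, not part of dagron:

```python
import threading

def deadline_event(seconds):
    """Return a threading.Event that is set automatically after
    `seconds`. Passed as a cancel event, it stops the executor from
    scheduling new tasks once the deadline passes, while tasks that
    are already running finish normally."""
    ev = threading.Event()
    timer = threading.Timer(seconds, ev.set)
    timer.daemon = True  # don't keep the process alive just for the timer
    timer.start()
    return ev
```

Note the contrast with timeout=: a timeout marks still-running nodes TIMED_OUT, while a cancel event lets in-flight tasks complete and marks only unstarted nodes cancelled.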

import signal

cancel = threading.Event()
signal.signal(signal.SIGINT, lambda *_: cancel.set())

result = dagron.DAGExecutor(dag).execute(tasks, cancel_event=cancel)
print(result.cancelled) # number of cancelled nodes

Callbacks

Callbacks let you react to execution events — for logging, metrics, progress bars, or custom logic.

class MyCallbacks:
    def on_node_start(self, name):
        print(f"[START] {name}")

    def on_node_complete(self, name, result):
        print(f"[DONE] {name} -> {result}")

    def on_node_error(self, name, error):
        print(f"[FAIL] {name}: {error}")

    def on_node_skip(self, name):
        print(f"[SKIP] {name}")

result = dagron.DAGExecutor(dag, callbacks=MyCallbacks()).execute(tasks)

Callbacks are called synchronously on the executor thread that completed the task. Keep them lightweight to avoid blocking the scheduler.
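If a callback genuinely needs to do slow work (network I/O, heavy rendering), one option is to push events onto a thread-safe queue and process them on a dedicated consumer thread. A minimal sketch, assuming only the callback method names shown above; QueuedCallbacks is not a dagron class:

```python
import queue

class QueuedCallbacks:
    """Record events on a queue so executor threads return immediately;
    a separate consumer thread drains the queue at its own pace."""

    def __init__(self):
        self.events = queue.Queue()

    def on_node_start(self, name):
        self.events.put(("start", name))

    def on_node_complete(self, name, result):
        self.events.put(("complete", name, result))

    def on_node_error(self, name, error):
        self.events.put(("error", name, error))

    def on_node_skip(self, name):
        self.events.put(("skip", name))
```

The consumer side is a loop such as `while True: handle(cb.events.get())` running on its own thread, so the scheduler is never blocked by slow event handling.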

Progress tracking example

import threading

from dagron import DAGExecutor

class ProgressTracker:
    def __init__(self, total):
        self.total = total
        self.done = 0
        # Callbacks fire on whichever executor thread finished the task,
        # so guard the counter with a lock.
        self._lock = threading.Lock()

    def _advance(self):
        with self._lock:
            self.done += 1
            return self.done

    def on_node_complete(self, name, result):
        done = self._advance()
        pct = (done / self.total) * 100
        print(f"Progress: {done}/{self.total} ({pct:.0f}%)")

    def on_node_error(self, name, error):
        self._advance()

    def on_node_skip(self, name):
        self._advance()

tracker = ProgressTracker(dag.node_count())
result = DAGExecutor(dag, callbacks=tracker).execute(tasks)

Cost-aware scheduling

If some tasks are more expensive than others, provide cost hints so the executor can schedule them more intelligently:

costs = {
    "train_model": 100.0,
    "evaluate": 10.0,
    "preprocess": 5.0,
}

result = dagron.DAGExecutor(dag, costs=costs).execute(tasks)

The executor uses costs when computing the critical path and when deciding which ready node to schedule first.
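Cost hints don't have to be guessed: measured durations from a previous run can seed the next one. A sketch using only the duration_seconds field documented above (the helper name is hypothetical, not a dagron API):

```python
def costs_from_result(result, default=1.0):
    """Turn a previous run's per-node durations into a cost-hint dict.

    Nodes with no measured duration (e.g. skipped ones) fall back to
    `default` so every node still gets a hint.
    """
    return {
        name: nr.duration_seconds if nr.duration_seconds else default
        for name, nr in result.node_results.items()
    }
```

A follow-up run would then pass `costs=costs_from_result(previous_result)` to the executor constructor.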

AsyncDAGExecutor

For I/O-bound workloads (HTTP requests, database queries, file operations), use the async executor:

import asyncio
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("fetch_users", "enrich")
    .add_edge("fetch_orders", "enrich")
    .add_edge("enrich", "store")
    .build()
)

async def fetch_users():
    await asyncio.sleep(0.5)  # simulate HTTP call
    return [{"id": 1, "name": "Alice"}]

async def fetch_orders():
    await asyncio.sleep(0.3)
    return [{"id": 1, "item": "Widget"}]

async def enrich():
    return {"users": 1, "orders": 1}

async def store():
    return "stored"

async def main():
    executor = dagron.AsyncDAGExecutor(dag)
    result = await executor.execute({
        "fetch_users": fetch_users,
        "fetch_orders": fetch_orders,
        "enrich": enrich,
        "store": store,
    })
    print(result.succeeded)  # 4

asyncio.run(main())

AsyncDAGExecutor accepts the same constructor parameters as DAGExecutor (max_workers, callbacks, fail_fast, enable_tracing, hooks), and its .execute() method accepts the same timeout parameter.

Async execution: both fetch tasks run concurrently on the event loop.

Pipeline and @task

For simple function chains where dependencies are inferred from parameter names, the Pipeline API is the most concise approach:

from dagron import Pipeline, task

@task
def download():
    return {"raw": [1, 2, 3]}

@task
def normalize(download):
    """Depends on 'download' because of the parameter name."""
    return [x * 10 for x in download["raw"]]

@task
def summarize(normalize):
    return {"count": len(normalize), "total": sum(normalize)}

pipeline = Pipeline(tasks=[download, normalize, summarize], name="etl")
result = pipeline.execute()

print(result.node_results["summarize"].result)
# {'count': 3, 'total': 60}

How dependency inference works

The Pipeline inspects each function's parameter names. If a parameter matches the name of another task in the pipeline, an edge is added:

@task
def a():
    return 1

@task
def b():
    return 2

@task
def c(a, b):
    """Depends on both a and b."""
    return a + b

pipeline = Pipeline(tasks=[a, b, c])
# Internally builds: a -> c, b -> c
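The inference rule can be modeled in a few lines with inspect.signature. This is a conceptual sketch of the behavior described above, not dagron's actual implementation:

```python
import inspect

def infer_edges(task_fns):
    """Return (dependency, task) edges by matching each function's
    parameter names against the names of the other tasks."""
    names = {fn.__name__ for fn in task_fns}
    edges = []
    for fn in task_fns:
        for param in inspect.signature(fn).parameters:
            if param in names:
                edges.append((param, fn.__name__))
    return edges
```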

Async pipelines

result = await pipeline.execute_async()

When to use Pipeline vs. DAGExecutor

| Feature | Pipeline | DAGExecutor |
| --- | --- | --- |
| Dependency declaration | Implicit (parameter names) | Explicit (edges) |
| Data passing | Automatic (return values injected) | Manual |
| Graph complexity | Linear / simple fan-in | Any DAG shape |
| Fine-grained control | Limited | Full |

Use Pipeline for quick scripts and prototypes. Switch to DAGExecutor when you need explicit control over graph structure, payloads, or advanced executor features.

Controlling parallelism

max_workers

# Use exactly 2 threads
result = dagron.DAGExecutor(dag, max_workers=2).execute(tasks)

# Use all available cores (default)
result = dagron.DAGExecutor(dag).execute(tasks)

Setting max_workers=1 gives you sequential execution in topological order, which is useful for debugging.

Execution plan preview

Before executing, you can see the planned execution order:

plan = dag.execution_plan()
for step in plan:
    print(step)
# ExecutionStep(level=0, nodes=['a'])
# ExecutionStep(level=1, nodes=['b', 'c'])
# ExecutionStep(level=2, nodes=['d'])

This tells you which nodes run in parallel at each level.
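The plan also gives a natural upper bound for max_workers: the widest level is the most tasks that can ever run concurrently. A small sketch over the ExecutionStep objects shown above (assuming each step exposes a nodes list, as in the printed output):

```python
def max_parallelism(plan):
    """Widest level of an execution plan: no thread pool larger than
    this can be fully utilized by the DAG's structure."""
    return max(len(step.nodes) for step in plan)
```

Passing `max_workers=max_parallelism(dag.execution_plan())` to the executor avoids allocating threads the graph can never use.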

Putting it all together

Here is a complete example with all major features:

import dagron
import threading
import time

# Build the DAG
dag = (
    dagron.DAG.builder()
    .add_nodes(["extract", "validate", "transform", "enrich", "load", "notify"])
    .add_edges([
        ("extract", "validate"),
        ("validate", "transform"),
        ("validate", "enrich"),
        ("transform", "load"),
        ("enrich", "load"),
        ("load", "notify"),
    ])
    .build()
)

# Define tasks
def extract():
    time.sleep(0.1)
    return {"rows": 1000}

def validate():
    return {"valid": True}

def transform():
    time.sleep(0.2)
    return {"transformed": 1000}

def enrich():
    time.sleep(0.15)
    return {"enriched": 1000}

def load():
    return {"loaded": 1000}

def notify():
    return "email sent"

tasks = {
    "extract": extract,
    "validate": validate,
    "transform": transform,
    "enrich": enrich,
    "load": load,
    "notify": notify,
}

# Callbacks
class Logger:
    def on_node_start(self, name):
        print(f" -> {name}")

    def on_node_complete(self, name, result):
        print(f" <- {name}: {result}")

    def on_node_error(self, name, error):
        print(f" !! {name}: {error}")

# Cancellation support
cancel = threading.Event()

# Execute with all options
result = dagron.DAGExecutor(
    dag,
    max_workers=4,
    callbacks=Logger(),
    fail_fast=True,
    enable_tracing=True,
    costs={"transform": 2.0, "enrich": 1.5},
).execute(tasks, timeout=60.0, cancel_event=cancel)

# Inspect results
print(f"\nCompleted: {result.succeeded}/{dag.node_count()}")
print(f"Duration: {result.total_duration_seconds:.3f}s")

for name, nr in result.node_results.items():
    print(f"  {name}: {nr.status.name} ({nr.duration_seconds:.3f}s)")

Complete ETL pipeline with parallel transform and enrich branches.

Next steps