# Getting Started
This guide walks you through installing dagron, creating your first directed acyclic graph, executing tasks in parallel, and inspecting the results. By the end you will have a working ETL pipeline that extracts, transforms, and loads data with full observability.
## Installation
dagron is distributed as pre-built wheels that bundle the Rust core via PyO3. Install it from PyPI:
```bash
pip install dagron
```
For async execution support, install the optional async extra:
```bash
pip install "dagron[async]"
```
Verify the installation:
```python
import dagron

print(dagron.__version__)
```
dagron requires Python 3.9 or later. The Rust extension is pre-compiled for Linux (x86_64, aarch64), macOS (x86_64, Apple Silicon), and Windows (x86_64).
## Core concepts
Before writing code, it helps to understand three concepts that appear throughout dagron:
| Concept | What it is |
|---|---|
| DAG | A directed acyclic graph whose nodes represent units of work and whose edges represent dependencies. The graph structure lives in Rust for speed. |
| Executor | A scheduler that walks the DAG in topological order, dispatching tasks to a thread pool (or async event loop) with maximum parallelism. |
| Result | A structured report containing every node's status, return value, error (if any), and wall-clock duration. |
The typical workflow is: build a DAG, map tasks to nodes, execute, inspect results.
## Your first DAG
Let us model a classic ETL pipeline with three stages: extract, transform, and load. The transform step depends on extract, and load depends on transform.
*A simple three-node ETL pipeline.*
### Step 1 — Build the graph
The easiest way to create a DAG is with the fluent builder pattern:
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)
```
The builder validates the graph at `.build()` time. If you accidentally introduce a cycle, dagron raises a `CycleError` immediately — you never get an invalid graph.
```python
print(dag.node_count())  # 3
print(dag.edge_count())  # 2
```
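Under the hood, build-time validation amounts to a cycle check on the graph. The following pure-Python sketch (not dagron's actual Rust implementation) shows the idea using Kahn's algorithm: a graph is acyclic exactly when every node can be removed once all of its dependencies have been removed.

```python
from collections import deque

def has_cycle(nodes, edges):
    """Kahn's algorithm: the graph is acyclic iff every node can be
    popped once all of its dependencies have been popped."""
    indegree = {n: 0 for n in nodes}
    children = {n: [] for n in nodes}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1
    queue = deque(n for n in nodes if indegree[n] == 0)
    seen = 0
    while queue:
        node = queue.popleft()
        seen += 1
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    return seen != len(nodes)  # leftover nodes are stuck in a cycle

print(has_cycle(["a", "b"], [("a", "b"), ("b", "a")]))  # True
print(has_cycle(["extract", "transform", "load"],
                [("extract", "transform"), ("transform", "load")]))  # False
```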
### Step 2 — Define tasks
A task is any Python callable (function, lambda, method). You map node names to callables in a plain dictionary:
```python
def extract():
    """Simulate fetching rows from an API."""
    print("Extracting data...")
    return [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

def transform():
    """Normalize names to uppercase."""
    print("Transforming data...")
    return [{"id": 1, "name": "ALICE"}, {"id": 2, "name": "BOB"}]

def load():
    """Write results to the database."""
    print("Loading data...")
    return "2 rows written"

tasks = {
    "extract": extract,
    "transform": transform,
    "load": load,
}
```
Each task runs independently. If you need to pass data between tasks, see the Executing Tasks guide for strategies including shared state and the Pipeline API.
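One simple pattern in the meantime — a plain-Python sketch, not part of dagron's API — is to have tasks read and write a shared dictionary that they close over. Because the executor guarantees dependency order, a downstream task can safely read what an upstream task wrote:

```python
results = {}

def extract():
    results["extract"] = [{"id": 1, "name": "Alice"}]

def transform():
    rows = results["extract"]  # safe: extract is guaranteed to run first
    results["transform"] = [{**r, "name": r["name"].upper()} for r in rows]

def load():
    results["load"] = f'{len(results["transform"])} rows written'

# Stand-in for the executor's dependency-ordered dispatch:
for task in (extract, transform, load):
    task()

print(results["load"])  # "1 rows written"
```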
### Step 3 — Execute

Create a `DAGExecutor` and call `.execute()`:
```python
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)
```
The executor honors the dependency order: `extract` runs first, then `transform`, then `load`. Independent nodes (if any) would run in parallel across all available CPU cores.
### Step 4 — Inspect results
The returned `ExecutionResult` contains per-node outcomes:
```python
print(result.succeeded)  # 3
print(result.failed)     # 0

for name, node_result in result.node_results.items():
    print(f"{name}: {node_result.status} in {node_result.duration_seconds:.3f}s")
# extract: NodeStatus.COMPLETED in 0.001s
# transform: NodeStatus.COMPLETED in 0.001s
# load: NodeStatus.COMPLETED in 0.001s
```
Each `NodeResult` carries the callable's return value:
```python
print(result.node_results["load"].result)
# "2 rows written"
```
And the overall wall-clock time:
```python
print(f"Pipeline finished in {result.total_duration_seconds:.3f}s")
```
## A more realistic example
Real pipelines have fan-out and fan-in. Let us expand the ETL to extract from two sources in parallel, transform each independently, then merge and load:
*Fan-out / fan-in ETL pipeline. The two extract-transform branches run in parallel.*
```python
import dagron
import time

# -- Build the DAG --
dag = (
    dagron.DAG.builder()
    .add_node("extract_api")
    .add_node("extract_db")
    .add_node("transform_api")
    .add_node("transform_db")
    .add_node("merge")
    .add_node("load")
    .add_edge("extract_api", "transform_api")
    .add_edge("extract_db", "transform_db")
    .add_edge("transform_api", "merge")
    .add_edge("transform_db", "merge")
    .add_edge("merge", "load")
    .build()
)

# -- Define tasks --
def extract_api():
    time.sleep(0.5)  # simulate network I/O
    return [{"source": "api", "id": 1}]

def extract_db():
    time.sleep(0.3)  # simulate query
    return [{"source": "db", "id": 2}]

def transform_api():
    return [{"source": "api", "id": 1, "clean": True}]

def transform_db():
    return [{"source": "db", "id": 2, "clean": True}]

def merge():
    return "merged 2 sources"

def load():
    return "loaded to warehouse"

tasks = {
    "extract_api": extract_api,
    "extract_db": extract_db,
    "transform_api": transform_api,
    "transform_db": transform_db,
    "merge": merge,
    "load": load,
}

# -- Execute --
result = dagron.DAGExecutor(dag, max_workers=4).execute(tasks)
print(f"Completed {result.succeeded}/{dag.node_count()} tasks")
print(f"Wall time: {result.total_duration_seconds:.3f}s")
```
Because `extract_api` and `extract_db` have no mutual dependency, they execute concurrently. The `merge` node waits until both transform branches finish, then `load` runs last.
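The wall-clock saving here is ordinary thread-pool behaviour, nothing dagron-specific. A standalone illustration using only Python's `concurrent.futures`: two 0.3-second "extracts" submitted together finish in well under 0.6 seconds of wall time.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow(seconds):
    time.sleep(seconds)
    return seconds

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # Both tasks are submitted at once, like independent DAG roots.
    futures = [pool.submit(slow, 0.3), pool.submit(slow, 0.3)]
    total_slept = sum(f.result() for f in futures)
elapsed = time.perf_counter() - start

print(f"did {total_slept:.1f}s of work in {elapsed:.2f}s wall time")
```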
## Understanding execution order
You can preview the execution plan without running anything:
```python
print(dag.topological_sort())
# ['extract_api', 'extract_db', 'transform_api', 'transform_db', 'merge', 'load']

for level, nodes in enumerate(dag.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['extract_api', 'extract_db']
# Level 1: ['transform_api', 'transform_db']
# Level 2: ['merge']
# Level 3: ['load']
```
Nodes at the same level can run in parallel. The executor uses this structure internally to maximize concurrency.
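Computing those levels is a small graph-traversal exercise in its own right. A pure-Python sketch of one way to derive them (dagron does this in Rust; this is illustrative only): a node's level is one more than the deepest level among its dependencies.

```python
def topological_levels(nodes, edges):
    """Group nodes into levels: a node's level is one more than the
    deepest level among its dependencies (roots are level 0)."""
    parents = {n: [] for n in nodes}
    for src, dst in edges:
        parents[dst].append(src)

    level = {}
    def depth(node):
        if node not in level:
            level[node] = 1 + max((depth(p) for p in parents[node]), default=-1)
        return level[node]

    for n in nodes:
        depth(n)
    out = [[] for _ in range(max(level.values()) + 1)]
    for n in nodes:
        out[level[n]].append(n)
    return out

nodes = ["extract_api", "extract_db", "transform_api",
         "transform_db", "merge", "load"]
edges = [("extract_api", "transform_api"), ("extract_db", "transform_db"),
         ("transform_api", "merge"), ("transform_db", "merge"),
         ("merge", "load")]
print(topological_levels(nodes, edges))
# [['extract_api', 'extract_db'], ['transform_api', 'transform_db'], ['merge'], ['load']]
```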
## Handling failures
By default, the executor uses fail-fast mode: if any node raises an exception, downstream nodes are skipped and the result reports the failure.
```python
def bad_transform():
    raise ValueError("Data quality check failed!")

tasks_with_failure = {
    "extract": extract,
    "transform": bad_transform,
    "load": load,
}

# `dag` here is the three-node extract -> transform -> load graph
# from "Your first DAG", not the fan-out graph above.
result = dagron.DAGExecutor(dag).execute(tasks_with_failure)
print(result.failed)   # 1
print(result.skipped)  # 1 (load was skipped)
```
Node statuses after a failure:
| Node | Status |
|---|---|
| extract | completed |
| transform | failed |
| load | skipped |
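The skip set is just reachability: under fail-fast, everything downstream of the failed node is skipped. A sketch of that propagation in plain Python (illustrative, not dagron's internals):

```python
from collections import deque

def downstream(failed, edges):
    """Breadth-first walk from the failed node; every node reachable
    from it is skipped under fail-fast."""
    children = {}
    for src, dst in edges:
        children.setdefault(src, []).append(dst)
    skipped, queue = set(), deque([failed])
    while queue:
        for child in children.get(queue.popleft(), []):
            if child not in skipped:
                skipped.add(child)
                queue.append(child)
    return skipped

edges = [("extract", "transform"), ("transform", "load")]
print(downstream("transform", edges))  # {'load'}
```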
To continue executing independent branches even after a failure, disable fail-fast:
```python
result = dagron.DAGExecutor(dag, fail_fast=False).execute(tasks_with_failure)
```
See the Executing Tasks guide for the full set of executor options.
## Builder shortcuts
The builder supports several convenience patterns:
### Bulk operations
```python
dag = (
    dagron.DAG.builder()
    .add_nodes(["a", "b", "c", "d"])
    .add_edges([("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")])
    .build()
)
```
### Direct construction
If you prefer an imperative style, create a bare DAG and mutate it:
```python
dag = dagron.DAG()
dag.add_node("x")
dag.add_node("y")
dag.add_edge("x", "y")
```
### Pipeline decorator
For simple linear pipelines, the `@task` decorator infers dependencies from function parameter names:
```python
from dagron import Pipeline, task

@task
def fetch():
    return [1, 2, 3]

@task
def double(fetch):
    return [x * 2 for x in fetch]

@task
def save(double):
    return f"saved {len(double)} items"

pipeline = Pipeline(tasks=[fetch, double, save], name="numbers")
result = pipeline.execute()
print(result.node_results["save"].result)  # "saved 3 items"
```
The Pipeline API is covered in depth in Executing Tasks.
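The parameter-name trick is plain introspection. A sketch of how such dependencies can be read off a function signature with the standard library — `infer_dependencies` is a hypothetical helper for illustration, not dagron's internals:

```python
import inspect

def infer_dependencies(func):
    """A task's dependencies are simply its parameter names."""
    return list(inspect.signature(func).parameters)

def fetch():
    return [1, 2, 3]

def double(fetch):
    return [x * 2 for x in fetch]

def save(double):
    return f"saved {len(double)} items"

for f in (fetch, double, save):
    print(f.__name__, "->", infer_dependencies(f))
# fetch -> []
# double -> ['fetch']
# save -> ['double']
```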
## Visualizing your DAG
dagron can export the graph in several formats:
```python
# Mermaid (great for docs)
print(dag.to_mermaid())

# Graphviz DOT
print(dag.to_dot())

# JSON (for programmatic consumption)
print(dag.to_json())
```
See Serialization for the full serialization guide.
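If you ever need a format dagron does not emit, generating one from an edge list by hand is straightforward. A hand-rolled sketch producing Mermaid flowchart syntax (not dagron's own `to_mermaid` implementation):

```python
def edges_to_mermaid(edges):
    """Render an edge list as a top-down Mermaid flowchart."""
    lines = ["graph TD"]
    for src, dst in edges:
        lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)

print(edges_to_mermaid([("extract", "transform"), ("transform", "load")]))
# graph TD
#     extract --> transform
#     transform --> load
```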
## Quick reference
Here is a summary of the objects introduced in this guide with links to the API reference:
| Object | Purpose | API docs |
|---|---|---|
| `dagron.DAG` | The core graph | DAG |
| `dagron.DAG.builder()` | Fluent graph construction | DAGBuilder |
| `dagron.DAGExecutor` | Thread-pool executor | DAGExecutor |
| `ExecutionResult` | Aggregate execution report | ExecutionResult |
| `NodeResult` | Per-node outcome | NodeResult |
| `Pipeline` / `@task` | Decorator-based pipelines | Pipeline |
## Next steps
You now know how to install dagron, build a DAG, execute tasks, and read results. Continue with:
- Building DAGs — deep dive into construction patterns, metadata, and payloads.
- Executing Tasks — timeouts, cancellation, callbacks, async execution.
- Inspecting Graphs — analysis, querying, and what-if exploration.
- Tracing & Profiling — Chrome-compatible traces and bottleneck detection.