
Getting Started

This guide walks you through installing dagron, creating your first directed acyclic graph, executing tasks in parallel, and inspecting the results. By the end you will have a working ETL pipeline that extracts, transforms, and loads data with full observability.

Installation

dagron is distributed as a single wheel that bundles the Rust core via PyO3. Install it from PyPI:

pip install dagron

For async execution support, install the optional async extra:

pip install "dagron[async]"

Verify the installation:

import dagron
print(dagron.__version__)

System requirements

dagron requires Python 3.9 or later. The Rust extension is pre-compiled for Linux (x86_64, aarch64), macOS (x86_64, Apple Silicon), and Windows (x86_64).

Core concepts

Before writing code, it helps to understand three concepts that appear throughout dagron:

DAG: A directed acyclic graph whose nodes represent units of work and whose edges represent dependencies. The graph structure lives in Rust for speed.
Executor: A scheduler that walks the DAG in topological order, dispatching tasks to a thread pool (or async event loop) with maximum parallelism.
Result: A structured report containing every node's status, return value, error (if any), and wall-clock duration.

The typical workflow is: build a DAG, map tasks to nodes, execute, inspect results.

Your first DAG

Let us model a classic ETL pipeline with three stages: extract, transform, and load. The transform step depends on extract, and load depends on transform.

Figure: A simple three-node ETL pipeline.

Step 1 — Build the graph

The easiest way to create a DAG is with the fluent builder pattern:

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

The builder validates the graph at .build() time. If you accidentally introduce a cycle, dagron raises a CycleError immediately — you never get an invalid graph.

print(dag.node_count())  # 3
print(dag.edge_count())  # 2
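Conceptually, the cycle check at .build() time is a topological validation: a graph is acyclic exactly when every node can eventually be scheduled. A minimal pure-Python sketch of the idea, using Kahn's algorithm (this is illustrative, not dagron's Rust implementation; `has_cycle` and the edge-list format are assumed names):

```python
from collections import defaultdict, deque

def has_cycle(nodes, edges):
    """Kahn's algorithm: a graph is acyclic iff every node can be
    popped once all of its dependencies have been popped."""
    indegree = {n: 0 for n in nodes}
    successors = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
    queue = deque(n for n in nodes if indegree[n] == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    # Any node never reaching indegree 0 is stuck on a cycle.
    return visited != len(nodes)

print(has_cycle(["extract", "transform", "load"],
                [("extract", "transform"), ("transform", "load")]))  # False
print(has_cycle(["a", "b"], [("a", "b"), ("b", "a")]))  # True
```

This is why the error surfaces at build time rather than at execution time: the whole graph is validated in one pass before any task runs.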

Step 2 — Define tasks

A task is any Python callable (function, lambda, method). You map node names to callables in a plain dictionary:

def extract():
    """Simulate fetching rows from an API."""
    print("Extracting data...")
    return [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

def transform():
    """Normalize names to uppercase."""
    print("Transforming data...")
    return [{"id": 1, "name": "ALICE"}, {"id": 2, "name": "BOB"}]

def load():
    """Write results to the database."""
    print("Loading data...")
    return "2 rows written"

tasks = {
    "extract": extract,
    "transform": transform,
    "load": load,
}
Note: Each task runs independently. If you need to pass data between tasks, see the Executing Tasks guide for strategies including shared state and the Pipeline API.
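Since the callables above take no arguments, one common shared-state pattern is to close over a dictionary that upstream tasks populate and downstream tasks read. This is a sketch of the pattern, not dagron's Pipeline API; the `state` dictionary is an illustrative name:

```python
state = {}

def extract():
    state["rows"] = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

def transform():
    # Upstream has already run, so state["rows"] is guaranteed to exist.
    state["rows"] = [{**r, "name": r["name"].upper()} for r in state["rows"]]

def load():
    state["written"] = f'{len(state["rows"])} rows written'

# An executor that honours dependency order effectively does this:
for task in (extract, transform, load):
    task()

print(state["written"])  # 2 rows written
```

The pattern is safe precisely because the dependency edges guarantee that a task never reads a key before its upstream task has written it.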

Step 3 — Execute

Create a DAGExecutor and call .execute():

executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)

The executor honours the dependency order: extract runs first, then transform, then load. Independent nodes (if any) would run in parallel across all available CPU cores.
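The scheduling idea can be sketched in pure Python with a thread pool: run every task in a level concurrently, and wait for the level to finish before starting the next. This is a conceptual sketch under assumed names (`run_levels`), not dagron's Rust executor:

```python
from concurrent.futures import ThreadPoolExecutor

def run_levels(levels, tasks, max_workers=4):
    """Run each level's tasks in parallel; block before the next level."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for level in levels:
            futures = {name: pool.submit(tasks[name]) for name in level}
            for name, fut in futures.items():
                results[name] = fut.result()  # waits for the whole level
    return results

levels = [["extract"], ["transform"], ["load"]]
demo_tasks = {
    "extract": lambda: "rows",
    "transform": lambda: "clean rows",
    "load": lambda: "2 rows written",
}
print(run_levels(levels, demo_tasks))
```

With a purely linear graph every level has one node, so nothing overlaps; the parallelism pays off once a level contains several independent nodes.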

Step 4 — Inspect results

The returned ExecutionResult contains per-node outcomes:

print(result.succeeded)  # 3
print(result.failed)     # 0

for name, node_result in result.node_results.items():
    print(f"{name}: {node_result.status} in {node_result.duration_seconds:.3f}s")
# extract: NodeStatus.COMPLETED in 0.001s
# transform: NodeStatus.COMPLETED in 0.001s
# load: NodeStatus.COMPLETED in 0.001s

Each NodeResult carries the callable's return value:

print(result.node_results["load"].result)
# "2 rows written"

And the overall wall-clock time:

print(f"Pipeline finished in {result.total_duration_seconds:.3f}s")

A more realistic example

Real pipelines have fan-out and fan-in. Let us expand the ETL to extract from two sources in parallel, transform each independently, then merge and load:

Figure: Fan-out / fan-in ETL pipeline. The two extract-transform branches run in parallel.

import dagron
import time

# -- Build the DAG --
dag = (
    dagron.DAG.builder()
    .add_node("extract_api")
    .add_node("extract_db")
    .add_node("transform_api")
    .add_node("transform_db")
    .add_node("merge")
    .add_node("load")
    .add_edge("extract_api", "transform_api")
    .add_edge("extract_db", "transform_db")
    .add_edge("transform_api", "merge")
    .add_edge("transform_db", "merge")
    .add_edge("merge", "load")
    .build()
)

# -- Define tasks --
def extract_api():
    time.sleep(0.5)  # simulate network I/O
    return [{"source": "api", "id": 1}]

def extract_db():
    time.sleep(0.3)  # simulate query
    return [{"source": "db", "id": 2}]

def transform_api():
    return [{"source": "api", "id": 1, "clean": True}]

def transform_db():
    return [{"source": "db", "id": 2, "clean": True}]

def merge():
    return "merged 2 sources"

def load():
    return "loaded to warehouse"

tasks = {
    "extract_api": extract_api,
    "extract_db": extract_db,
    "transform_api": transform_api,
    "transform_db": transform_db,
    "merge": merge,
    "load": load,
}

# -- Execute --
result = dagron.DAGExecutor(dag, max_workers=4).execute(tasks)

print(f"Completed {result.succeeded}/{dag.node_count()} tasks")
print(f"Wall time: {result.total_duration_seconds:.3f}s")

Because extract_api and extract_db have no mutual dependency, they execute concurrently. The merge node waits until both transform branches finish, then load runs last.

Understanding execution order

You can preview the execution plan without running anything:

print(dag.topological_sort())
# ['extract_api', 'extract_db', 'transform_api', 'transform_db', 'merge', 'load']

for level, nodes in enumerate(dag.topological_levels()):
    print(f"Level {level}: {nodes}")
# Level 0: ['extract_api', 'extract_db']
# Level 1: ['transform_api', 'transform_db']
# Level 2: ['merge']
# Level 3: ['load']

Nodes at the same level can run in parallel. The executor uses this structure internally to maximise concurrency.
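The level structure itself is straightforward to compute: a node's level is one more than the maximum level among its dependencies. A pure-Python sketch of the computation for the fan-out pipeline above (illustrative, not dagron's implementation):

```python
from collections import defaultdict, deque

def topological_levels(nodes, edges):
    """Group nodes into levels: level(n) = 1 + max level of n's deps."""
    indegree = {n: 0 for n in nodes}
    successors = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
    level = {n: 0 for n in nodes}
    queue = deque(n for n in nodes if indegree[n] == 0)
    while queue:
        node = queue.popleft()
        for nxt in successors[node]:
            level[nxt] = max(level[nxt], level[node] + 1)
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    depth = max(level.values()) + 1
    return [[n for n in nodes if level[n] == d] for d in range(depth)]

nodes = ["extract_api", "extract_db", "transform_api",
         "transform_db", "merge", "load"]
edges = [("extract_api", "transform_api"), ("extract_db", "transform_db"),
         ("transform_api", "merge"), ("transform_db", "merge"),
         ("merge", "load")]
for i, lvl in enumerate(topological_levels(nodes, edges)):
    print(f"Level {i}: {lvl}")
```

Note that `merge` lands on its own level even though it has two parents: it cannot start until the deeper of its two dependency chains has finished.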

Handling failures

By default, the executor uses fail-fast mode: if any node raises an exception, downstream nodes are skipped and the result reports the failure. Reusing the three-node extract -> transform -> load DAG from your first pipeline:

def bad_transform():
    raise ValueError("Data quality check failed!")

tasks_with_failure = {
    "extract": extract,
    "transform": bad_transform,
    "load": load,
}

result = dagron.DAGExecutor(dag).execute(tasks_with_failure)

print(result.failed)   # 1
print(result.skipped)  # 1 (load was skipped)

Node statuses after a failure:

extract: completed
transform: failed
load: skipped

To continue executing independent branches even after a failure, disable fail-fast:

result = dagron.DAGExecutor(dag, fail_fast=False).execute(tasks_with_failure)

See the Executing Tasks guide for the full set of executor options.
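The skip logic in fail-fast mode amounts to marking everything reachable from the failed node. A pure-Python sketch of that reachability computation (illustrative names; not dagron's internals):

```python
from collections import defaultdict, deque

def downstream_of(failed, edges):
    """All nodes reachable from `failed`: the ones a fail-fast run skips."""
    successors = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)
    skipped, queue = set(), deque([failed])
    while queue:
        for nxt in successors[queue.popleft()]:
            if nxt not in skipped:
                skipped.add(nxt)
                queue.append(nxt)
    return skipped

edges = [("extract", "transform"), ("transform", "load")]
print(downstream_of("transform", edges))  # {'load'}
```

This also explains the fail_fast=False behaviour: nodes outside the failed node's downstream set are unaffected by the failure and can still run.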

Builder shortcuts

The builder supports several convenience patterns:

Bulk operations

dag = (
    dagron.DAG.builder()
    .add_nodes(["a", "b", "c", "d"])
    .add_edges([("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")])
    .build()
)

Direct construction

If you prefer an imperative style, create a bare DAG and mutate it:

dag = dagron.DAG()
dag.add_node("x")
dag.add_node("y")
dag.add_edge("x", "y")

Pipeline decorator

For simple linear pipelines, the @task decorator infers dependencies from function parameter names:

from dagron import Pipeline, task

@task
def fetch():
    return [1, 2, 3]

@task
def double(fetch):
    return [x * 2 for x in fetch]

@task
def save(double):
    return f"saved {len(double)} items"

pipeline = Pipeline(tasks=[fetch, double, save], name="numbers")
result = pipeline.execute()
print(result.node_results["save"].result)  # "saved 3 items"

The Pipeline API is covered in depth in Executing Tasks.
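The dependency inference behind this style can be sketched with the standard inspect module: a task's dependencies are simply its parameter names. This is a conceptual sketch of the mechanism, not dagron's actual decorator (`infer_dependencies` is an assumed name):

```python
import inspect

def infer_dependencies(func):
    """A task depends on the tasks named by its parameters."""
    return list(inspect.signature(func).parameters)

def fetch():
    return [1, 2, 3]

def double(fetch):
    return [x * 2 for x in fetch]

def save(double):
    return f"saved {len(double)} items"

print(infer_dependencies(fetch))   # []
print(infer_dependencies(double))  # ['fetch']
print(infer_dependencies(save))    # ['double']
```

This is why shadowing a task name with a parameter name (as `double(fetch)` does) is not a mistake here: the parameter name is the dependency declaration.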

Visualizing your DAG

dagron can export the graph in several formats:

# Mermaid (great for docs)
print(dag.to_mermaid())

# Graphviz DOT
print(dag.to_dot())

# JSON (for programmatic consumption)
print(dag.to_json())
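If you are curious what a Mermaid rendering looks like, an equivalent export can be sketched from an edge list (illustrative; dagron's actual to_mermaid() output may differ in details):

```python
def to_mermaid(edges):
    """Render a DAG's edge list as a Mermaid flowchart definition."""
    lines = ["graph TD"]
    for src, dst in edges:
        lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)

print(to_mermaid([("extract", "transform"), ("transform", "load")]))
# graph TD
#     extract --> transform
#     transform --> load
```

Pasting that text into any Mermaid-aware renderer (many docs tools support it) draws the pipeline top to bottom.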

See Serialization for the full serialization guide.

Quick reference

Here is a summary of the objects introduced in this guide with links to the API reference:

dagron.DAG: the core graph (API: DAG)
dagron.DAG.builder(): fluent graph construction (API: DAGBuilder)
dagron.DAGExecutor: thread-pool executor (API: DAGExecutor)
ExecutionResult: aggregate execution report (API: ExecutionResult)
NodeResult: per-node outcome (API: NodeResult)
Pipeline / @task: decorator-based pipelines (API: Pipeline)

Next steps

You now know how to install dagron, build a DAG, execute tasks, and read results. Continue with the Executing Tasks guide for the full set of executor options and the Pipeline API, and the Serialization guide for export formats.