# Pipeline

The Pipeline API provides a high-level, decorator-based approach to building and
executing DAGs. Instead of manually creating nodes and edges, you decorate
functions with `@task` and let dagron infer the graph structure from function
parameter names.
See the Executing Tasks guide for usage patterns and the Contracts guide for type validation with pipelines.
## @task

```python
@task
def my_function(upstream_result):
    ...

# Or with explicit configuration:
@task(name="custom_name", timeout=30.0, retries=2)
def my_function(upstream_result):
    ...
```
A decorator that marks a function as a pipeline task. The DAG is inferred from function parameter names: each parameter name corresponds to the name of an upstream task whose return value is passed as the argument.
When used without arguments, the task name is the function name. When used with arguments, you can customize the name, timeout, and retry behavior.
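The name-based inference can be sketched with the standard library's `inspect` module. This is illustrative only, not dagron's actual internals; `infer_dependencies` is a hypothetical helper:

```python
import inspect

def infer_dependencies(func):
    """Return the upstream task names implied by func's parameter names.

    Minimal sketch of name-based inference: each parameter is treated
    as the name of an upstream task whose result will be passed in.
    """
    return [p.name for p in inspect.signature(func).parameters.values()]

def transform(extract):
    return extract

print(infer_dependencies(transform))  # ['extract']
```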
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str \| None` | `None` | Custom task name. Defaults to the function name. |
| `timeout` | `float \| None` | `None` | Per-task timeout in seconds. |
| `retries` | `int` | `0` | Number of times to retry on failure. |
```python
from dagron import task

@task
def extract():
    """Root task: no parameters, so no dependencies."""
    return [1, 2, 3, 4, 5]

@task
def transform(extract):
    """Depends on 'extract'. Receives extract's return value."""
    return [x * 10 for x in extract]

@task
def load(transform):
    """Depends on 'transform'. Receives transform's return value."""
    return f"loaded {len(transform)} rows"
```
### Dependency inference

The `@task` decorator inspects each parameter name and wires it as a dependency:

| Parameter name | Matched to |
|---|---|
| `extract` | The task named `"extract"` |
| `transform` | The task named `"transform"` |
| Any other name | The task with that name |
If a parameter name does not match any task in the pipeline, an error is raised at pipeline construction time.
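This construction-time check can be sketched as follows. The helper `check_wiring` and the choice of `ValueError` are hypothetical; the documentation does not specify which error type dagron raises:

```python
import inspect

def check_wiring(funcs):
    """Raise if any parameter name has no matching task (sketch).

    Hypothetical validation in the spirit of what a pipeline might do
    at construction time; the real error type is unspecified.
    """
    names = {f.__name__ for f in funcs}
    for f in funcs:
        for p in inspect.signature(f).parameters:
            if p not in names:
                raise ValueError(
                    f"task {f.__name__!r} depends on unknown task {p!r}"
                )

def extract(): ...
def transform(extract): ...
def bad(missing): ...

check_wiring([extract, transform])  # passes silently
try:
    check_wiring([extract, bad])
except ValueError as e:
    print(e)  # task 'bad' depends on unknown task 'missing'
```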
### Fan-in pattern

Tasks with multiple parameters depend on multiple upstream tasks:

```python
@task
def merge(api_data, db_data):
    """Depends on both 'api_data' and 'db_data' tasks."""
    return {**api_data, **db_data}
```
## Pipeline

```python
class Pipeline(
    tasks: list[Callable],
    name: str | None = None,
)
```

A high-level container that builds a DAG from a list of `@task`-decorated
functions and provides execution methods.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `tasks` | `list[Callable]` | required | List of `@task`-decorated functions. |
| `name` | `str \| None` | `None` | Optional name for the pipeline (used in tracing and logging). |
```python
from dagron import Pipeline, task

@task
def fetch():
    return {"users": [1, 2, 3]}

@task
def enrich(fetch):
    return {**fetch, "enriched": True}

@task
def store(enrich):
    return f"stored {len(enrich)} keys"

pipeline = Pipeline(tasks=[fetch, enrich, store], name="user-pipeline")
```
### Properties

```python
@property
def dag() -> DAG
```

Access the underlying DAG instance. Useful for inspection, visualization, or passing to lower-level executors.

```python
print(pipeline.dag.node_count())        # 3
print(pipeline.dag.topological_sort())  # [fetch, enrich, store]
print(pipeline.dag.to_mermaid())        # Mermaid diagram string
```
```python
@property
def task_names() -> list[str]
```

Return the names of all tasks in the pipeline, in topological order.

```python
print(pipeline.task_names)  # ["fetch", "enrich", "store"]
```
### execute

```python
def execute(
    max_workers: int | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
    overrides: dict[str, Callable] | None = None,
) -> ExecutionResult
```

Execute the pipeline synchronously. Task return values are automatically passed as arguments to downstream tasks based on parameter names.
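The result-passing behavior can be sketched as a tiny sequential resolver. This is illustrative only (`run_in_order` is hypothetical, and the real executor runs independent tasks concurrently):

```python
import inspect

def run_in_order(funcs):
    """Run funcs sequentially, feeding each one the stored results of
    the tasks its parameter names refer to. Assumes funcs is already
    in topological order."""
    results = {}
    for f in funcs:
        args = {p: results[p] for p in inspect.signature(f).parameters}
        results[f.__name__] = f(**args)
    return results

def extract():
    return [1, 2, 3]

def transform(extract):
    return [x * 10 for x in extract]

print(run_in_order([extract, transform])["transform"])  # [10, 20, 30]
```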
| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_workers` | `int \| None` | `None` | Maximum concurrent workers. `None` uses `os.cpu_count()`. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream tasks when any task fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |
| `overrides` | `dict[str, Callable] \| None` | `None` | Replace specific tasks by name. Useful for testing and mocking. |
Returns: `ExecutionResult`, the aggregate execution result.

```python
result = pipeline.execute(max_workers=4)
print(result.succeeded)                     # 3
print(result.node_results["store"].result)  # "stored 2 keys"
print(f"Took {result.total_duration_seconds:.3f}s")
```
#### Overriding tasks

Use `overrides` to replace tasks at execution time without modifying the
pipeline definition. This is especially useful for testing:

```python
result = pipeline.execute(overrides={
    "fetch": lambda: {"users": [99]},  # mock data
})
print(result.node_results["store"].result)  # "stored 2 keys"
```
### execute_async

```python
async def execute_async(
    max_workers: int | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
    overrides: dict[str, Callable] | None = None,
) -> ExecutionResult
```

Execute the pipeline asynchronously. All tasks should be async callables.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_workers` | `int \| None` | `None` | Maximum concurrent coroutines. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream tasks when any task fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record an execution trace. |
| `overrides` | `dict[str, Callable] \| None` | `None` | Replace specific tasks by name. |
```python
import asyncio
from dagron import Pipeline, task

@task
async def fetch():
    await asyncio.sleep(0.1)
    return [1, 2, 3]

@task
async def process(fetch):
    return [x * 2 for x in fetch]

pipeline = Pipeline(tasks=[fetch, process])
result = asyncio.run(pipeline.execute_async())
print(result.node_results["process"].result)  # [2, 4, 6]
```
### validate_contracts

```python
def validate_contracts(
    extra_contracts: dict[str, tuple[dict | None, type]] | None = None,
) -> list[ContractViolation]
```

Validate type contracts across all pipeline edges. If tasks have type
annotations, those are used as implicit contracts. You can supply additional
explicit contracts via `extra_contracts`.
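The edge-checking idea can be sketched with `inspect` annotations. This is a simplified, hypothetical version: it compares annotations by identity, whereas a real checker would use subtype compatibility, and the shape of `ContractViolation` is not specified here:

```python
import inspect

def check_edges(funcs):
    """For each edge, compare the upstream return annotation with the
    downstream parameter annotation (sketch, identity comparison only)."""
    sigs = {f.__name__: inspect.signature(f) for f in funcs}
    violations = []
    for f in funcs:
        for name, param in sigs[f.__name__].parameters.items():
            upstream = sigs.get(name)
            if upstream is None or param.annotation is inspect.Parameter.empty:
                continue  # unknown upstream or unannotated parameter
            produced = upstream.return_annotation
            if produced is not inspect.Signature.empty and produced is not param.annotation:
                violations.append((name, f.__name__, produced, param.annotation))
    return violations

def fetch() -> list:
    return [1, 2, 3]

def process(fetch: dict) -> dict:  # mismatched annotation on purpose
    return {"data": fetch}

print(check_edges([fetch, process]))  # [('fetch', 'process', <class 'list'>, <class 'dict'>)]
```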
| Parameter | Type | Default | Description |
|---|---|---|---|
| `extra_contracts` | `dict[str, tuple[dict \| None, type]] \| None` | `None` | Additional contracts keyed by task name. Each value is `(inputs_dict, output_type)`. |

Returns: `list[ContractViolation]`, empty if all contracts are consistent.
```python
from dagron import Pipeline, task

@task
def fetch() -> list:
    return [1, 2, 3]

@task
def process(fetch: list) -> dict:
    return {"data": fetch}

pipeline = Pipeline(tasks=[fetch, process])
violations = pipeline.validate_contracts()
assert len(violations) == 0
```
## Complete Example

A realistic data pipeline with fan-out, fan-in, callbacks, and tracing:

```python
from dagron import Pipeline, task, ExecutionCallbacks

@task
def api_source():
    return {"source": "api", "rows": 100}

@task
def db_source():
    return {"source": "db", "rows": 250}

@task
def clean_api(api_source):
    return {**api_source, "cleaned": True}

@task
def clean_db(db_source):
    return {**db_source, "cleaned": True}

@task
def merge(clean_api, clean_db):
    total = clean_api["rows"] + clean_db["rows"]
    return {"total_rows": total, "sources": 2}

@task
def publish(merge):
    return f"Published {merge['total_rows']} rows from {merge['sources']} sources"

pipeline = Pipeline(
    tasks=[api_source, db_source, clean_api, clean_db, merge, publish],
    name="etl-pipeline",
)

# Inspect the generated DAG
print(pipeline.dag.to_mermaid())
print(f"Parallelism levels: {pipeline.dag.topological_levels()}")

# Execute with callbacks and tracing
result = pipeline.execute(
    max_workers=4,
    callbacks=ExecutionCallbacks(
        on_start=lambda n: print(f"  Starting {n}..."),
        on_complete=lambda n, r: print(f"  Finished {n}"),
    ),
    enable_tracing=True,
)

print(f"\n{result.node_results['publish'].result}")
# Published 350 rows from 2 sources
```
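The level grouping printed by `topological_levels` puts tasks that can run concurrently into the same level. The idea can be sketched with a breadth-first variant of Kahn's algorithm (illustrative only, not dagron's implementation; the dependency map mirrors the etl-pipeline above):

```python
def topological_levels(deps):
    """Group nodes into levels, where each level depends only on
    earlier levels. deps maps node -> list of upstream nodes."""
    remaining = dict(deps)
    levels, done = [], set()
    while remaining:
        # A node is ready once all of its upstream nodes are done.
        ready = [n for n, ups in remaining.items() if set(ups) <= done]
        if not ready:
            raise ValueError("cycle detected")
        levels.append(sorted(ready))
        done.update(ready)
        for n in ready:
            del remaining[n]
    return levels

deps = {
    "api_source": [], "db_source": [],
    "clean_api": ["api_source"], "clean_db": ["db_source"],
    "merge": ["clean_api", "clean_db"], "publish": ["merge"],
}
print(topological_levels(deps))
# [['api_source', 'db_source'], ['clean_api', 'clean_db'], ['merge'], ['publish']]
```

With `max_workers=4`, the two sources and then the two cleaning tasks in this pipeline can each run in parallel within their level.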
## Related

- `DAGExecutor`: the lower-level thread-pool executor used internally.
- `DAGBuilder`: the builder that the Pipeline constructs behind the scenes.
- Contracts: type contract validation guide.
- Tracing & Profiling: visualizing pipeline traces.