Pipeline

The Pipeline API provides a high-level, decorator-based approach to building and executing DAGs. Instead of manually creating nodes and edges, you decorate functions with @task and let dagron infer the graph structure from function parameter names.

See the Executing Tasks guide for usage patterns and the Contracts guide for type validation with pipelines.


@task

@task
def my_function(upstream_result):
    ...

# Or with explicit configuration:
@task(name="custom_name", timeout=30.0, retries=2)
def my_function(upstream_result):
    ...

A decorator that marks a function as a pipeline task. The DAG is inferred from function parameter names: each parameter name corresponds to the name of an upstream task whose return value is passed as the argument.

When used without arguments, the task name is the function name. When used with arguments, you can customize the name, timeout, and retry behavior.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str \| None` | `None` | Custom task name. Defaults to the function name. |
| `timeout` | `float \| None` | `None` | Per-task timeout in seconds. |
| `retries` | `int` | `0` | Number of times to retry on failure. |

from dagron import task

@task
def extract():
    """Root task — no parameters, so no dependencies."""
    return [1, 2, 3, 4, 5]

@task
def transform(extract):
    """Depends on 'extract'. Receives extract's return value."""
    return [x * 10 for x in extract]

@task
def load(transform):
    """Depends on 'transform'. Receives transform's return value."""
    return f"loaded {len(transform)} rows"

Dependency inference

The @task decorator inspects each parameter name and wires it as a dependency:

| Parameter name | Matched to |
| --- | --- |
| `extract` | The task named `"extract"` |
| `transform` | The task named `"transform"` |
| Any other name | The task with that name |

If a parameter name does not match any task in the pipeline, an error is raised at pipeline construction time.
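The matching step can be sketched with the standard library: `inspect.signature` exposes exactly the parameter names a decorator like `@task` would need. This is an illustrative sketch of the mechanism, not dagron's actual implementation:

```python
import inspect

def infer_dependencies(fn):
    """Return the upstream task names implied by fn's parameter names."""
    return list(inspect.signature(fn).parameters)

def extract():
    return [1, 2, 3]

def transform(extract):
    return [x * 10 for x in extract]

def merge(api_data, db_data):
    return {**api_data, **db_data}

print(infer_dependencies(extract))    # []
print(infer_dependencies(transform))  # ['extract']
print(infer_dependencies(merge))      # ['api_data', 'db_data']
```

Construction-time validation then reduces to checking that every inferred name is a known task.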

Fan-in pattern

Tasks with multiple parameters depend on multiple upstream tasks:

@task
def merge(api_data, db_data):
    """Depends on both 'api_data' and 'db_data' tasks."""
    return {**api_data, **db_data}

Pipeline

Pipeline
class Pipeline(
    tasks: list[Callable],
    name: str | None = None,
)

A high-level container that builds a DAG from a list of @task-decorated functions and provides execution methods.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `tasks` | `list[Callable]` | required | List of `@task`-decorated functions. |
| `name` | `str \| None` | `None` | Optional name for the pipeline (used in tracing and logging). |

from dagron import Pipeline, task

@task
def fetch():
    return {"users": [1, 2, 3]}

@task
def enrich(fetch):
    return {**fetch, "enriched": True}

@task
def store(enrich):
    return f"stored {len(enrich)} keys"

pipeline = Pipeline(tasks=[fetch, enrich, store], name="user-pipeline")

Properties

Pipeline.dag
@property
def dag() -> DAG

Access the underlying DAG instance. Useful for inspection, visualization, or passing to lower-level executors.

print(pipeline.dag.node_count())        # 3
print(pipeline.dag.topological_sort())  # [fetch, enrich, store]
print(pipeline.dag.to_mermaid())        # Mermaid diagram string
Pipeline.task_names
@property
def task_names() -> list[str]

Return the names of all tasks in the pipeline, in topological order.

print(pipeline.task_names)  # ["fetch", "enrich", "store"]
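Topological order falls out of the inferred edges. A minimal Kahn's-algorithm sketch of how such an order can be computed (illustrative only, not dagron's internals), applied to the fetch/enrich/store pipeline above:

```python
from collections import deque

def topological_sort(deps):
    """deps maps each task name to the list of names it depends on."""
    indegree = {n: len(d) for n, d in deps.items()}
    dependents = {n: [] for n in deps}
    for n, d in deps.items():
        for up in d:
            dependents[up].append(n)
    ready = deque(n for n, k in indegree.items() if k == 0)
    order = []
    while ready:
        n = ready.popleft()
        order.append(n)
        for down in dependents[n]:
            indegree[down] -= 1
            if indegree[down] == 0:
                ready.append(down)
    if len(order) != len(deps):
        raise ValueError("cycle detected")
    return order

print(topological_sort({"fetch": [], "enrich": ["fetch"], "store": ["enrich"]}))
# ['fetch', 'enrich', 'store']
```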

execute

Pipeline.execute
def execute(
    max_workers: int | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
    overrides: dict[str, Callable] | None = None,
) -> ExecutionResult

Execute the pipeline synchronously. Task return values are automatically passed as arguments to downstream tasks based on parameter names.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `max_workers` | `int \| None` | `None` | Maximum concurrent workers. `None` uses `os.cpu_count()`. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream tasks when any task fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |
| `overrides` | `dict[str, Callable] \| None` | `None` | Replace specific tasks by name. Useful for testing and mocking. |

Returns: ExecutionResult — the aggregate execution result.

result = pipeline.execute(max_workers=4)

print(result.succeeded)                     # 3
print(result.node_results["store"].result)  # "stored 2 keys"
print(f"Took {result.total_duration_seconds:.3f}s")

Overriding tasks

Use overrides to replace tasks at execution time without modifying the pipeline definition. This is especially useful for testing:

result = pipeline.execute(overrides={
    "fetch": lambda: {"users": [99]},  # mock data
})
print(result.node_results["store"].result)  # "stored 2 keys"
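Mechanically, an override is just a name-to-callable substitution applied before execution. A toy serial executor makes the idea concrete (assumed behavior sketched from the description above, not dagron's source; `run_tasks` is a hypothetical helper):

```python
def run_tasks(tasks, overrides=None):
    """Run tasks in order, passing each result downstream by parameter name."""
    overrides = overrides or {}
    results = {}
    for name, (fn, deps) in tasks.items():
        fn = overrides.get(name, fn)  # swap in the mock if one was given
        results[name] = fn(*(results[d] for d in deps))
    return results

# The fetch/enrich/store pipeline, flattened to (callable, dependencies) pairs
tasks = {
    "fetch":  (lambda: {"users": [1, 2, 3]}, []),
    "enrich": (lambda fetch: {**fetch, "enriched": True}, ["fetch"]),
    "store":  (lambda enrich: f"stored {len(enrich)} keys", ["enrich"]),
}

print(run_tasks(tasks)["store"])  # stored 2 keys
print(run_tasks(tasks, {"fetch": lambda: {"users": [99]}})["enrich"])
# {'users': [99], 'enriched': True}
```

The pipeline definition never changes; only the lookup at execution time does, which is why overrides compose well with test fixtures.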

execute_async

Pipeline.execute_async
async def execute_async(
    max_workers: int | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
    overrides: dict[str, Callable] | None = None,
) -> ExecutionResult

Execute the pipeline asynchronously. All tasks should be async callables.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `max_workers` | `int \| None` | `None` | Maximum concurrent coroutines. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream tasks when any task fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record an execution trace. |
| `overrides` | `dict[str, Callable] \| None` | `None` | Replace specific tasks by name. |

import asyncio
from dagron import Pipeline, task

@task
async def fetch():
    await asyncio.sleep(0.1)
    return [1, 2, 3]

@task
async def process(fetch):
    return [x * 2 for x in fetch]

pipeline = Pipeline(tasks=[fetch, process])
result = asyncio.run(pipeline.execute_async())
print(result.node_results["process"].result) # [2, 4, 6]

validate_contracts

Pipeline.validate_contracts
def validate_contracts(
    extra_contracts: dict[str, tuple[dict | None, type]] | None = None,
) -> list[ContractViolation]

Validate type contracts across all pipeline edges. If tasks have type annotations, those are used as implicit contracts. You can supply additional explicit contracts via extra_contracts.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `extra_contracts` | `dict[str, tuple[dict \| None, type]] \| None` | `None` | Additional contracts keyed by task name. Each value is `(inputs_dict, output_type)`. |

Returns: list[ContractViolation] — empty if all contracts are consistent.

from dagron import Pipeline, task

@task
def fetch() -> list:
    return [1, 2, 3]

@task
def process(fetch: list) -> dict:
    return {"data": fetch}

pipeline = Pipeline(tasks=[fetch, process])
violations = pipeline.validate_contracts()
assert len(violations) == 0
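The implicit-contract idea amounts to comparing annotations across each edge: the producer's return type against the consumer's parameter type. A standalone sketch using standard typing tools (a simplification for bare types only; dagron's actual matching rules are not specified here and may be richer, e.g. for generics):

```python
from typing import get_type_hints

def edge_violation(producer, consumer, param):
    """Return a message if annotations on this edge disagree, else None."""
    out = get_type_hints(producer).get("return")
    inp = get_type_hints(consumer).get(param)
    if out is None or inp is None:
        return None  # unannotated ends impose no implicit contract
    if not issubclass(out, inp):
        return f"{producer.__name__} -> {consumer.__name__}: {out} is not {inp}"
    return None

def fetch() -> list:
    return [1, 2, 3]

def process(fetch: list) -> dict:
    return {"data": fetch}

def bad(fetch: dict) -> str:
    return str(fetch)

print(edge_violation(fetch, process, "fetch"))  # None
print(edge_violation(fetch, bad, "fetch"))
# fetch -> bad: <class 'list'> is not <class 'dict'>
```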

Complete example

A realistic data pipeline with fan-out, fan-in, callbacks, and tracing:

from dagron import Pipeline, task, ExecutionCallbacks

@task
def api_source():
    return {"source": "api", "rows": 100}

@task
def db_source():
    return {"source": "db", "rows": 250}

@task
def clean_api(api_source):
    return {**api_source, "cleaned": True}

@task
def clean_db(db_source):
    return {**db_source, "cleaned": True}

@task
def merge(clean_api, clean_db):
    total = clean_api["rows"] + clean_db["rows"]
    return {"total_rows": total, "sources": 2}

@task
def publish(merge):
    return f"Published {merge['total_rows']} rows from {merge['sources']} sources"

pipeline = Pipeline(
    tasks=[api_source, db_source, clean_api, clean_db, merge, publish],
    name="etl-pipeline",
)

# Inspect the generated DAG
print(pipeline.dag.to_mermaid())
print(f"Parallelism levels: {pipeline.dag.topological_levels()}")

# Execute with callbacks and tracing
result = pipeline.execute(
    max_workers=4,
    callbacks=ExecutionCallbacks(
        on_start=lambda n: print(f"  Starting {n}..."),
        on_complete=lambda n, r: print(f"  Finished {n}"),
    ),
    enable_tracing=True,
)

print(f"\n{result.node_results['publish'].result}")
# Published 350 rows from 2 sources
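The `topological_levels()` call in the example groups tasks that can run concurrently. A hand-rolled sketch of the same grouping for this ETL graph (the real method's output format belongs to dagron and is not guaranteed to match):

```python
def topological_levels(deps):
    """Group task names into levels; tasks within a level can run in parallel."""
    remaining = dict(deps)
    done, levels = set(), []
    while remaining:
        level = [n for n, d in remaining.items() if set(d) <= done]
        if not level:
            raise ValueError("cycle detected")
        levels.append(level)
        done.update(level)
        for n in level:
            del remaining[n]
    return levels

etl = {
    "api_source": [], "db_source": [],
    "clean_api": ["api_source"], "clean_db": ["db_source"],
    "merge": ["clean_api", "clean_db"], "publish": ["merge"],
}
print(topological_levels(etl))
# [['api_source', 'db_source'], ['clean_api', 'clean_db'], ['merge'], ['publish']]
```

With `max_workers=4`, the two sources and then the two cleaning tasks run concurrently, while `merge` and `publish` each wait for their level to complete.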

See also

  • DAGExecutor — the lower-level thread-pool executor used internally.
  • DAGBuilder — the builder that the Pipeline constructs behind the scenes.
  • Contracts — type contract validation guide.
  • Tracing & Profiling — visualizing pipeline traces.