
DAGBuilder

The DAGBuilder provides a fluent, chainable API for constructing DAGs. Every mutating method returns self, so calls can be chained. Validation happens in .build(), which either returns a validated DAG or raises, so you never receive an invalid DAG.

Obtain a builder via DAG.builder():

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

See the Building DAGs guide for construction patterns and best practices.
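Chaining works because each mutating method ends by returning the builder itself. The hypothetical ToyBuilder below is a minimal pure-Python sketch of that pattern only, not dagron's implementation: it stores names in plain lists and does no validation in build(), which the real builder does.

```python
class ToyBuilder:
    """Hypothetical stand-in showing the fluent 'return self' pattern."""

    def __init__(self) -> None:
        self.nodes: list[str] = []
        self.edges: list[tuple[str, str]] = []

    def add_node(self, name: str) -> "ToyBuilder":
        self.nodes.append(name)
        return self  # returning self is what makes chaining possible

    def add_edge(self, from_node: str, to_node: str) -> "ToyBuilder":
        self.edges.append((from_node, to_node))
        return self

    def build(self) -> dict[str, list[str]]:
        # A real builder validates here; this sketch just returns an adjacency map.
        adjacency: dict[str, list[str]] = {n: [] for n in self.nodes}
        for src, dst in self.edges:
            adjacency[src].append(dst)
        return adjacency


graph = (
    ToyBuilder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)
print(graph)  # {'extract': ['transform'], 'transform': ['load'], 'load': []}
```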


Constructor

DAGBuilder
class DAGBuilder()

Create a new empty builder. In most cases you will use DAG.builder() instead of instantiating this class directly.

# Preferred
builder = dagron.DAG.builder()

# Also valid
builder = dagron.DAGBuilder()

Methods

add_node

DAGBuilder.add_node
def add_node(
name: str,
payload: Any = None,
metadata: dict | None = None,
) -> DAGBuilder

Add a node to the graph under construction. Returns self for chaining.

Parameters:

  • name (str, required): Unique name for the node.
  • payload (Any, default None): Arbitrary Python object attached to the node.
  • metadata (dict | None, default None): Optional metadata dictionary for labels, tags, annotations, etc.

Raises:

  • DuplicateNodeError — if a node with the same name has already been added.

builder = (
    dagron.DAG.builder()
    .add_node("fetch", payload={"url": "https://api.example.com"})
    .add_node("parse", metadata={"team": "data-eng"})
    .add_node("store")
)
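One simple way to enforce the unique-name rule is to key nodes by name, so detecting a duplicate is a single membership test. The sketch below assumes that design. NodeRegistry is hypothetical, and DuplicateNodeError is defined locally as a stand-in because this page does not say where dagron exports it. Copying the metadata dict on insert is also this sketch's own choice.

```python
from __future__ import annotations

from typing import Any


class DuplicateNodeError(Exception):
    """Local stand-in for the documented exception (import path not assumed)."""


class NodeRegistry:
    """Hypothetical sketch: nodes keyed by name, so duplicates are easy to reject."""

    def __init__(self) -> None:
        self.nodes: dict[str, dict[str, Any]] = {}

    def add_node(self, name: str, payload: Any = None,
                 metadata: dict | None = None) -> NodeRegistry:
        if name in self.nodes:
            raise DuplicateNodeError(f"node {name!r} already added")
        # Copy metadata so later changes to the caller's dict do not leak in.
        self.nodes[name] = {"payload": payload, "metadata": dict(metadata or {})}
        return self


registry = NodeRegistry().add_node("fetch", payload={"url": "https://api.example.com"})
try:
    registry.add_node("fetch")
except DuplicateNodeError as exc:
    print(exc)  # node 'fetch' already added
```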

add_edge

DAGBuilder.add_edge
def add_edge(
from_node: str,
to_node: str,
weight: float | None = None,
label: str | None = None,
) -> DAGBuilder

Add a directed edge from from_node to to_node. Returns self for chaining. The builder defers cycle detection to .build(), so you can add edges in any order.

Parameters:

  • from_node (str, required): Name of the source node. It may be added before or after this call, but it must exist by the time .build() runs.
  • to_node (str, required): Name of the target node. The same rule applies.
  • weight (float | None, default None): Optional numeric weight for cost-aware scheduling.
  • label (str | None, default None): Optional label for visualization and debugging.

builder = (
    dagron.DAG.builder()
    .add_node("a")
    .add_node("b")
    .add_node("c")
    .add_edge("a", "b", weight=2.0)
    .add_edge("b", "c", label="transform-to-load")
)
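Deferring checks to .build() means add_edge only has to record the edge. The hypothetical DeferredEdgeBuilder below sketches that idea: each edge, with its optional weight and label, goes into a pending list, and endpoint existence is checked once at build time. NodeNotFoundError is a local stand-in for the documented exception, and the class is an illustration, not dagron's code.

```python
from __future__ import annotations


class NodeNotFoundError(Exception):
    """Local stand-in for the documented exception."""


class DeferredEdgeBuilder:
    """Hypothetical sketch: edges are only recorded; endpoints are checked at build()."""

    def __init__(self) -> None:
        self.nodes: set[str] = set()
        # Each pending edge is (from_node, to_node, weight, label).
        self.edges: list[tuple[str, str, float | None, str | None]] = []

    def add_node(self, name: str) -> DeferredEdgeBuilder:
        self.nodes.add(name)
        return self

    def add_edge(self, from_node: str, to_node: str,
                 weight: float | None = None,
                 label: str | None = None) -> DeferredEdgeBuilder:
        # No checks here, so an edge may name nodes that are added later.
        self.edges.append((from_node, to_node, weight, label))
        return self

    def build(self) -> list[tuple[str, str, float | None, str | None]]:
        for src, dst, _, _ in self.edges:
            for endpoint in (src, dst):
                if endpoint not in self.nodes:
                    raise NodeNotFoundError(
                        f"edge {src!r} -> {dst!r} references unknown node {endpoint!r}"
                    )
        return list(self.edges)


# The edge is declared before either endpoint exists; build() still succeeds.
edges = DeferredEdgeBuilder().add_edge("a", "b", weight=2.0).add_node("a").add_node("b").build()
print(edges)  # [('a', 'b', 2.0, None)]
```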

contract

DAGBuilder.contract
def contract(
node: str,
inputs: dict[str, type] | None = None,
output: type = object,
) -> DAGBuilder

Attach a type contract to a node. Contracts declare the expected input types (keyed by predecessor name) and the output type. Use validate_contracts() to check all contracts before building, or let build() validate them automatically.

Parameters:

  • node (str, required): Name of the node to attach the contract to.
  • inputs (dict[str, type] | None, default None): Expected input types, keyed by predecessor node name.
  • output (type, default object): Expected output type for this node.

Raises:

  • NodeNotFoundError — if the specified node has not been added.

builder = (
    dagron.DAG.builder()
    .add_node("fetch")
    .add_node("parse")
    .add_node("validate")
    .add_edge("fetch", "parse")
    .add_edge("parse", "validate")
    .contract("fetch", output=list)
    .contract("parse", inputs={"fetch": list}, output=dict)
    .contract("validate", inputs={"parse": dict}, output=bool)
)

See the Contracts guide for more details on type contracts.
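Unlike add_edge, contract() checks its node immediately and raises NodeNotFoundError if the node has not been added, so call it after the corresponding add_node. The hypothetical sketch below models that ordering rule. Contract and ContractBuilder are illustrative names, NodeNotFoundError is a local stand-in, and overwriting an existing contract on a second call is an assumption of the sketch.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class NodeNotFoundError(Exception):
    """Local stand-in for the documented exception."""


@dataclass
class Contract:
    """Hypothetical record of one node's declared types."""
    inputs: dict[str, type] = field(default_factory=dict)
    output: type = object


class ContractBuilder:
    """Hypothetical sketch: contract() requires the node to exist already."""

    def __init__(self) -> None:
        self.nodes: set[str] = set()
        self.contracts: dict[str, Contract] = {}

    def add_node(self, name: str) -> ContractBuilder:
        self.nodes.add(name)
        return self

    def contract(self, node: str, inputs: dict[str, type] | None = None,
                 output: type = object) -> ContractBuilder:
        if node not in self.nodes:
            raise NodeNotFoundError(f"cannot attach contract: {node!r} not added")
        self.contracts[node] = Contract(dict(inputs or {}), output)
        return self


cb = ContractBuilder().add_node("fetch").contract("fetch", output=list)
print(cb.contracts["fetch"])  # Contract(inputs={}, output=<class 'list'>)

try:
    cb.contract("parse", inputs={"fetch": list}, output=dict)  # "parse" was never added
except NodeNotFoundError as exc:
    print(exc)  # cannot attach contract: 'parse' not added
```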


validate_contracts

DAGBuilder.validate_contracts
def validate_contracts() -> list[ContractViolation]

Check all attached contracts for consistency (e.g., output type of A matches the input type expected by B on the A -> B edge). Returns a list of ContractViolation objects describing any mismatches. An empty list means all contracts are consistent.

Returns: list[ContractViolation] — a list of violations, empty if valid.

violations = builder.validate_contracts()
if violations:
    for v in violations:
        print(f"Contract violation: {v}")
else:
    print("All contracts consistent.")
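The consistency rule can be sketched as a walk over every edge A -> B: if B declares an expected input type for A, compare it with A's declared output. The check_contracts helper below is hypothetical. It assumes a subclass check (issubclass) counts as a match and skips edges where B declares nothing for A; this page does not say whether dagron uses exact equality, subclass checks, or something else. ContractViolation is a local stand-in carrying the four documented fields.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ContractViolation:
    """Local stand-in with the four documented fields."""
    from_node: str
    to_node: str
    expected: type
    actual: type


def check_contracts(
    edges: list[tuple[str, str]],
    outputs: dict[str, type],
    inputs: dict[str, dict[str, type]],
) -> list[ContractViolation]:
    """Hypothetical check: for each edge A -> B, compare A's output with B's expectation."""
    violations: list[ContractViolation] = []
    for src, dst in edges:
        expected = inputs.get(dst, {}).get(src)
        if expected is None:
            continue  # B declares nothing for A, so there is nothing to compare
        actual = outputs.get(src, object)  # undeclared output falls back to object
        # Assumption: a subclass of the expected type counts as a match.
        if not issubclass(actual, expected):
            violations.append(ContractViolation(src, dst, expected, actual))
    return violations


# "validate" expects a list from "parse", but "parse" declares dict.
found = check_contracts(
    edges=[("fetch", "parse"), ("parse", "validate")],
    outputs={"fetch": list, "parse": dict, "validate": bool},
    inputs={"parse": {"fetch": list}, "validate": {"parse": list}},
)
for v in found:
    print(f"{v.from_node} -> {v.to_node}: expected {v.expected.__name__}, got {v.actual.__name__}")
# parse -> validate: expected list, got dict
```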

build

DAGBuilder.build
def build() -> DAG

Finalize and validate the graph. Returns a fully constructed DAG instance. This method performs:

  1. Node existence checks — every edge endpoint must reference an existing node.
  2. Cycle detection — raises CycleError if the graph contains a cycle.
  3. Contract validation — if any contracts were attached, they are validated.

Returns: DAG — the validated directed acyclic graph.

Raises:

  • CycleError — if the graph contains a cycle.
  • NodeNotFoundError — if an edge references a non-existent node.

dag = (
    dagron.DAG.builder()
    .add_node("a")
    .add_node("b")
    .add_edge("a", "b")
    .build()
)

print(dag.node_count()) # 2
print(dag.edge_count()) # 1
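This page does not say which algorithm .build() uses for cycle detection. Kahn's algorithm is one standard choice: repeatedly remove nodes that have no remaining incoming edges, and if some nodes are never removed, the graph contains a cycle. A minimal sketch, assuming the endpoint checks of step 1 have already passed and using a local CycleError stand-in; check_acyclic is a hypothetical name.

```python
from collections import deque


class CycleError(Exception):
    """Local stand-in for the documented exception."""


def check_acyclic(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return a topological order, or raise CycleError (Kahn's algorithm)."""
    indegree = {n: 0 for n in nodes}
    successors: dict[str, list[str]] = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    ready = deque(n for n in nodes if indegree[n] == 0)
    order: list[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        # Nodes that never reach in-degree 0 lie on, or downstream of, a cycle.
        stuck = sorted(n for n in nodes if indegree[n] > 0)
        raise CycleError(f"cycle detected among: {stuck}")
    return order


print(check_acyclic(["a", "b"], [("a", "b")]))  # ['a', 'b']
```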

Complete Example

A full builder workflow with contracts, metadata, and weighted edges:

import dagron

dag = (
    dagron.DAG.builder()
    # Nodes with payloads and metadata
    .add_node("ingest", payload={"source": "s3"}, metadata={"tier": "bronze"})
    .add_node("clean", metadata={"tier": "silver"})
    .add_node("enrich", metadata={"tier": "silver"})
    .add_node("aggregate", metadata={"tier": "gold"})
    .add_node("publish", metadata={"tier": "gold"})

    # Weighted edges for cost-aware scheduling
    .add_edge("ingest", "clean", weight=1.0)
    .add_edge("ingest", "enrich", weight=1.0)
    .add_edge("clean", "aggregate", weight=3.0)
    .add_edge("enrich", "aggregate", weight=2.0)
    .add_edge("aggregate", "publish", weight=0.5)

    # Type contracts
    .contract("ingest", output=list)
    .contract("clean", inputs={"ingest": list}, output=list)
    .contract("enrich", inputs={"ingest": list}, output=list)
    .contract("aggregate", inputs={"clean": list, "enrich": list}, output=dict)
    .contract("publish", inputs={"aggregate": dict}, output=bool)

    .build()
)

print(dag.node_count()) # 5
print(dag.edge_count()) # 5
print(dag.topological_levels()) # [[ingest], [clean, enrich], [aggregate], [publish]]
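Each level holds the nodes whose predecessors all sit in earlier levels, so nodes in the same level do not depend on each other. A level-by-level variant of Kahn's algorithm computes this grouping. The compute_levels function below is a hypothetical sketch, not dagron's topological_levels(); it sorts each level alphabetically for deterministic output, whereas the order dagron uses within a level is not documented here.

```python
def compute_levels(nodes: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Group nodes into levels; a node's level follows all of its predecessors."""
    indegree = {n: 0 for n in nodes}
    successors: dict[str, list[str]] = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    level = sorted(n for n in nodes if indegree[n] == 0)  # the sources
    levels: list[list[str]] = []
    while level:
        levels.append(level)
        next_level: list[str] = []
        for node in level:
            for succ in successors[node]:
                indegree[succ] -= 1
                if indegree[succ] == 0:  # every predecessor is now placed
                    next_level.append(succ)
        level = sorted(next_level)
    return levels


# Same nodes and edges as the complete example above.
levels = compute_levels(
    ["ingest", "clean", "enrich", "aggregate", "publish"],
    [("ingest", "clean"), ("ingest", "enrich"), ("clean", "aggregate"),
     ("enrich", "aggregate"), ("aggregate", "publish")],
)
print(levels)  # [['ingest'], ['clean', 'enrich'], ['aggregate'], ['publish']]
```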

ContractViolation

ContractViolation
class ContractViolation

Describes a single type mismatch. DAGBuilder.validate_contracts() returns one ContractViolation for each mismatch it finds.

Properties:

  • from_node (str): The upstream node whose declared output type does not match.
  • to_node (str): The downstream node whose expected input type does not match.
  • expected (type): The type expected by the downstream node.
  • actual (type): The type declared by the upstream node's output.

violations = builder.validate_contracts()
for v in violations:
    print(f"{v.from_node} -> {v.to_node}: expected {v.expected}, got {v.actual}")
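A common way to use the returned list is to fail fast when it is non-empty. The sketch below assumes a pipeline wants one exception listing every mismatch. ContractViolation is a local stand-in with the four documented properties, and ContractCheckFailed and raise_on_violations are hypothetical names, not part of the documented API.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ContractViolation:
    """Local stand-in with the four documented properties."""
    from_node: str
    to_node: str
    expected: type
    actual: type


class ContractCheckFailed(Exception):
    """Hypothetical exception type for this sketch; not part of the documented API."""


def raise_on_violations(violations: list[ContractViolation]) -> None:
    """Turn a non-empty violation list into a single exception listing every mismatch."""
    if not violations:
        return
    lines = [
        f"{v.from_node} -> {v.to_node}: expected {v.expected.__name__}, got {v.actual.__name__}"
        for v in violations
    ]
    raise ContractCheckFailed("contract violations:\n" + "\n".join(lines))


raise_on_violations([])  # no violations, so this returns None silently
try:
    raise_on_violations([ContractViolation("parse", "validate", list, dict)])
except ContractCheckFailed as exc:
    print(exc)
# contract violations:
# parse -> validate: expected list, got dict
```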