DAGBuilder
The DAGBuilder provides a fluent, chainable API for constructing DAGs. Every
mutating method returns self, so you can chain calls together. The builder
validates the graph on .build(), ensuring you never receive an invalid DAG.
Obtain a builder via DAG.builder():
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)
See the Building DAGs guide for construction patterns and best practices.
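To make the chaining mechanics concrete, here is a minimal sketch of the fluent-builder pattern in plain Python. `MiniBuilder` is a hypothetical stand-in written for this illustration, not dagron's implementation. It assumes, as described above, that mutating methods return `self` and that validation is deferred to `build()`.

```python
class MiniBuilder:
    """Hypothetical stand-in for illustration; not dagron's DAGBuilder."""

    def __init__(self):
        self._nodes = []
        self._edges = []

    def add_node(self, name):
        if name in self._nodes:
            raise ValueError(f"duplicate node: {name}")
        self._nodes.append(name)
        return self  # returning self is what makes chaining possible

    def add_edge(self, from_node, to_node):
        # Only record the edge here; endpoint checks wait until build().
        self._edges.append((from_node, to_node))
        return self

    def build(self):
        known = set(self._nodes)
        for src, dst in self._edges:
            for endpoint in (src, dst):
                if endpoint not in known:
                    raise KeyError(f"unknown node: {endpoint}")
        # Return immutable snapshots so later builder calls cannot alter the result.
        return tuple(self._nodes), tuple(self._edges)


nodes, edges = (
    MiniBuilder()
    .add_edge("extract", "transform")  # recorded before its nodes exist
    .add_node("extract")
    .add_node("transform")
    .build()
)
print(nodes)  # ('extract', 'transform')
print(edges)  # (('extract', 'transform'),)
```

Because every check runs in `build()`, the order of `add_node` and `add_edge` calls does not matter in this sketch, only the final state does.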
Constructor
class DAGBuilder()
Create a new empty builder. In most cases you will use DAG.builder() instead
of instantiating this class directly.
# Preferred
builder = dagron.DAG.builder()
# Also valid
builder = dagron.DAGBuilder()
Methods
add_node
def add_node(
    name: str,
    payload: Any = None,
    metadata: dict | None = None,
) -> DAGBuilder
Add a node to the graph under construction. Returns self for chaining.
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Unique name for the node. |
| payload | Any | None | Arbitrary Python object attached to the node. |
| metadata | dict \| None | None | Optional metadata dictionary for labels, tags, annotations, etc. |

Raises:
- DuplicateNodeError: if a node with the same name has already been added.
builder = (
    dagron.DAG.builder()
    .add_node("fetch", payload={"url": "https://api.example.com"})
    .add_node("parse", metadata={"team": "data-eng"})
    .add_node("store")
)
add_edge
def add_edge(
    from_node: str,
    to_node: str,
    weight: float | None = None,
    label: str | None = None,
) -> DAGBuilder
Add a directed edge from from_node to to_node. Returns self for chaining.
The builder defers cycle detection to .build(), so you can add edges in any
order.
| Parameter | Type | Default | Description |
|---|---|---|---|
| from_node | str | required | Name of the source node. The node may be added before or after the edge, as long as it exists when build() is called. |
| to_node | str | required | Name of the target node. The same rule applies. |
| weight | float \| None | None | Optional numeric weight for cost-aware scheduling. |
| label | str \| None | None | Optional label for visualization and debugging. |
builder = (
    dagron.DAG.builder()
    .add_node("a")
    .add_node("b")
    .add_node("c")
    .add_edge("a", "b", weight=2.0)
    .add_edge("b", "c", label="transform-to-load")
)
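Deferring cycle detection means the check runs once over the finished edge set, so insertion order cannot affect the result. The sketch below uses Kahn's algorithm as one common way to do this. Whether dagron uses this algorithm is not stated here, so treat `has_cycle` as a hypothetical helper.

```python
from collections import deque


def has_cycle(nodes, edges):
    """Return True if the directed graph contains a cycle (Kahn's algorithm)."""
    indegree = {n: 0 for n in nodes}
    successors = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    # Start from nodes with no incoming edges and peel the graph layer by layer.
    ready = deque(n for n in nodes if indegree[n] == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any node never reached still has an incoming edge from a cycle.
    return visited != len(nodes)


nodes = ["a", "b", "c"]
forward = [("a", "b"), ("b", "c")]
shuffled = [("b", "c"), ("a", "b")]  # same edges, different insertion order
cyclic = forward + [("c", "a")]

print(has_cycle(nodes, forward))   # False
print(has_cycle(nodes, shuffled))  # False
print(has_cycle(nodes, cyclic))    # True
```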
contract
def contract(
    node: str,
    inputs: dict[str, type] | None = None,
    output: type = object,
) -> DAGBuilder
Attach a type contract to a node. Contracts declare the expected input types
(keyed by predecessor name) and the output type. Use validate_contracts() to
check all contracts before building, or let build() validate them
automatically.
| Parameter | Type | Default | Description |
|---|---|---|---|
| node | str | required | Name of the node to attach the contract to. |
| inputs | dict[str, type] \| None | None | Expected input types keyed by predecessor node name. |
| output | type | object | Expected output type for this node. |

Raises:
- NodeNotFoundError: if the specified node has not been added.
builder = (
    dagron.DAG.builder()
    .add_node("fetch")
    .add_node("parse")
    .add_node("validate")
    .add_edge("fetch", "parse")
    .add_edge("parse", "validate")
    .contract("fetch", output=list)
    .contract("parse", inputs={"fetch": list}, output=dict)
    .contract("validate", inputs={"parse": dict}, output=bool)
)
See the Contracts guide for more details on type contracts.
validate_contracts
def validate_contracts() -> list[ContractViolation]
Check all attached contracts for consistency (e.g., output type of A matches
the input type expected by B on the A -> B edge). Returns a list of
ContractViolation objects describing any mismatches. An empty list means all
contracts are consistent.
Returns: list[ContractViolation] — a list of violations, empty if valid.
violations = builder.validate_contracts()
if violations:
    for v in violations:
        print(f"Contract violation: {v}")
else:
    print("All contracts consistent.")
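The consistency rule (the output type of A must match the input type B expects on the A -> B edge) can be sketched without the library. `Violation` and `check_contracts` below are hypothetical names for this illustration. The sketch also assumes an exact type match, which may differ from how dagron compares types.

```python
from dataclasses import dataclass


@dataclass
class Violation:
    """Hypothetical record with the four fields documented for ContractViolation."""
    from_node: str
    to_node: str
    expected: type
    actual: type


def check_contracts(edges, contracts):
    """contracts maps node name -> (inputs dict, output type)."""
    found = []
    for src, dst in edges:
        if src not in contracts or dst not in contracts:
            continue  # no contract on one side, so nothing to compare
        _, actual = contracts[src]
        inputs, _ = contracts[dst]
        expected = inputs.get(src)
        # Assumption: types must be identical, with no subclass compatibility.
        if expected is not None and expected is not actual:
            found.append(Violation(src, dst, expected, actual))
    return found


edges = [("fetch", "parse"), ("parse", "validate")]
contracts = {
    "fetch": ({}, list),
    "parse": ({"fetch": dict}, dict),  # mismatch: fetch declares list
    "validate": ({"parse": dict}, bool),
}

violations = check_contracts(edges, contracts)
for v in violations:
    print(f"{v.from_node} -> {v.to_node}: expected {v.expected.__name__}, got {v.actual.__name__}")
# fetch -> parse: expected dict, got list
```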
build
def build() -> DAG
Finalize and validate the graph. Returns a fully constructed DAG instance. This method performs:
- Node existence checks: every edge endpoint must reference an existing node.
- Cycle detection: raises CycleError if the graph contains a cycle.
- Contract validation: if any contracts were attached, they are validated.
Returns: DAG — the validated directed acyclic graph.
Raises:
- CycleError: if the graph contains a cycle.
- NodeNotFoundError: if an edge references a non-existent node.
dag = (
    dagron.DAG.builder()
    .add_node("a")
    .add_node("b")
    .add_edge("a", "b")
    .build()
)
print(dag.node_count()) # 2
print(dag.edge_count()) # 1
Complete Example
A full builder workflow with contracts, metadata, and weighted edges:
import dagron

dag = (
    dagron.DAG.builder()
    # Nodes with payloads and metadata
    .add_node("ingest", payload={"source": "s3"}, metadata={"tier": "bronze"})
    .add_node("clean", metadata={"tier": "silver"})
    .add_node("enrich", metadata={"tier": "silver"})
    .add_node("aggregate", metadata={"tier": "gold"})
    .add_node("publish", metadata={"tier": "gold"})
    # Weighted edges for cost-aware scheduling
    .add_edge("ingest", "clean", weight=1.0)
    .add_edge("ingest", "enrich", weight=1.0)
    .add_edge("clean", "aggregate", weight=3.0)
    .add_edge("enrich", "aggregate", weight=2.0)
    .add_edge("aggregate", "publish", weight=0.5)
    # Type contracts
    .contract("ingest", output=list)
    .contract("clean", inputs={"ingest": list}, output=list)
    .contract("enrich", inputs={"ingest": list}, output=list)
    .contract("aggregate", inputs={"clean": list, "enrich": list}, output=dict)
    .contract("publish", inputs={"aggregate": dict}, output=bool)
    .build()
)
print(dag.node_count()) # 5
print(dag.edge_count()) # 5
print(dag.topological_levels()) # [[ingest], [clean, enrich], [aggregate], [publish]]
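To see where the four levels in this example come from, the following standalone sketch groups the same nodes by level using in-degree counting. It does not call dagron. `topological_levels` here is a hypothetical reimplementation for illustration, and it ignores edge weights.

```python
def topological_levels(nodes, edges):
    """Group nodes so that each level depends only on earlier levels."""
    indegree = {n: 0 for n in nodes}
    successors = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    levels = []
    current = [n for n in nodes if indegree[n] == 0]
    while current:
        levels.append(current)
        following = []
        for node in current:
            for nxt in successors[node]:
                indegree[nxt] -= 1
                if indegree[nxt] == 0:  # all predecessors are already placed
                    following.append(nxt)
        current = following
    return levels


nodes = ["ingest", "clean", "enrich", "aggregate", "publish"]
edges = [
    ("ingest", "clean"),
    ("ingest", "enrich"),
    ("clean", "aggregate"),
    ("enrich", "aggregate"),
    ("aggregate", "publish"),
]

levels = topological_levels(nodes, edges)
print(levels)
# [['ingest'], ['clean', 'enrich'], ['aggregate'], ['publish']]
```

Nodes that share a level, such as clean and enrich, have no dependency on each other.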
ContractViolation
class ContractViolation
Returned by DAGBuilder.validate_contracts() when a type mismatch is found.
| Property | Type | Description |
|---|---|---|
| from_node | str | The upstream node whose output type does not match. |
| to_node | str | The downstream node whose expected input type does not match. |
| expected | type | The type expected by the downstream node. |
| actual | type | The type declared by the upstream node's output. |
violations = builder.validate_contracts()
for v in violations:
    print(f"{v.from_node} -> {v.to_node}: expected {v.expected}, got {v.actual}")