Contracts
When DAGs grow large, it becomes easy for the output type of one node to drift from what a downstream node expects. dagron's contract system lets you declare input and output types for each node and validate them at build time -- before any task runs. This catches type mismatches early, similar to how a compiler checks function signatures.
The contract validator catches that 'train_model' outputs float but 'report' expects str as input.
Core Classes
| Class | Role |
|---|---|
| NodeContract | Declares input types (per dependency) and output type for a single node. |
| ContractValidator | Validates contracts across all edges in a DAG. |
| ContractViolation | Describes a single type mismatch: from_node, to_node, and a human-readable message. |
| extract_contracts | Auto-extract contracts from a Pipeline's @task functions using typing.get_type_hints. |
| validate_contracts | Convenience function: extract + validate in one call. |
Defining Contracts Manually
NodeContract
A NodeContract declares what types a node expects from its dependencies and what type it produces:
from dagron.contracts import NodeContract
# This node expects its "fetch_data" dependency to provide a list,
# and it outputs a dict.
clean_contract = NodeContract(
    inputs={"fetch_data": list},
    output=dict,
)
# This node expects "clean_data" to provide a dict,
# and outputs a float.
train_contract = NodeContract(
    inputs={"clean_data": dict},
    output=float,
)
The inputs dict maps dependency node names to their expected types. The output is the type this node produces.
The special type object acts as a wildcard (equivalent to Any) -- it matches any type.
Validating
Create a ContractValidator with a DAG and contracts, then call validate():
import dagron
from dagron.contracts import ContractValidator, NodeContract
dag = (
    dagron.DAG.builder()
    .add_node("fetch_data")
    .add_node("clean_data")
    .add_node("train_model")
    .add_node("generate_report")
    .add_edge("fetch_data", "clean_data")
    .add_edge("clean_data", "train_model")
    .add_edge("train_model", "generate_report")
    .build()
)
contracts = {
    "fetch_data": NodeContract(output=list),
    "clean_data": NodeContract(inputs={"fetch_data": list}, output=dict),
    "train_model": NodeContract(inputs={"clean_data": dict}, output=float),
    "generate_report": NodeContract(inputs={"train_model": str}, output=str),
    # BUG: the "train_model" input above should be float, not str
}
validator = ContractValidator(dag, contracts)
violations = validator.validate()
for v in violations:
    print(f" {v.from_node} -> {v.to_node}: {v.message}")
Output:
train_model -> generate_report: Type mismatch on edge train_model -> generate_report:
producer outputs float, but consumer expects str
How Validation Works
For every edge (u, v) in the DAG, the validator:
- Looks up
v's contract to find the expected input type for dependencyu. - Looks up
u's contract to find its declared output type. - Checks compatibility using
issubclass(actual_output, expected_input).
If the output type is not a subclass of the expected input type, a ContractViolation is recorded.
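The three steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not dagron's implementation: the Contract dataclass and check_edges function are hypothetical stand-ins, edges are plain tuples, and the wildcard and missing-contract handling follow the rules described elsewhere on this page.

```python
from dataclasses import dataclass, field


@dataclass
class Contract:
    """Hypothetical stand-in for NodeContract (not dagron's class)."""
    inputs: dict = field(default_factory=dict)
    output: type = object


def check_edges(edges, contracts):
    """Return (from_node, to_node, message) for every mismatched edge."""
    problems = []
    for u, v in edges:
        producer, consumer = contracts.get(u), contracts.get(v)
        if producer is None or consumer is None:
            continue  # nodes without contracts are skipped
        expected = consumer.inputs.get(u, object)  # step 1: what v expects from u
        actual = producer.output                   # step 2: what u declares
        if expected is object:
            continue  # object acts as a wildcard
        if not issubclass(actual, expected):       # step 3: compatibility check
            problems.append((
                u,
                v,
                f"producer outputs {actual.__name__}, "
                f"but consumer expects {expected.__name__}",
            ))
    return problems


edges = [("clean_data", "train_model"), ("train_model", "generate_report")]
contracts = {
    "clean_data": Contract(output=dict),
    "train_model": Contract(inputs={"clean_data": dict}, output=float),
    "generate_report": Contract(inputs={"train_model": str}, output=str),
}
problems = check_edges(edges, contracts)
for u, v, msg in problems:
    print(f"{u} -> {v}: {msg}")
```

Only the second edge is reported, because float is not a subclass of str.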
Type Compatibility Rules
- object always matches (wildcard).
- Standard Python inheritance works: if B is a subclass of A, then B satisfies an A contract.
- Generic type aliases (e.g., list[int]) fall back to True if issubclass raises TypeError.
from dagron.contracts import NodeContract
# int is a subclass of object -- always valid
NodeContract(inputs={"dep": object}, output=int)
# bool is a subclass of int -- valid
NodeContract(inputs={"dep": int}, output=bool)
# str is NOT a subclass of int -- violation
NodeContract(inputs={"dep": int}, output=str)
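The compatibility rules can be condensed into one small function. The sketch below is an assumption about how such a check might look, written against the rules listed above. is_compatible is a hypothetical helper, not a dagron function, and the example needs Python 3.9+ for the list[int] syntax.

```python
def is_compatible(actual, expected):
    """Hypothetical helper that mirrors the three compatibility rules."""
    if expected is object:
        return True  # rule 1: object is a wildcard
    try:
        return issubclass(actual, expected)  # rule 2: normal inheritance
    except TypeError:
        # rule 3: parameterized generics such as list[int] cannot be used
        # with issubclass, so the check falls back to "compatible"
        return True


print(is_compatible(str, object))      # True (wildcard)
print(is_compatible(bool, int))        # True (bool subclasses int)
print(is_compatible(str, int))         # False
print(is_compatible(list, list[int]))  # True (TypeError fallback)
```

A consequence of the fallback is that element types inside a generic alias are not compared, so a contract that expects list[int] does not distinguish it from list[str].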
ContractViolation
Each violation is a frozen dataclass with three fields:
@dataclass(frozen=True)
class ContractViolation:
    from_node: str  # the upstream node
    to_node: str    # the downstream node
    message: str    # human-readable description
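Because the class is frozen, instances are read-only and hashable. The sketch below shows what that means in practice using a local stand-in with the same three fields. Violation is a hypothetical name defined here so the example runs without dagron installed.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class Violation:
    """Local stand-in with the same three fields (not imported from dagron)."""
    from_node: str
    to_node: str
    message: str


v = Violation(
    "train_model",
    "generate_report",
    "producer outputs float, but consumer expects str",
)

try:
    v.message = "edited"  # assignment to a frozen dataclass field fails
except FrozenInstanceError:
    print("violations are read-only")

# Frozen dataclasses are hashable, so equal violations collapse in a set.
unique = {v, Violation(v.from_node, v.to_node, v.message)}
print(len(unique))  # 1
```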
You can use violations to fail a CI check:
violations = validator.validate()
if violations:
    for v in violations:
        print(f"ERROR: {v.message}")
    raise SystemExit(1)  # fail the build
Auto-Extracting Contracts from Pipelines
If you use dagron's Pipeline / @task decorator pattern with type annotations, you can auto-extract contracts from the function signatures:
import dagron
from dagron.contracts import NodeContract, extract_contracts, validate_contracts

@dagron.task(dependencies=[])
def fetch_data() -> list:
    return [1, 2, 3]

@dagron.task(dependencies=["fetch_data"])
def clean_data(fetch_data: list) -> dict:
    return {"values": fetch_data}

@dagron.task(dependencies=["clean_data"])
def train_model(clean_data: dict) -> float:
    return 0.95

@dagron.task(dependencies=["train_model"])
def generate_report(train_model: float) -> str:
    return f"Accuracy: {train_model}"

pipeline = dagron.Pipeline([fetch_data, clean_data, train_model, generate_report])
extract_contracts()
Calls typing.get_type_hints() on each task function to build NodeContract instances:
contracts = extract_contracts(pipeline)
for name, contract in contracts.items():
    print(f" {name}: inputs={contract.inputs}, output={contract.output}")
Output:
fetch_data: inputs={}, output=<class 'list'>
clean_data: inputs={'fetch_data': <class 'list'>}, output=<class 'dict'>
train_model: inputs={'clean_data': <class 'dict'>}, output=<class 'float'>
generate_report: inputs={'train_model': <class 'float'>}, output=<class 'str'>
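To see where those input and output types come from, the sketch below applies typing.get_type_hints to an undecorated copy of one task function. It uses only the standard library. How dagron unwraps decorated tasks and how it treats missing annotations are not shown here, and the object default for an unannotated return is an assumption.

```python
from typing import get_type_hints


def clean_data(fetch_data: list) -> dict:
    return {"values": fetch_data}


hints = get_type_hints(clean_data)
# The "return" key holds the output type; every other key is a parameter.
output_type = hints.pop("return", object)  # assumption: no annotation -> wildcard
input_types = hints

print(input_types)  # {'fetch_data': <class 'list'>}
print(output_type)  # <class 'dict'>
```

Since the parameter names match the dependency names, the remaining hints map directly onto the inputs of a contract.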
validate_contracts()
One-liner that extracts and validates in a single call:
violations = validate_contracts(pipeline)
if not violations:
    print("All contracts valid!")
You can also provide extra manually-defined contracts that override the auto-extracted ones:
violations = validate_contracts(
    pipeline,
    extra_contracts={
        "fetch_data": NodeContract(output=dict),  # override
    },
)
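One way to picture the override is as a per-node dictionary merge in which manual entries win. The sketch below illustrates that idea with plain types standing in for contract objects. Whether dagron replaces the whole contract for a node or merges individual fields is an assumption this example does not settle.

```python
# Plain types stand in for NodeContract objects to keep the sketch dependency-free.
extracted = {"fetch_data": list, "clean_data": dict}
extra_contracts = {"fetch_data": dict}

# Later keys win, so the manual entry replaces the extracted one.
merged = {**extracted, **extra_contracts}
print(merged)  # {'fetch_data': <class 'dict'>, 'clean_data': <class 'dict'>}
```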
Using Contracts with DAGBuilder
You can attach contracts during DAG construction via the builder pattern:
dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .contract("extract", NodeContract(output=list))
    .contract("transform", NodeContract(inputs={"extract": list}, output=dict))
    .contract("load", NodeContract(inputs={"transform": dict}, output=bool))
    .build()
)
Complete Validation Example
Here is a full example that demonstrates catching a type mismatch in a data pipeline:
import dagron
from dagron.contracts import ContractValidator, ContractViolation, NodeContract
# Build the DAG
dag = (
    dagron.DAG.builder()
    .add_node("read_csv")
    .add_node("parse_dates")
    .add_node("compute_stats")
    .add_node("render_chart")
    .add_node("send_email")
    .add_edge("read_csv", "parse_dates")
    .add_edge("parse_dates", "compute_stats")
    .add_edge("compute_stats", "render_chart")
    .add_edge("render_chart", "send_email")
    .build()
)
# Define contracts
contracts = {
    "read_csv": NodeContract(output=list),  # list of rows
    "parse_dates": NodeContract(inputs={"read_csv": list}, output=list),
    "compute_stats": NodeContract(inputs={"parse_dates": list}, output=dict),
    "render_chart": NodeContract(inputs={"compute_stats": dict}, output=bytes),  # PNG bytes
    "send_email": NodeContract(inputs={"render_chart": str}, output=bool),
    # BUG: the "render_chart" input above should be bytes, not str
}
# Validate
validator = ContractValidator(dag, contracts)
violations = validator.validate()
if violations:
    print(f"Found {len(violations)} contract violation(s):")
    for v in violations:
        print(f" {v.message}")
else:
    print("All contracts valid.")
Output:
Found 1 contract violation(s):
Type mismatch on edge render_chart -> send_email:
producer outputs bytes, but consumer expects str
Contracts in CI/CD
Add contract validation as a pre-execution check in your CI pipeline:
from dagron.contracts import validate_contracts

def validate_pipeline(pipeline):
    """Run as part of CI -- fail if contracts are violated."""
    violations = validate_contracts(pipeline)
    if violations:
        print("Contract violations detected:")
        for v in violations:
            print(f" ERROR: {v.message}")
        raise SystemExit(1)
    print("All contracts valid.")

# In your CI script:
validate_pipeline(my_pipeline)
This catches type drift when someone changes a task's return type without updating the downstream consumer.
Partial Contracts
You do not need to define contracts for every node. Nodes without contracts are silently skipped during validation. This lets you adopt contracts incrementally:
contracts = {
    # Only validate the critical path
    "train_model": NodeContract(inputs={"features": dict}, output=float),
    "deploy": NodeContract(inputs={"train_model": float}, output=bool),
}
validator = ContractValidator(dag, contracts)
violations = validator.validate() # only checks edges between contracted nodes
Wildcard Types
Use object as a wildcard that accepts any type:
# This node accepts anything from its dependency
contracts = {
    "logger": NodeContract(inputs={"any_node": object}, output=object),
}
This is useful for utility nodes (loggers, monitors) that process arbitrary data.
Best Practices
- Add type annotations to all @task functions. This enables extract_contracts() to work automatically.
- Run validate_contracts() in CI. Catch type mismatches before they cause runtime errors.
- Start with the critical path. You do not need full coverage immediately -- contract a few key nodes and expand over time.
- Use concrete types, not object. The more specific your contracts, the more errors they catch. Reserve object for truly polymorphic nodes.
- Combine with DataFrames. For pandas/polars pipelines, use DataFrameSchema for column-level validation and NodeContract for edge-level type checking.
Related
- API Reference: Contracts -- full API documentation.
- DataFrames -- schema validation for DataFrame pipelines.
- Building DAGs -- the builder .contract() method.
- Error Handling -- how violations integrate with the error flow.