
Contracts

When DAGs grow large, it becomes easy for the output type of one node to drift from what a downstream node expects. dagron's contract system lets you declare input and output types for each node and validate them at build time -- before any task runs. This catches type mismatches early, similar to how a compiler checks function signatures.

The contract validator catches that 'train_model' outputs float but 'report' expects str as input.


Core Classes

Name                 Role
NodeContract         Declares input types (per dependency) and the output type for a single node.
ContractValidator    Validates contracts across all edges in a DAG.
ContractViolation    Describes a single type mismatch: from_node, to_node, and a human-readable message.
extract_contracts    Auto-extracts contracts from a Pipeline's @task functions using typing.get_type_hints.
validate_contracts   Convenience function: extracts and validates in one call.

Defining Contracts Manually

NodeContract

A NodeContract declares what types a node expects from its dependencies and what type it produces:

from dagron.contracts import NodeContract

# This node expects its "fetch_data" dependency to provide a list,
# and it outputs a dict.
clean_contract = NodeContract(
    inputs={"fetch_data": list},
    output=dict,
)

# This node expects "clean_data" to provide a dict,
# and outputs a float.
train_contract = NodeContract(
    inputs={"clean_data": dict},
    output=float,
)

The inputs dict maps dependency node names to their expected types. The output is the type this node produces.

The special type object acts as a wildcard (equivalent to Any) -- it matches any type.

Validating

Create a ContractValidator with a DAG and contracts, then call validate():

import dagron
from dagron.contracts import ContractValidator, NodeContract

dag = (
    dagron.DAG.builder()
    .add_node("fetch_data")
    .add_node("clean_data")
    .add_node("train_model")
    .add_node("generate_report")
    .add_edge("fetch_data", "clean_data")
    .add_edge("clean_data", "train_model")
    .add_edge("train_model", "generate_report")
    .build()
)

contracts = {
    "fetch_data": NodeContract(output=list),
    "clean_data": NodeContract(inputs={"fetch_data": list}, output=dict),
    "train_model": NodeContract(inputs={"clean_data": dict}, output=float),
    "generate_report": NodeContract(inputs={"train_model": str}, output=str),
    # ^^^ BUG: should be float
}

validator = ContractValidator(dag, contracts)
violations = validator.validate()

for v in violations:
    print(f" {v.from_node} -> {v.to_node}: {v.message}")

Output:

  train_model -> generate_report: Type mismatch on edge train_model -> generate_report:
producer outputs float, but consumer expects str

How Validation Works

For every edge (u, v) in the DAG, the validator:

  1. Looks up v's contract to find the expected input type for dependency u.
  2. Looks up u's contract to find its declared output type.
  3. Checks compatibility using issubclass(actual_output, expected_input).

If the output type is not a subclass of the expected input type, a ContractViolation is recorded.
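The three steps above can be sketched in plain Python. This is a minimal approximation, not dagron's implementation. `Contract` is a hypothetical stand-in for NodeContract, edges are given as (producer, consumer) pairs, and violations are plain tuples. The sketch assumes an edge is skipped when either endpoint has no contract or when the consumer declares no type for that dependency. It uses a bare issubclass check, so it ignores the generic-alias fallback described under Type Compatibility Rules.

```python
from dataclasses import dataclass, field


@dataclass
class Contract:
    """Hypothetical stand-in for NodeContract (not dagron's class)."""
    inputs: dict = field(default_factory=dict)  # dependency name -> expected type
    output: type = object                       # type this node produces


def check_edges(edges, contracts):
    """Return (from_node, to_node, message) tuples for mismatched edges."""
    violations = []
    for u, v in edges:
        producer, consumer = contracts.get(u), contracts.get(v)
        if producer is None or consumer is None:
            continue  # an endpoint has no contract: nothing to compare
        expected = consumer.inputs.get(u)  # step 1: v's expected type for u
        if expected is None:
            continue  # v declares no type for this dependency
        actual = producer.output  # step 2: u's declared output type
        if not issubclass(actual, expected):  # step 3: compatibility check
            violations.append((
                u,
                v,
                f"Type mismatch on edge {u} -> {v}: "
                f"producer outputs {actual.__name__}, "
                f"but consumer expects {expected.__name__}",
            ))
    return violations


edges = [
    ("fetch_data", "clean_data"),
    ("clean_data", "train_model"),
    ("train_model", "generate_report"),
]
contracts = {
    "fetch_data": Contract(output=list),
    "clean_data": Contract(inputs={"fetch_data": list}, output=dict),
    "train_model": Contract(inputs={"clean_data": dict}, output=float),
    "generate_report": Contract(inputs={"train_model": str}, output=str),
}
for u, v, message in check_edges(edges, contracts):
    print(message)
```

Run against the same contracts as the Validating example, the sketch reports the single train_model -> generate_report mismatch.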

Type Compatibility Rules

  • object always matches (wildcard).
  • Standard Python inheritance works: if B is a subclass of A, then B satisfies an A contract.
  • Parameterized generics (e.g., list[int]) cannot be used with issubclass. When issubclass raises TypeError, the check falls back to True and treats the edge as compatible.
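
from dagron.contracts import NodeContract

# In each pair, "dep" is the upstream node and "consumer" depends on it.
# The validator checks issubclass(dep's output, consumer's expected input).

# int is a subclass of object -- always valid
dep = NodeContract(output=int)
consumer = NodeContract(inputs={"dep": object}, output=str)

# bool is a subclass of int -- valid
dep = NodeContract(output=bool)
consumer = NodeContract(inputs={"dep": int}, output=str)

# str is NOT a subclass of int -- violation
dep = NodeContract(output=str)
consumer = NodeContract(inputs={"dep": int}, output=str)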
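The three rules can be collected into a single predicate. This sketch mirrors the bullets above and is not taken from dagron's source. `is_compatible` is a hypothetical name, and the `list[int]` case requires Python 3.9+ for built-in generic syntax.

```python
def is_compatible(actual, expected):
    """Return True if a producer's `actual` type satisfies a consumer's `expected` type."""
    if expected is object:
        return True  # wildcard: always matches, without calling issubclass()
    try:
        return issubclass(actual, expected)  # ordinary inheritance
    except TypeError:
        # issubclass() rejects parameterized generics such as list[int];
        # treat the edge as compatible rather than failing validation.
        return True


print(is_compatible(int, object))      # True
print(is_compatible(bool, int))        # True
print(is_compatible(str, int))         # False
print(is_compatible(list, list[int]))  # True (TypeError fallback)
```

The fallback has a trade-off. It keeps validation from crashing on generics, but it also lets real mismatches through. Under this rule, a producer of `str` feeding a consumer that expects `list[int]` is not reported.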

ContractViolation

Each violation is a frozen dataclass with three fields:

from dataclasses import dataclass

@dataclass(frozen=True)
class ContractViolation:
    from_node: str  # the upstream node
    to_node: str    # the downstream node
    message: str    # human-readable description
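Because the class is a frozen dataclass, violations are immutable and hashable. The snippet below re-declares the three documented fields locally so it runs without dagron. It assumes dagron's class behaves like a standard frozen dataclass with no extra fields.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ContractViolation:  # local copy of the three documented fields
    from_node: str
    to_node: str
    message: str


a = ContractViolation("train_model", "generate_report", "float vs str")
b = ContractViolation("train_model", "generate_report", "float vs str")

# Frozen dataclasses get __eq__ and __hash__, so duplicates collapse in a set.
unique = {a, b}
print(len(unique))  # 1

# Fields cannot be reassigned after construction.
try:
    a.message = "edited"
except FrozenInstanceError:
    print("violations are read-only")
```

This makes it safe to merge violations from several validation runs into a set, or to sort them with `sorted(violations, key=lambda v: (v.from_node, v.to_node))`.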

You can use violations to fail a CI check:

violations = validator.validate()
if violations:
    for v in violations:
        print(f"ERROR: {v.message}")
    raise SystemExit(1)  # fail the build

Auto-Extracting Contracts from Pipelines

If you use dagron's Pipeline / @task decorator pattern with type annotations, you can auto-extract contracts from the function signatures:

import dagron
from dagron.contracts import extract_contracts, validate_contracts

@dagron.task(dependencies=[])
def fetch_data() -> list:
    return [1, 2, 3]

@dagron.task(dependencies=["fetch_data"])
def clean_data(fetch_data: list) -> dict:
    return {"values": fetch_data}

@dagron.task(dependencies=["clean_data"])
def train_model(clean_data: dict) -> float:
    return 0.95

@dagron.task(dependencies=["train_model"])
def generate_report(train_model: float) -> str:
    return f"Accuracy: {train_model}"

pipeline = dagron.Pipeline([fetch_data, clean_data, train_model, generate_report])

extract_contracts()

Reads typing.get_type_hints() from each task function to build NodeContract instances:

contracts = extract_contracts(pipeline)

for name, contract in contracts.items():
    print(f" {name}: inputs={contract.inputs}, output={contract.output}")

Output:

  fetch_data: inputs={}, output=<class 'list'>
  clean_data: inputs={'fetch_data': <class 'list'>}, output=<class 'dict'>
  train_model: inputs={'clean_data': <class 'dict'>}, output=<class 'float'>
  generate_report: inputs={'train_model': <class 'float'>}, output=<class 'str'>
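The extraction step can be approximated with the standard library alone. The helper below, `contract_from_function`, is hypothetical. It assumes, as in the tasks above, that each parameter is named after the dependency it receives. It also falls back to object (the wildcard) for any dependency or return value without an annotation; dagron's handling of unannotated parameters is not documented here.

```python
import typing


def contract_from_function(fn, dependencies):
    """Return (inputs, output) built from fn's type annotations.

    Hypothetical helper: maps each dependency name to the annotation of the
    parameter with the same name and uses the return annotation as output.
    Missing annotations default to object (the wildcard).
    """
    hints = typing.get_type_hints(fn)  # resolves string/forward annotations too
    output = hints.pop("return", object)
    inputs = {dep: hints.get(dep, object) for dep in dependencies}
    return inputs, output


def clean_data(fetch_data: list) -> dict:
    return {"values": fetch_data}


print(contract_from_function(clean_data, ["fetch_data"]))
# ({'fetch_data': <class 'list'>}, <class 'dict'>)
```

typing.get_type_hints() is preferable to reading `__annotations__` directly. It evaluates string annotations, such as those produced under `from __future__ import annotations`, into real types that issubclass can use.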

validate_contracts()

One-liner that extracts and validates in a single call:

violations = validate_contracts(pipeline)
if not violations:
    print("All contracts valid!")

You can also provide extra manually-defined contracts that override the auto-extracted ones:

from dagron.contracts import NodeContract, validate_contracts

violations = validate_contracts(
    pipeline,
    extra_contracts={
        "fetch_data": NodeContract(output=dict),  # override
    },
)
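One plausible reading of "override" is a per-node replacement. A node named in extra_contracts uses the manual contract in full, and every other node keeps its auto-extracted contract. The sketch below shows that merge with plain dicts, using (inputs, output) tuples as a hypothetical stand-in for NodeContract. Whether dagron merges per node or per field is an assumption here; this page does not say.

```python
# Auto-extracted contracts, as (inputs, output) tuples for brevity.
extracted = {
    "fetch_data": ({}, list),
    "clean_data": ({"fetch_data": list}, dict),
}
extra_contracts = {
    "fetch_data": ({}, dict),  # manual override
}

# Later keys win: the manual entry replaces the extracted one wholesale.
merged = {**extracted, **extra_contracts}
print(merged["fetch_data"])  # ({}, <class 'dict'>)
print(merged["clean_data"])  # ({'fetch_data': <class 'list'>}, <class 'dict'>)
```

Under this reading, the override in the example above would itself produce a violation on fetch_data -> clean_data. clean_data still expects a list, and dict is not a subclass of list.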

Using Contracts with DAGBuilder

You can attach contracts during DAG construction via the builder pattern:

import dagron
from dagron.contracts import NodeContract

dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("transform")
    .add_node("load")
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .contract("extract", NodeContract(output=list))
    .contract("transform", NodeContract(inputs={"extract": list}, output=dict))
    .contract("load", NodeContract(inputs={"transform": dict}, output=bool))
    .build()
)

Complete Validation Example

Here is a full example that demonstrates catching a type mismatch in a data pipeline:

import dagron
from dagron.contracts import ContractValidator, ContractViolation, NodeContract

# Build the DAG
dag = (
    dagron.DAG.builder()
    .add_node("read_csv")
    .add_node("parse_dates")
    .add_node("compute_stats")
    .add_node("render_chart")
    .add_node("send_email")
    .add_edge("read_csv", "parse_dates")
    .add_edge("parse_dates", "compute_stats")
    .add_edge("compute_stats", "render_chart")
    .add_edge("render_chart", "send_email")
    .build()
)

# Define contracts
contracts = {
    "read_csv": NodeContract(output=list),  # list of rows
    "parse_dates": NodeContract(inputs={"read_csv": list}, output=list),
    "compute_stats": NodeContract(inputs={"parse_dates": list}, output=dict),
    "render_chart": NodeContract(inputs={"compute_stats": dict}, output=bytes),  # PNG bytes
    "send_email": NodeContract(inputs={"render_chart": str}, output=bool),
    # ^^^ BUG: chart is bytes, not str
}

# Validate
validator = ContractValidator(dag, contracts)
violations = validator.validate()

if violations:
    print(f"Found {len(violations)} contract violation(s):")
    for v in violations:
        print(f" {v.message}")
else:
    print("All contracts valid.")

Output:

Found 1 contract violation(s):
Type mismatch on edge render_chart -> send_email:
producer outputs bytes, but consumer expects str

Contracts in CI/CD

Add contract validation as a pre-execution check in your CI pipeline:

from dagron.contracts import validate_contracts

def validate_pipeline(pipeline):
    """Run as part of CI -- fail if contracts are violated."""
    violations = validate_contracts(pipeline)
    if violations:
        print("Contract violations detected:")
        for v in violations:
            print(f" ERROR: {v.message}")
        raise SystemExit(1)
    print("All contracts valid.")

# In your CI script (my_pipeline is your dagron.Pipeline):
validate_pipeline(my_pipeline)

This catches type drift when someone changes a task's return type without updating the downstream consumer.


Partial Contracts

You do not need to define contracts for every node. Nodes without contracts are silently skipped during validation. This lets you adopt contracts incrementally:

contracts = {
    # Only validate the critical path
    "train_model": NodeContract(inputs={"features": dict}, output=float),
    "deploy": NodeContract(inputs={"train_model": float}, output=bool),
}

validator = ContractValidator(dag, contracts)
violations = validator.validate()  # only checks edges between contracted nodes
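To see which edges a partial contract set actually covers, the filter can be sketched on its own. The DAG edges below are a hypothetical example, and contracts are (inputs, output) tuples standing in for NodeContract. The sketch assumes an edge is compared only when both endpoints have contracts and the consumer declares a type for that producer.

```python
def checked_edges(edges, contracts):
    """Return the edges a validator would compare under partial contracts.

    `contracts` maps node name -> (inputs, output); a hypothetical shape.
    """
    result = []
    for u, v in edges:
        if u in contracts and v in contracts and u in contracts[v][0]:
            result.append((u, v))
    return result


edges = [
    ("load", "features"),
    ("features", "train_model"),
    ("train_model", "deploy"),
]
contracts = {
    "train_model": ({"features": dict}, float),
    "deploy": ({"train_model": float}, bool),
}
print(checked_edges(edges, contracts))  # [('train_model', 'deploy')]
```

In this example, train_model's declared input from features is never checked, because features has no contract and so no declared output to compare against.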

Wildcard Types

Use object as a wildcard that accepts any type:

# This node accepts anything from its dependency
contracts = {
    "logger": NodeContract(inputs={"any_node": object}, output=object),
}

This is useful for utility nodes (loggers, monitors) that process arbitrary data.


Best Practices

  1. Add type annotations to all @task functions. This enables extract_contracts() to work automatically.

  2. Run validate_contracts() in CI. Catch type mismatches before they cause runtime errors.

  3. Start with the critical path. You do not need full coverage immediately -- contract a few key nodes and expand over time.

  4. Use concrete types, not object. The more specific your contracts, the more errors they catch. Reserve object for truly polymorphic nodes.

  5. Combine with DataFrames. For pandas/polars pipelines, use DataFrameSchema for column-level validation and NodeContract for edge-level type checking.