DataFrames
Data pipelines frequently pass DataFrames between nodes. A missing column, a wrong dtype, or an unexpectedly empty table can cascade through the pipeline and produce silent corruption. dagron's DataFrame integration lets you attach schemas to node outputs and validate DataFrames automatically -- catching issues at the source instead of downstream.
The system works with both pandas and polars DataFrames. No additional dependencies are required beyond what you already use.
Each node has a schema that its output DataFrame must satisfy.
Core Classes
| Class | Role |
|---|---|
| `DataFramePipeline` | Wraps a DAG with schema definitions. Validates execution results against schemas. |
| `DataFrameSchema` | Defines the expected shape of a DataFrame: columns, row count bounds. |
| `ColumnSchema` | Defines a single column: name, dtype, nullable, required. |
| `SchemaViolation` | Describes a single validation failure: node name and message. |
| `validate_schema` | Standalone function to validate any DataFrame against a schema. |
Defining Schemas
ColumnSchema
Each column is described by a ColumnSchema:
from dagron.dataframe import ColumnSchema
col = ColumnSchema(
name="user_id",
dtype="int", # substring match against the actual dtype string
nullable=False, # reject null values
required=True, # column must be present (default)
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | -- | Column name (exact match). |
| `dtype` | `str \| None` | `None` | Expected dtype as a substring. `"int"` matches `int64`, `Int64`, etc. |
| `nullable` | `bool` | `True` | If `False`, the column must contain no null/NaN values. |
| `required` | `bool` | `True` | If `True`, the column must exist in the DataFrame. |
The dtype check uses substring matching, so "int" matches both pandas int64 and polars Int64. This provides cross-framework compatibility without requiring exact dtype strings.
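The check itself can be sketched in a few lines. This is a hypothetical reconstruction, not dagron's actual code, and it assumes a case-insensitive comparison, which is consistent with `"int"` matching `Int64`:

```python
# Hypothetical sketch of the substring dtype check described above.
# Assumes case-insensitive comparison, consistent with "int" matching Int64.
def dtype_matches(expected: str, actual: str) -> bool:
    return expected.lower() in actual.lower()

# "int" accepts any integer dtype string; "object" is rejected
assert dtype_matches("int", "int64")
assert dtype_matches("int", "Int64")
assert not dtype_matches("int", "object")
```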
DataFrameSchema
Group column schemas and optional row count bounds into a DataFrameSchema:
from dagron.dataframe import DataFrameSchema, ColumnSchema
schema = DataFrameSchema(
columns=[
ColumnSchema("id", dtype="int", nullable=False),
ColumnSchema("name", dtype="str"),
ColumnSchema("email", dtype="str", nullable=False),
ColumnSchema("age", dtype="int", required=False), # optional column
],
min_rows=1, # at least 1 row
max_rows=1000000, # at most 1M rows
)
Quick Start
import dagron
import pandas as pd
from dagron.dataframe import (
ColumnSchema,
DataFramePipeline,
DataFrameSchema,
)
# 1. Build the DAG
dag = (
dagron.DAG.builder()
.add_node("extract")
.add_node("clean")
.add_node("aggregate")
.add_edge("extract", "clean")
.add_edge("clean", "aggregate")
.build()
)
# 2. Define schemas for each node's output
schemas = {
"extract": DataFrameSchema(
columns=[
ColumnSchema("user_id", dtype="int", nullable=False),
ColumnSchema("name", dtype="str"),
ColumnSchema("email", dtype="str"),
],
min_rows=1,
),
"clean": DataFrameSchema(
columns=[
ColumnSchema("user_id", dtype="int", nullable=False),
ColumnSchema("name", dtype="str", nullable=False),
ColumnSchema("email", dtype="str", nullable=False),
],
),
"aggregate": DataFrameSchema(
columns=[
ColumnSchema("domain", dtype="str"),
ColumnSchema("count", dtype="int"),
],
),
}
# 3. Define tasks
tasks = {
"extract": lambda: pd.DataFrame({
"user_id": [1, 2, 3],
"name": ["Alice", "Bob", None],
"email": ["alice@a.com", "bob@b.com", "charlie@c.com"],
}),
"clean": lambda: pd.DataFrame({
"user_id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"email": ["alice@a.com", "bob@b.com", "charlie@c.com"],
}),
"aggregate": lambda: pd.DataFrame({
"domain": ["a.com", "b.com", "c.com"],
"count": [1, 1, 1],
}),
}
# 4. Execute the DAG
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)
# 5. Validate results against schemas
pipeline = DataFramePipeline(dag, schemas)
violations = pipeline.validate_result(result)
if violations:
for v in violations:
print(f" [{v.node_name}] {v.message}")
else:
print("All schemas valid!")
Validation Rules
The schema validator checks four things for each node:
1. Required Columns
If a column is required=True (the default) and is missing from the DataFrame:
schema = DataFrameSchema(columns=[ColumnSchema("missing_col", required=True)])
# Violation: "Missing required column 'missing_col'"
2. Data Types
If a dtype is specified, the actual dtype must contain the expected string as a substring:
schema = DataFrameSchema(columns=[ColumnSchema("age", dtype="int")])
# pandas int64 -> "int64" contains "int" -> valid
# pandas object -> "object" does NOT contain "int" -> violation
This means dtype="int" matches int8, int16, int32, int64, Int64, UInt32, etc.
3. Null Values
If nullable=False, the column must not contain any null/NaN values:
schema = DataFrameSchema(columns=[ColumnSchema("email", nullable=False)])
# DataFrame with email = ["alice@a.com", None, "bob@b.com"]
# Violation: "Column 'email' has null values but nullable=False"
4. Row Count Bounds
schema = DataFrameSchema(min_rows=1, max_rows=10000)
# Empty DataFrame
# Violation: "Expected at least 1 rows, got 0"
# DataFrame with 20000 rows
# Violation: "Expected at most 10000 rows, got 20000"
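The four rules can be sketched in pure Python. The version below is a hypothetical illustration that operates on a plain dict of `name -> (dtype_string, values)` instead of a real DataFrame, and the tuple-based schema shape is an assumption for brevity, not dagron's API:

```python
def check(columns, schema, min_rows=None, max_rows=None):
    """Sketch of the four validation rules (hypothetical, not dagron's API).
    columns: dict of name -> (dtype_string, list_of_values)
    schema:  list of (name, expected_dtype, nullable, required) tuples
    """
    violations = []
    n_rows = max((len(v) for _, v in columns.values()), default=0)
    for name, dtype, nullable, required in schema:
        # Rule 1: required columns must be present
        if name not in columns:
            if required:
                violations.append(f"Missing required column '{name}'")
            continue
        actual, values = columns[name]
        # Rule 2: substring dtype match
        if dtype is not None and dtype.lower() not in actual.lower():
            violations.append(f"Column '{name}' has dtype {actual}, expected {dtype}")
        # Rule 3: null check
        if not nullable and any(v is None for v in values):
            violations.append(f"Column '{name}' has null values but nullable=False")
    # Rule 4: row count bounds
    if min_rows is not None and n_rows < min_rows:
        violations.append(f"Expected at least {min_rows} rows, got {n_rows}")
    if max_rows is not None and n_rows > max_rows:
        violations.append(f"Expected at most {max_rows} rows, got {n_rows}")
    return violations
```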
Standalone Validation
Use validate_schema() to validate any DataFrame without a full pipeline:
import pandas as pd
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", None, "Charlie"],
})
schema = DataFrameSchema(
columns=[
ColumnSchema("id", dtype="int", nullable=False),
ColumnSchema("name", dtype="str", nullable=False),
],
min_rows=1,
)
violations = validate_schema(df, schema, node_name="my_step")
for v in violations:
print(f" {v.message}")
# "Column 'name' has null values but nullable=False"
Polars Support
The same schema classes work with polars DataFrames; adjust only the dtype strings where polars' names differ:
import polars as pl
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema
df = pl.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"score": [0.9, 0.8, 0.7],
})
schema = DataFrameSchema(
columns=[
ColumnSchema("id", dtype="Int", nullable=False), # polars uses "Int64"
ColumnSchema("name", dtype="Utf8"),
ColumnSchema("score", dtype="Float"),
],
)
violations = validate_schema(df, schema)
if not violations:
print("Valid!")
dagron detects the framework automatically by inspecting the object's module path. Both pandas.DataFrame and polars.DataFrame are supported.
Validating Individual Node Outputs
Use validate_value() on a DataFramePipeline to validate a single node's output:
pipeline = DataFramePipeline(dag, schemas)
df = pd.DataFrame({"user_id": [1, 2], "name": ["Alice", "Bob"]})
violations = pipeline.validate_value("extract", df)
if violations:
print("extract output is invalid:")
for v in violations:
print(f" {v.message}")
This is useful for validating intermediate results during development.
SchemaViolation
Each violation is a frozen dataclass:
@dataclass(frozen=True)
class SchemaViolation:
node_name: str # which node produced the invalid DataFrame
message: str # human-readable description
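Because the dataclass is frozen, instances are immutable, comparable by value, and hashable, which makes violations easy to deduplicate or group by node. A quick illustration (restating the class locally):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class SchemaViolation:
    node_name: str
    message: str

v1 = SchemaViolation("clean", "Column 'email' has null values but nullable=False")
v2 = SchemaViolation("clean", "Column 'email' has null values but nullable=False")

# frozen=True gives value equality and hashability for free
assert v1 == v2
assert len({v1, v2}) == 1  # duplicates collapse in a set
```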
Complete pandas Pipeline Example
import dagron
import pandas as pd
from dagron.dataframe import (
ColumnSchema,
DataFramePipeline,
DataFrameSchema,
validate_schema,
)
# Build DAG
dag = (
dagron.DAG.builder()
.add_node("read_orders")
.add_node("filter_active")
.add_node("compute_revenue")
.add_node("top_customers")
.add_edge("read_orders", "filter_active")
.add_edge("filter_active", "compute_revenue")
.add_edge("compute_revenue", "top_customers")
.build()
)
# Define schemas
schemas = {
"read_orders": DataFrameSchema(
columns=[
ColumnSchema("order_id", dtype="int", nullable=False),
ColumnSchema("customer_id", dtype="int", nullable=False),
ColumnSchema("amount", dtype="float"),
ColumnSchema("status", dtype="str"),
],
min_rows=1,
),
"filter_active": DataFrameSchema(
columns=[
ColumnSchema("order_id", dtype="int", nullable=False),
ColumnSchema("customer_id", dtype="int", nullable=False),
ColumnSchema("amount", dtype="float", nullable=False),
],
),
"compute_revenue": DataFrameSchema(
columns=[
ColumnSchema("customer_id", dtype="int", nullable=False),
ColumnSchema("total_revenue", dtype="float", nullable=False),
],
),
"top_customers": DataFrameSchema(
columns=[
ColumnSchema("customer_id", dtype="int"),
ColumnSchema("total_revenue", dtype="float"),
ColumnSchema("rank", dtype="int"),
],
max_rows=100, # top 100
),
}
# Tasks
def read_orders():
return pd.DataFrame({
"order_id": [1, 2, 3, 4, 5],
"customer_id": [101, 102, 101, 103, 102],
"amount": [50.0, 75.0, 30.0, 100.0, 45.0],
"status": ["active", "active", "cancelled", "active", "active"],
})
def filter_active():
df = read_orders()
return df[df["status"] == "active"][["order_id", "customer_id", "amount"]]
def compute_revenue():
df = filter_active()
return df.groupby("customer_id")["amount"].sum().reset_index().rename(
columns={"amount": "total_revenue"}
)
def top_customers():
df = compute_revenue()
df = df.sort_values("total_revenue", ascending=False).head(100)
df["rank"] = range(1, len(df) + 1)
return df
# Execute and validate
executor = dagron.DAGExecutor(dag)
result = executor.execute({
"read_orders": read_orders,
"filter_active": filter_active,
"compute_revenue": compute_revenue,
"top_customers": top_customers,
})
pipeline = DataFramePipeline(dag, schemas)
violations = pipeline.validate_result(result)
if violations:
print(f"{len(violations)} schema violation(s):")
for v in violations:
print(f" [{v.node_name}] {v.message}")
else:
print("All schemas valid!")
Combining with Contracts
Use NodeContract for type-level checking (ensuring nodes produce DataFrames) and DataFrameSchema for content-level checking (columns, dtypes, nulls):
from dagron.contracts import NodeContract, ContractValidator
# Type-level: ensure the node outputs a DataFrame
import pandas as pd
contracts = {
"read_orders": NodeContract(output=pd.DataFrame),
"filter_active": NodeContract(inputs={"read_orders": pd.DataFrame}, output=pd.DataFrame),
"compute_revenue": NodeContract(inputs={"filter_active": pd.DataFrame}, output=pd.DataFrame),
}
# Validate types
type_violations = ContractValidator(dag, contracts).validate()
# Content-level: validate column schemas
content_violations = DataFramePipeline(dag, schemas).validate_result(result)
Non-DataFrame Handling
If a node's output is not a pandas or polars DataFrame, the validator reports it:
schema = DataFrameSchema(columns=[ColumnSchema("id")])
violations = validate_schema("not a dataframe", schema, node_name="bad_node")
# SchemaViolation("bad_node", "Expected DataFrame, got str")
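The type name in the message presumably comes from the value's class; a one-line sketch of how such a message could be built (hypothetical, not dagron's internals):

```python
def type_violation_message(value) -> str:
    # Hypothetical reconstruction of the message format shown above
    return f"Expected DataFrame, got {type(value).__name__}"
```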
Best Practices
- **Define schemas for every node that produces a DataFrame.** This creates a complete validation boundary at every step.
- **Use `nullable=False` for critical columns.** Null values in ID or key columns are a common source of downstream errors.
- **Set `min_rows=1` for extract nodes.** Catch empty result sets immediately instead of letting them flow through.
- **Use `max_rows` for output nodes.** Prevent accidentally returning millions of rows to a reporting step.
- **Run validation in CI.** Execute the pipeline with test data and validate schemas as part of your test suite.
- **Use substring dtype matching.** Specify `"int"` instead of `"int64"` for cross-framework compatibility between pandas and polars.
Related
- API Reference: DataFrames -- full API documentation.
- Contracts -- type-level edge validation.
- Executing Tasks -- the execution model that produces `ExecutionResult`.
- Error Handling -- handling violations as part of the error flow.