DataFrames

Data pipelines frequently pass DataFrames between nodes. A missing column, a wrong dtype, or an unexpectedly empty table can cascade through the pipeline and produce silent corruption. dagron's DataFrame integration lets you define schemas at edge boundaries and validate DataFrames automatically -- catching issues at the source instead of downstream.

The system works with both pandas and polars DataFrames. No additional dependencies are required beyond what you already use.

Each node in a pipeline can be given a schema that its output DataFrame must satisfy.


Core Classes

Class               Role
DataFramePipeline   Wraps a DAG with schema definitions; validates execution results against schemas.
DataFrameSchema     Defines the expected shape of a DataFrame: columns and row count bounds.
ColumnSchema        Defines a single column: name, dtype, nullable, required.
SchemaViolation     Describes a single validation failure: node name and message.
validate_schema     Standalone function to validate any DataFrame against a schema.

Defining Schemas

ColumnSchema

Each column is described by a ColumnSchema:

from dagron.dataframe import ColumnSchema

col = ColumnSchema(
    name="user_id",
    dtype="int",     # substring match against the actual dtype string
    nullable=False,  # reject null values
    required=True,   # column must be present (default)
)
Parameter   Type         Default   Description
name        str          --        Column name (exact match).
dtype       str | None   None      Expected dtype as a substring. "int" matches int64, Int64, etc.
nullable    bool         True      If False, the column must contain no null/NaN values.
required    bool         True      If True, the column must exist in the DataFrame.

The dtype check uses substring matching, so "int" matches both pandas int64 and polars Int64. This provides cross-framework compatibility without requiring exact dtype strings.
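As an illustration of this rule, the documented behavior can be sketched in plain Python (the `dtype_matches` helper below is hypothetical, not dagron's actual code):

```python
def dtype_matches(expected: str, actual: str) -> bool:
    # Case-insensitive substring check, so "int" matches both
    # pandas' "int64" and polars' "Int64".
    return expected.lower() in str(actual).lower()

print(dtype_matches("int", "int64"))   # True
print(dtype_matches("int", "Int64"))   # True
print(dtype_matches("int", "object"))  # False
```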

DataFrameSchema

Group column schemas and optional row count bounds into a DataFrameSchema:

from dagron.dataframe import DataFrameSchema, ColumnSchema

schema = DataFrameSchema(
    columns=[
        ColumnSchema("id", dtype="int", nullable=False),
        ColumnSchema("name", dtype="str"),
        ColumnSchema("email", dtype="str", nullable=False),
        ColumnSchema("age", dtype="int", required=False),  # optional column
    ],
    min_rows=1,        # at least 1 row
    max_rows=1000000,  # at most 1M rows
)

Quick Start

import dagron
import pandas as pd
from dagron.dataframe import (
    ColumnSchema,
    DataFramePipeline,
    DataFrameSchema,
)

# 1. Build the DAG
dag = (
    dagron.DAG.builder()
    .add_node("extract")
    .add_node("clean")
    .add_node("aggregate")
    .add_edge("extract", "clean")
    .add_edge("clean", "aggregate")
    .build()
)

# 2. Define schemas for each node's output
schemas = {
    "extract": DataFrameSchema(
        columns=[
            ColumnSchema("user_id", dtype="int", nullable=False),
            ColumnSchema("name", dtype="str"),
            ColumnSchema("email", dtype="str"),
        ],
        min_rows=1,
    ),
    "clean": DataFrameSchema(
        columns=[
            ColumnSchema("user_id", dtype="int", nullable=False),
            ColumnSchema("name", dtype="str", nullable=False),
            ColumnSchema("email", dtype="str", nullable=False),
        ],
    ),
    "aggregate": DataFrameSchema(
        columns=[
            ColumnSchema("domain", dtype="str"),
            ColumnSchema("count", dtype="int"),
        ],
    ),
}

# 3. Define tasks
tasks = {
    "extract": lambda: pd.DataFrame({
        "user_id": [1, 2, 3],
        "name": ["Alice", "Bob", None],
        "email": ["alice@a.com", "bob@b.com", "charlie@c.com"],
    }),
    "clean": lambda: pd.DataFrame({
        "user_id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "email": ["alice@a.com", "bob@b.com", "charlie@c.com"],
    }),
    "aggregate": lambda: pd.DataFrame({
        "domain": ["a.com", "b.com", "c.com"],
        "count": [1, 1, 1],
    }),
}

# 4. Execute the DAG
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)

# 5. Validate results against schemas
pipeline = DataFramePipeline(dag, schemas)
violations = pipeline.validate_result(result)

if violations:
    for v in violations:
        print(f"  [{v.node_name}] {v.message}")
else:
    print("All schemas valid!")

Validation Rules

The schema validator checks four things for each node:

1. Required Columns

If a column is required=True (the default) and is missing from the DataFrame:

schema = DataFrameSchema(columns=[ColumnSchema("missing_col", required=True)])
# Violation: "Missing required column 'missing_col'"

2. Data Types

If a dtype is specified, the actual dtype must contain the expected string as a substring:

schema = DataFrameSchema(columns=[ColumnSchema("age", dtype="int")])

# pandas int64 -> "int64" contains "int" -> valid
# pandas object -> "object" does NOT contain "int" -> violation

This means dtype="int" matches int8, int16, int32, int64, Int64, UInt32, etc.

3. Null Values

If nullable=False, the column must not contain any null/NaN values:

schema = DataFrameSchema(columns=[ColumnSchema("email", nullable=False)])

# DataFrame with email = ["alice@a.com", None, "bob@b.com"]
# Violation: "Column 'email' has null values but nullable=False"
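Checking for nulls in a framework-agnostic way can be done with duck typing; the `has_nulls` helper below is a hypothetical sketch of how such a check might look, not dagron's actual implementation:

```python
def has_nulls(column) -> bool:
    # pandas Series expose .isna(); polars Series expose .null_count().
    if hasattr(column, "isna"):
        return bool(column.isna().any())
    if hasattr(column, "null_count"):
        return column.null_count() > 0
    # Fallback for plain sequences, used here for illustration.
    return any(v is None for v in column)

print(has_nulls(["alice@a.com", None, "bob@b.com"]))  # True
print(has_nulls(["alice@a.com", "bob@b.com"]))        # False
```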

4. Row Count Bounds

schema = DataFrameSchema(min_rows=1, max_rows=10000)

# Empty DataFrame
# Violation: "Expected at least 1 rows, got 0"

# DataFrame with 20000 rows
# Violation: "Expected at most 10000 rows, got 20000"
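These messages can be reproduced by a small standalone check; the `check_row_bounds` helper below is an illustrative sketch of the documented behavior, not dagron's source:

```python
def check_row_bounds(n_rows, min_rows=None, max_rows=None):
    violations = []
    if min_rows is not None and n_rows < min_rows:
        violations.append(f"Expected at least {min_rows} rows, got {n_rows}")
    if max_rows is not None and n_rows > max_rows:
        violations.append(f"Expected at most {max_rows} rows, got {n_rows}")
    return violations

print(check_row_bounds(0, min_rows=1))
# ['Expected at least 1 rows, got 0']
print(check_row_bounds(20000, max_rows=10000))
# ['Expected at most 10000 rows, got 20000']
```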

Standalone Validation

Use validate_schema() to validate any DataFrame without a full pipeline:

import pandas as pd
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", None, "Charlie"],
})

schema = DataFrameSchema(
    columns=[
        ColumnSchema("id", dtype="int", nullable=False),
        ColumnSchema("name", dtype="str", nullable=False),
    ],
    min_rows=1,
)

violations = validate_schema(df, schema, node_name="my_step")
for v in violations:
    print(f"  {v.message}")
# "Column 'name' has null values but nullable=False"

Polars Support

The same schemas work with polars DataFrames -- no changes needed:

import polars as pl
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema

df = pl.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "score": [0.9, 0.8, 0.7],
})

schema = DataFrameSchema(
    columns=[
        ColumnSchema("id", dtype="Int", nullable=False),  # polars uses "Int64"
        ColumnSchema("name", dtype="Utf8"),
        ColumnSchema("score", dtype="Float"),
    ],
)

violations = validate_schema(df, schema)
if not violations:
    print("Valid!")

dagron detects the framework automatically by inspecting the object's module path. Both pandas.DataFrame and polars.DataFrame are supported.
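A minimal sketch of what module-path detection might look like (the `detect_framework` helper is hypothetical; dagron's internals may differ):

```python
def detect_framework(obj):
    # Look at the class's module path instead of importing pandas or
    # polars eagerly, so neither library becomes a hard dependency.
    module = type(obj).__module__
    if module.startswith("pandas"):
        return "pandas"
    if module.startswith("polars"):
        return "polars"
    return None  # not a supported DataFrame type

print(detect_framework("not a dataframe"))  # None
```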


Validating Individual Node Outputs

Use validate_value() on a DataFramePipeline to validate a single node's output:

pipeline = DataFramePipeline(dag, schemas)

df = pd.DataFrame({"user_id": [1, 2], "name": ["Alice", "Bob"]})
violations = pipeline.validate_value("extract", df)

if violations:
    print("extract output is invalid:")
    for v in violations:
        print(f"  {v.message}")

This is useful for validating intermediate results during development.


SchemaViolation

Each violation is a frozen dataclass:

@dataclass(frozen=True)
class SchemaViolation:
    node_name: str  # which node produced the invalid DataFrame
    message: str    # human-readable description
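Because violations are plain frozen dataclasses, they are immutable and easy to sort, filter, or group for reporting. A standalone illustration (the dataclass is restated so the snippet runs on its own; the messages are made up):

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass(frozen=True)
class SchemaViolation:
    node_name: str
    message: str

violations = [
    SchemaViolation("extract", "Missing required column 'email'"),
    SchemaViolation("extract", "Expected at least 1 rows, got 0"),
    SchemaViolation("clean", "Column 'name' has null values but nullable=False"),
]

# Group messages by the node that produced them.
by_node = defaultdict(list)
for v in violations:
    by_node[v.node_name].append(v.message)

for node, messages in by_node.items():
    print(f"{node}: {len(messages)} violation(s)")
# extract: 2 violation(s)
# clean: 1 violation(s)
```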

Complete pandas Pipeline Example

import dagron
import pandas as pd
from dagron.dataframe import (
    ColumnSchema,
    DataFramePipeline,
    DataFrameSchema,
)

# Build DAG
dag = (
    dagron.DAG.builder()
    .add_node("read_orders")
    .add_node("filter_active")
    .add_node("compute_revenue")
    .add_node("top_customers")
    .add_edge("read_orders", "filter_active")
    .add_edge("filter_active", "compute_revenue")
    .add_edge("compute_revenue", "top_customers")
    .build()
)

# Define schemas
schemas = {
    "read_orders": DataFrameSchema(
        columns=[
            ColumnSchema("order_id", dtype="int", nullable=False),
            ColumnSchema("customer_id", dtype="int", nullable=False),
            ColumnSchema("amount", dtype="float"),
            ColumnSchema("status", dtype="str"),
        ],
        min_rows=1,
    ),
    "filter_active": DataFrameSchema(
        columns=[
            ColumnSchema("order_id", dtype="int", nullable=False),
            ColumnSchema("customer_id", dtype="int", nullable=False),
            ColumnSchema("amount", dtype="float", nullable=False),
        ],
    ),
    "compute_revenue": DataFrameSchema(
        columns=[
            ColumnSchema("customer_id", dtype="int", nullable=False),
            ColumnSchema("total_revenue", dtype="float", nullable=False),
        ],
    ),
    "top_customers": DataFrameSchema(
        columns=[
            ColumnSchema("customer_id", dtype="int"),
            ColumnSchema("total_revenue", dtype="float"),
            ColumnSchema("rank", dtype="int"),
        ],
        max_rows=100,  # top 100
    ),
}

# Tasks
def read_orders():
    return pd.DataFrame({
        "order_id": [1, 2, 3, 4, 5],
        "customer_id": [101, 102, 101, 103, 102],
        "amount": [50.0, 75.0, 30.0, 100.0, 45.0],
        "status": ["active", "active", "cancelled", "active", "active"],
    })

def filter_active():
    df = read_orders()
    return df[df["status"] == "active"][["order_id", "customer_id", "amount"]]

def compute_revenue():
    df = filter_active()
    return df.groupby("customer_id")["amount"].sum().reset_index().rename(
        columns={"amount": "total_revenue"}
    )

def top_customers():
    df = compute_revenue()
    df = df.sort_values("total_revenue", ascending=False).head(100)
    df["rank"] = range(1, len(df) + 1)
    return df

# Execute and validate
executor = dagron.DAGExecutor(dag)
result = executor.execute({
    "read_orders": read_orders,
    "filter_active": filter_active,
    "compute_revenue": compute_revenue,
    "top_customers": top_customers,
})

pipeline = DataFramePipeline(dag, schemas)
violations = pipeline.validate_result(result)

if violations:
    print(f"{len(violations)} schema violation(s):")
    for v in violations:
        print(f"  [{v.node_name}] {v.message}")
else:
    print("All schemas valid!")

Combining with Contracts

Use NodeContract for type-level checking (ensuring nodes produce DataFrames) and DataFrameSchema for content-level checking (columns, dtypes, nulls):

from dagron.contracts import NodeContract, ContractValidator

# Type-level: ensure the node outputs a DataFrame
import pandas as pd
contracts = {
    "read_orders": NodeContract(output=pd.DataFrame),
    "filter_active": NodeContract(inputs={"read_orders": pd.DataFrame}, output=pd.DataFrame),
    "compute_revenue": NodeContract(inputs={"filter_active": pd.DataFrame}, output=pd.DataFrame),
}

# Validate types
type_violations = ContractValidator(dag, contracts).validate()

# Content-level: validate column schemas
content_violations = DataFramePipeline(dag, schemas).validate_result(result)

Non-DataFrame Handling

If a node's output is not a pandas or polars DataFrame, the validator reports it:

schema = DataFrameSchema(columns=[ColumnSchema("id")])
violations = validate_schema("not a dataframe", schema, node_name="bad_node")
# SchemaViolation("bad_node", "Expected DataFrame, got str")

Best Practices

  1. Define schemas for every node that produces a DataFrame. This creates a complete validation boundary at every step.

  2. Use nullable=False for critical columns. Null values in ID or key columns are a common source of downstream errors.

  3. Set min_rows=1 for extract nodes. Catch empty result sets immediately instead of letting them flow through.

  4. Use max_rows for output nodes. Prevent accidentally returning millions of rows to a reporting step.

  5. Run validation in CI. Execute the pipeline with test data and validate schemas as part of your test suite.

  6. Use substring dtype matching. Specify "int" instead of "int64" for cross-framework compatibility between pandas and polars.