Skip to main content

DataFrames

The dataframe module provides schema validation for pandas and polars DataFrames at DAG edge boundaries. Define expected column schemas (names, dtypes, nullability) and row count constraints for each node, then validate execution results or individual values against those schemas.

This module auto-detects whether a value is a pandas or polars DataFrame and applies the appropriate introspection methods.

from dagron.dataframe import (
    DataFramePipeline,
    DataFrameSchema,
    ColumnSchema,
    SchemaViolation,
    validate_schema,
)

ColumnSchema

ColumnSchema
@dataclass(frozen=True)
class ColumnSchema:
    name: str
    dtype: str | None = None
    nullable: bool = True
    required: bool = True

Schema definition for a single column in a DataFrame. Frozen dataclass.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | The column name to validate. |
| dtype | str \| None | None | Expected dtype as a string (e.g., 'int64', 'float', 'object'). Checked via case-insensitive substring match against the actual dtype string. None means any dtype is accepted. |
| nullable | bool | True | Whether the column is allowed to contain null values. Set to False to require non-null values. |
| required | bool | True | Whether the column must exist in the DataFrame. Set to False for optional columns. |
from dagron.dataframe import ColumnSchema

id_col = ColumnSchema("id", dtype="int", nullable=False, required=True)
name_col = ColumnSchema("name", dtype="object", nullable=True)
score_col = ColumnSchema("score", dtype="float", required=False) # optional column

DataFrameSchema

DataFrameSchema
@dataclass(frozen=True)
class DataFrameSchema:
    columns: list[ColumnSchema] = field(default_factory=list)
    min_rows: int | None = None
    max_rows: int | None = None

Schema definition for a DataFrame at an edge boundary. Combines column schemas with optional row count constraints. Frozen dataclass.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| columns | list[ColumnSchema] | [] | List of column schema definitions. |
| min_rows | int \| None | None | Minimum number of rows required. None means no minimum. |
| max_rows | int \| None | None | Maximum number of rows allowed. None means no maximum. |
from dagron.dataframe import DataFrameSchema, ColumnSchema

user_schema = DataFrameSchema(
    columns=[
        ColumnSchema("id", dtype="int", nullable=False),
        ColumnSchema("name", dtype="object", nullable=False),
        ColumnSchema("email", dtype="object", nullable=True),
    ],
    min_rows=1,
    max_rows=10000,
)

SchemaViolation

SchemaViolation
@dataclass(frozen=True)
class SchemaViolation:
    node_name: str
    message: str

A single schema violation detected during validation. Frozen dataclass.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| node_name | str | required | Name of the node that produced the invalid output. |
| message | str | required | Human-readable description of the violation. |
for violation in violations:
    print(f"[{violation.node_name}] {violation.message}")
# [extract] Missing required column 'id'
# [extract] Column 'score' has null values but nullable=False

validate_schema

validate_schema
def validate_schema(
    df: Any,
    schema: DataFrameSchema,
    node_name: str = "",
) -> list[SchemaViolation]

Validate a DataFrame against a schema. Works with both pandas and polars DataFrames. The framework is auto-detected from the object's type.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| df | Any | required | A pandas or polars DataFrame to validate. |
| schema | DataFrameSchema | required | The expected schema. |
| node_name | str | "" | Name of the producing node (used in error messages). |

Returns: list[SchemaViolation] -- List of violations. An empty list means the DataFrame is valid.

Validation checks

The function performs the following checks in order:

  1. Framework detection -- verifies the object is a pandas or polars DataFrame.
  2. Required columns -- checks that all required columns exist.
  3. Dtype matching -- for each column with a dtype constraint, checks that the actual dtype string contains the expected substring (case-insensitive).
  4. Nullability -- for columns with nullable=False, checks for null values.
  5. Row count -- validates min_rows and max_rows constraints.
import pandas as pd
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema

df = pd.DataFrame({"id": [1, 2, None], "name": ["Alice", "Bob", "Charlie"]})

schema = DataFrameSchema(
    columns=[
        ColumnSchema("id", dtype="int", nullable=False),
        ColumnSchema("name", dtype="object"),
        ColumnSchema("email", nullable=True, required=True),
    ],
    min_rows=1,
)

violations = validate_schema(df, schema, node_name="extract")
for v in violations:
    print(v.message)
# Column 'id' has null values but nullable=False
# Missing required column 'email'
# NOTE: pandas stores [1, 2, None] as float64 (integer columns cannot hold
# NaN), so the dtype="int" constraint on 'id' also produces a dtype
# violation in this example.

DataFramePipeline

DataFramePipeline
class DataFramePipeline:
    def __init__(
        self,
        dag: DAG,
        schemas: dict[str, DataFrameSchema],
    ) -> None: ...

Execute a DAG pipeline with schema validation at edge boundaries. Validates that each node's output DataFrame matches the expected schema.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| dag | DAG | required | The DAG defining the pipeline structure. |
| schemas | dict[str, DataFrameSchema] | required | Mapping of node names to their expected output DataFrameSchema. |

Methods


DataFramePipeline.validate_result

DataFramePipeline.validate_result
def validate_result(
    self,
    result: ExecutionResult,
) -> list[SchemaViolation]

Validate all completed node outputs in an execution result against their declared schemas. Only nodes that completed successfully and have a schema defined are checked.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| result | ExecutionResult | required | The execution result containing node outputs. |

Returns: list[SchemaViolation] -- All violations found across all validated nodes.

import dagron
from dagron.dataframe import DataFramePipeline, DataFrameSchema, ColumnSchema

dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

schemas = {
    "extract": DataFrameSchema(
        columns=[ColumnSchema("id", dtype="int"), ColumnSchema("name")],
        min_rows=1,
    ),
    "transform": DataFrameSchema(
        columns=[
            ColumnSchema("id", dtype="int", nullable=False),
            ColumnSchema("name_upper", nullable=False),
        ],
    ),
}

pipeline = DataFramePipeline(dag, schemas)

# After execution...
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)

violations = pipeline.validate_result(result)
if violations:
    for v in violations:
        print(f"[{v.node_name}] {v.message}")
else:
    print("All DataFrames match their schemas")

DataFramePipeline.validate_value

DataFramePipeline.validate_value
def validate_value(
    self,
    node_name: str,
    value: Any,
) -> list[SchemaViolation]

Validate a single value against a specific node's schema. Useful for testing individual node outputs without running the full pipeline.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| node_name | str | required | Name of the node whose schema to validate against. |
| value | Any | required | The value (DataFrame) to validate. |

Returns: list[SchemaViolation] -- Violations found, or empty list if valid. Returns empty list if no schema is defined for the given node.

import pandas as pd
from dagron.dataframe import DataFramePipeline, DataFrameSchema, ColumnSchema

# Validate a single DataFrame in isolation
df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
violations = pipeline.validate_value("extract", df)
assert not violations

Complete example

import dagron
import pandas as pd
from dagron.dataframe import (
    DataFramePipeline,
    DataFrameSchema,
    ColumnSchema,
    validate_schema,
)

# Define schemas for each pipeline stage
schemas = {
    "extract": DataFrameSchema(
        columns=[
            ColumnSchema("user_id", dtype="int", nullable=False),
            ColumnSchema("username", dtype="object", nullable=False),
            ColumnSchema("score", dtype="float", nullable=True),
        ],
        min_rows=1,
    ),
    "transform": DataFrameSchema(
        columns=[
            ColumnSchema("user_id", dtype="int", nullable=False),
            ColumnSchema("username", dtype="object", nullable=False),
            ColumnSchema("score_normalized", dtype="float", nullable=False),
        ],
    ),
    "load": DataFrameSchema(
        columns=[
            ColumnSchema("user_id", dtype="int", nullable=False),
            ColumnSchema("username", dtype="object", nullable=False),
            ColumnSchema("score_normalized", dtype="float", nullable=False),
            ColumnSchema("loaded_at", dtype="datetime", nullable=False),
        ],
        max_rows=100000,
    ),
}

# Build the DAG
dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

pipeline = DataFramePipeline(dag, schemas)

# Validate individual DataFrames during development
test_df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "username": ["alice", "bob", "charlie"],
    "score": [0.85, None, 0.92],
})

violations = pipeline.validate_value("extract", test_df)
print(f"Extract violations: {len(violations)}")
for v in violations:
    print(f" {v.message}")

# Or use the standalone function
violations = validate_schema(test_df, schemas["extract"], "extract")

# After full pipeline execution, validate all outputs
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)
all_violations = pipeline.validate_result(result)

Using with polars

The module works identically with polars DataFrames -- no configuration changes needed:

import polars as pl
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema

df = pl.DataFrame({
    "id": [1, 2, 3],
    "value": [10.5, 20.3, 30.1],
})

schema = DataFrameSchema(
    columns=[
        ColumnSchema("id", dtype="i64", nullable=False),
        ColumnSchema("value", dtype="f64"),
    ],
)

violations = validate_schema(df, schema, "my_node")

See also

  • Contracts -- type-level contracts for arbitrary Python types.
  • Pipeline -- the Pipeline class for decorator-based DAG construction.
  • Execution -- ExecutionResult containing node outputs.