DataFrames
The dataframe module provides schema validation for pandas and polars DataFrames at DAG edge boundaries. Define expected column schemas (names, dtypes, nullability) and row count constraints for each node, then validate execution results or individual values against those schemas.
This module auto-detects whether a value is a pandas or polars DataFrame and applies the appropriate introspection methods.
from dagron.dataframe import (
DataFramePipeline,
DataFrameSchema,
ColumnSchema,
SchemaViolation,
validate_schema,
)
ColumnSchema
@dataclass(frozen=True)
class ColumnSchema:
name: str
dtype: str | None = None
nullable: bool = True
required: bool = True
Schema definition for a single column in a DataFrame. Frozen dataclass.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | The column name to validate. |
dtype | str | None | None | Expected dtype as a string (e.g., 'int64', 'float', 'object'). Checked via case-insensitive substring match against the actual dtype string. None means any dtype is accepted. |
nullable | bool | True | Whether the column is allowed to contain null values. Set to False to require non-null values. |
required | bool | True | Whether the column must exist in the DataFrame. Set to False for optional columns. |
from dagron.dataframe import ColumnSchema
id_col = ColumnSchema("id", dtype="int", nullable=False, required=True)
name_col = ColumnSchema("name", dtype="object", nullable=True)
score_col = ColumnSchema("score", dtype="float", required=False) # optional column
DataFrameSchema
@dataclass(frozen=True)
class DataFrameSchema:
columns: list[ColumnSchema] = field(default_factory=list)
min_rows: int | None = None
max_rows: int | None = None
Schema definition for a DataFrame at an edge boundary. Combines column schemas with optional row count constraints. Frozen dataclass.
| Parameter | Type | Default | Description |
|---|---|---|---|
columns | list[ColumnSchema] | [] | List of column schema definitions. |
min_rows | int | None | None | Minimum number of rows required. None means no minimum. |
max_rows | int | None | None | Maximum number of rows allowed. None means no maximum. |
from dagron.dataframe import DataFrameSchema, ColumnSchema
user_schema = DataFrameSchema(
columns=[
ColumnSchema("id", dtype="int", nullable=False),
ColumnSchema("name", dtype="object", nullable=False),
ColumnSchema("email", dtype="object", nullable=True),
],
min_rows=1,
max_rows=10000,
)
SchemaViolation
@dataclass(frozen=True)
class SchemaViolation:
node_name: str
message: str
A single schema violation detected during validation. Frozen dataclass.
| Parameter | Type | Default | Description |
|---|---|---|---|
node_name | str | required | Name of the node that produced the invalid output. |
message | str | required | Human-readable description of the violation. |
for violation in violations:
print(f"[{violation.node_name}] {violation.message}")
# [extract] Missing required column 'id'
# [extract] Column 'score' has null values but nullable=False
validate_schema
def validate_schema(
df: Any,
schema: DataFrameSchema,
node_name: str = "",
) -> list[SchemaViolation]
Validate a DataFrame against a schema. Works with both pandas and polars DataFrames. The framework is auto-detected from the object's type.
| Parameter | Type | Default | Description |
|---|---|---|---|
df | Any | required | A pandas or polars DataFrame to validate. |
schema | DataFrameSchema | required | The expected schema. |
node_name | str | "" | Name of the producing node (used in error messages). |
Returns: list[SchemaViolation] -- List of violations. An empty list means the DataFrame is valid.
Validation checks
The function performs the following checks in order:
- Framework detection -- verifies the object is a pandas or polars DataFrame.
- Required columns -- checks that all required columns exist.
- Dtype matching -- for each column with a dtype constraint, checks that the actual dtype string contains the expected substring (case-insensitive).
- Nullability -- for columns with nullable=False, checks for null values.
- Row count -- validates min_rows and max_rows constraints.
import pandas as pd
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema
df = pd.DataFrame({"id": [1, 2, None], "name": ["Alice", "Bob", "Charlie"]})
schema = DataFrameSchema(
columns=[
ColumnSchema("id", dtype="int", nullable=False),
ColumnSchema("name", dtype="object"),
ColumnSchema("email", nullable=True, required=True),
],
min_rows=1,
)
violations = validate_schema(df, schema, node_name="extract")
for v in violations:
print(v.message)
# Column 'id' has null values but nullable=False
# Missing required column 'email'
DataFramePipeline
class DataFramePipeline:
def __init__(
self,
dag: DAG,
schemas: dict[str, DataFrameSchema],
) -> None: ...
Execute a DAG pipeline with schema validation at edge boundaries. Validates that each node's output DataFrame matches the expected schema.
| Parameter | Type | Default | Description |
|---|---|---|---|
dag | DAG | required | The DAG defining the pipeline structure. |
schemas | dict[str, DataFrameSchema] | required | Mapping of node names to their expected output DataFrameSchema. |
Methods
DataFramePipeline.validate_result
def validate_result(
self,
result: ExecutionResult,
) -> list[SchemaViolation]
Validate all completed node outputs in an execution result against their declared schemas. Only nodes that completed successfully and have a schema defined are checked.
| Parameter | Type | Default | Description |
|---|---|---|---|
result | ExecutionResult | required | The execution result containing node outputs. |
Returns: list[SchemaViolation] -- All violations found across all validated nodes.
import dagron
from dagron.dataframe import DataFramePipeline, DataFrameSchema, ColumnSchema
dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)
schemas = {
"extract": DataFrameSchema(
columns=[ColumnSchema("id", dtype="int"), ColumnSchema("name")],
min_rows=1,
),
"transform": DataFrameSchema(
columns=[
ColumnSchema("id", dtype="int", nullable=False),
ColumnSchema("name_upper", nullable=False),
],
),
}
pipeline = DataFramePipeline(dag, schemas)
# After execution...
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)
violations = pipeline.validate_result(result)
if violations:
for v in violations:
print(f"[{v.node_name}] {v.message}")
else:
print("All DataFrames match their schemas")
DataFramePipeline.validate_value
def validate_value(
self,
node_name: str,
value: Any,
) -> list[SchemaViolation]
Validate a single value against a specific node's schema. Useful for testing individual node outputs without running the full pipeline.
| Parameter | Type | Default | Description |
|---|---|---|---|
node_name | str | required | Name of the node whose schema to validate against. |
value | Any | required | The value (DataFrame) to validate. |
Returns: list[SchemaViolation] -- Violations found, or empty list if valid. Returns empty list if no schema is defined for the given node.
import pandas as pd
from dagron.dataframe import DataFramePipeline, DataFrameSchema, ColumnSchema
# Validate a single DataFrame in isolation
df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
violations = pipeline.validate_value("extract", df)
assert not violations
Complete example
import dagron
import pandas as pd
from dagron.dataframe import (
DataFramePipeline,
DataFrameSchema,
ColumnSchema,
validate_schema,
)
# Define schemas for each pipeline stage
schemas = {
"extract": DataFrameSchema(
columns=[
ColumnSchema("user_id", dtype="int", nullable=False),
ColumnSchema("username", dtype="object", nullable=False),
ColumnSchema("score", dtype="float", nullable=True),
],
min_rows=1,
),
"transform": DataFrameSchema(
columns=[
ColumnSchema("user_id", dtype="int", nullable=False),
ColumnSchema("username", dtype="object", nullable=False),
ColumnSchema("score_normalized", dtype="float", nullable=False),
],
),
"load": DataFrameSchema(
columns=[
ColumnSchema("user_id", dtype="int", nullable=False),
ColumnSchema("username", dtype="object", nullable=False),
ColumnSchema("score_normalized", dtype="float", nullable=False),
ColumnSchema("loaded_at", dtype="datetime", nullable=False),
],
max_rows=100000,
),
}
# Build the DAG
dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)
pipeline = DataFramePipeline(dag, schemas)
# Validate individual DataFrames during development
test_df = pd.DataFrame({
"user_id": [1, 2, 3],
"username": ["alice", "bob", "charlie"],
"score": [0.85, None, 0.92],
})
violations = pipeline.validate_value("extract", test_df)
print(f"Extract violations: {len(violations)}")
for v in violations:
print(f" {v.message}")
# Or use the standalone function
violations = validate_schema(test_df, schemas["extract"], "extract")
# After full pipeline execution, validate all outputs
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)
all_violations = pipeline.validate_result(result)
Using with polars
The module works identically with polars DataFrames -- no configuration changes needed:
import polars as pl
from dagron.dataframe import validate_schema, DataFrameSchema, ColumnSchema
df = pl.DataFrame({
"id": [1, 2, 3],
"value": [10.5, 20.3, 30.1],
})
schema = DataFrameSchema(
columns=[
ColumnSchema("id", dtype="i64", nullable=False),
ColumnSchema("value", dtype="f64"),
],
)
violations = validate_schema(df, schema, "my_node")