Integration
The integration module provides helpers for building DAGs from common Python
data structures. The primary function, from_records, converts sequences of
dicts, dataclasses, or Pydantic models into DAGs with minimal boilerplate.
```python
from dagron.integration import from_records
```

from_records

```python
def from_records(
    records: Sequence[Any],
    *,
    name_field: str = "name",
    edge_fn: Callable[[Any], list[str]] | None = None,
    payload_fn: Callable[[Any], Any] | None = None,
) -> DAG
```
Build a DAG from a sequence of records. Works with dicts, dataclasses,
and Pydantic BaseModel instances. Each record becomes a node, with the
node name extracted from the specified field. Edges and payloads are derived
via optional callback functions.
| Parameter | Type | Default | Description |
|---|---|---|---|
| records | Sequence[Any] | required | Sequence of records to convert. Each record becomes a node in the DAG. Supports dicts (field access via key lookup), dataclasses, and Pydantic models (field access via attribute lookup). |
| name_field | str | "name" | Field name to use as the node name. The field value is converted to a string. |
| edge_fn | Callable[[Any], list[str]] \| None | None | Optional callable that takes a record and returns a list of node names that this record depends on. Edges are created FROM those nodes TO this node. If None, no edges are added. |
| payload_fn | Callable[[Any], Any] \| None | None | Optional callable that takes a record and returns the payload to store on the node. If None, the entire record is stored as the payload. |
Returns: DAG -- A new DAG built from the records.
Raises:
- DuplicateNodeError -- If any records share the same name.
- NodeNotFoundError -- If edge_fn references a node that does not exist.
- CycleError -- If the edges derived from edge_fn would create a cycle.
Building from dicts
The most common use case is converting a list of dictionaries, such as records loaded from a YAML file, JSON API, or database query:
```python
from dagron.integration import from_records

records = [
    {"name": "extract", "depends_on": [], "config": {"source": "s3"}},
    {"name": "transform", "depends_on": ["extract"], "config": {"mode": "batch"}},
    {"name": "load", "depends_on": ["transform"], "config": {"target": "warehouse"}},
]

dag = from_records(
    records,
    edge_fn=lambda r: r["depends_on"],
    payload_fn=lambda r: r["config"],
)

print(dag.node_count())  # 3
print(dag.edge_count())  # 2
print(dag.get_payload("extract"))  # {"source": "s3"}
```
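Because each edge runs from a dependency to its dependent, a topological traversal of this DAG visits extract before transform before load. The ordering can be verified over the same depends_on lists with a plain Kahn-style topological sort (illustrative only; dagron computes this for you via topological_levels):

```python
from collections import deque

records = [
    {"name": "extract", "depends_on": []},
    {"name": "transform", "depends_on": ["extract"]},
    {"name": "load", "depends_on": ["transform"]},
]

# Kahn's algorithm: repeatedly emit nodes whose dependencies are all emitted.
indegree = {r["name"]: len(r["depends_on"]) for r in records}
dependents = {r["name"]: [] for r in records}
for r in records:
    for dep in r["depends_on"]:
        dependents[dep].append(r["name"])

order = []
ready = deque(n for n, d in indegree.items() if d == 0)
while ready:
    node = ready.popleft()
    order.append(node)
    for nxt in dependents[node]:
        indegree[nxt] -= 1
        if indegree[nxt] == 0:
            ready.append(nxt)

print(order)  # ['extract', 'transform', 'load']
```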
Building from dataclasses
```python
from dataclasses import dataclass, field

from dagron.integration import from_records

@dataclass
class TaskSpec:
    name: str
    dependencies: list[str] = field(default_factory=list)
    timeout: float = 30.0

specs = [
    TaskSpec("fetch", timeout=60.0),
    TaskSpec("parse", dependencies=["fetch"], timeout=10.0),
    TaskSpec("validate", dependencies=["parse"]),
    TaskSpec("store", dependencies=["validate"]),
]

dag = from_records(
    specs,
    edge_fn=lambda s: s.dependencies,
    payload_fn=lambda s: {"timeout": s.timeout},
)

print(list(dag.nodes()))  # ['fetch', 'parse', 'validate', 'store']
```
Building from Pydantic models
```python
from pydantic import BaseModel

from dagron.integration import from_records

class PipelineStep(BaseModel):
    name: str
    depends_on: list[str] = []
    retries: int = 3

steps = [
    PipelineStep(name="ingest", retries=5),
    PipelineStep(name="clean", depends_on=["ingest"]),
    PipelineStep(name="publish", depends_on=["clean"], retries=1),
]

dag = from_records(
    steps,
    edge_fn=lambda s: s.depends_on,
    payload_fn=lambda s: {"retries": s.retries},
)
```
Custom name fields
Use the name_field parameter when your records use a different field
for the node identifier:
```python
records = [
    {"id": "step_1", "after": []},
    {"id": "step_2", "after": ["step_1"]},
    {"id": "step_3", "after": ["step_2"]},
]

dag = from_records(
    records,
    name_field="id",
    edge_fn=lambda r: r["after"],
)

print(list(dag.nodes()))  # ['step_1', 'step_2', 'step_3']
```
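Since the name_field value is converted to a string, non-string identifiers such as integer ids also work. A quick plain-Python illustration of the conversion (mirroring the documented behavior; note it is an assumption here that edge_fn should then return the stringified names to match):

```python
records = [
    {"id": 1, "after": []},
    {"id": 2, "after": ["1"]},  # dependency given as the stringified name
]

# Node names become "1" and "2".
names = [str(r["id"]) for r in records]
print(names)  # ['1', '2']
```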
No edges
When edge_fn is not provided, the resulting DAG contains nodes but no
edges. This is useful when you want to add edges separately:
```python
records = [{"name": "a"}, {"name": "b"}, {"name": "c"}]

dag = from_records(records)
print(dag.node_count())  # 3
print(dag.edge_count())  # 0

# Add edges manually
dag.add_edge("a", "b")
dag.add_edge("b", "c")
```
Default payloads
When payload_fn is not provided, the entire record is stored as the
payload:
```python
records = [
    {"name": "extract", "source": "api", "timeout": 30},
    {"name": "load", "target": "db", "batch_size": 100},
]

dag = from_records(records)
print(dag.get_payload("extract"))
# {"name": "extract", "source": "api", "timeout": 30}
```
Complete example
```python
import json

import dagron
from dagron.integration import from_records

# Simulate loading pipeline config from JSON
config_json = """
[
    {"name": "raw_data", "deps": [], "type": "source", "params": {"path": "/data/raw"}},
    {"name": "clean", "deps": ["raw_data"], "type": "transform", "params": {"drop_nulls": true}},
    {"name": "features", "deps": ["clean"], "type": "transform", "params": {"method": "tfidf"}},
    {"name": "train", "deps": ["features"], "type": "model", "params": {"epochs": 50}},
    {"name": "evaluate", "deps": ["train"], "type": "model", "params": {"metrics": ["acc", "f1"]}},
    {"name": "deploy", "deps": ["evaluate"], "type": "deploy", "params": {"env": "prod"}}
]
"""
records = json.loads(config_json)

# Build DAG with structured payloads
dag = from_records(
    records,
    edge_fn=lambda r: r["deps"],
    payload_fn=lambda r: {"type": r["type"], "params": r["params"]},
)

print(f"Pipeline: {dag.node_count()} nodes, {dag.edge_count()} edges")
# Pipeline: 6 nodes, 5 edges

# Inspect the structure
for level_idx, level in enumerate(dag.topological_levels()):
    names = [n.name for n in level]
    print(f"Level {level_idx}: {names}")

# Access payload metadata
for name in ["raw_data", "train", "deploy"]:
    payload = dag.get_payload(name)
    print(f"{name}: type={payload['type']}, params={payload['params']}")

# Execute with task functions. Bind name as a default argument so each
# lambda keeps its own name; a bare lambda would capture the loop variable
# late and report the last node for every task.
tasks = {name: (lambda name=name: f"completed {name}") for name in dag.nodes()}
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)
print(f"Succeeded: {result.succeeded}/{dag.node_count()}")
```
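One Python subtlety in the task map above: a lambda created inside a comprehension captures the loop variable by reference, so without a per-iteration binding every task function would report the last node's name. Binding the name as a default argument freezes it per task. The pitfall in isolation, independent of dagron:

```python
names = ["a", "b", "c"]

# Late binding: every lambda closes over the same loop variable,
# which holds its final value once the comprehension finishes.
late = {name: (lambda: f"completed {name}") for name in names}

# Early binding via a default argument: each lambda keeps its own name.
early = {name: (lambda name=name: f"completed {name}") for name in names}

print(late["a"]())   # completed c
print(early["a"]())  # completed a
```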
See also
- DAG -- the core graph class.
- DAGBuilder -- fluent builder for programmatic construction.
- Templates -- parameterized DAG construction.
- Building DAGs guide -- construction patterns overview.