
Integration

The integration module provides helpers for building DAGs from common Python data structures. The primary function, from_records, converts sequences of dicts, dataclasses, or Pydantic models into DAGs with minimal boilerplate.

from dagron.integration import from_records

from_records

def from_records(
    records: Sequence[Any],
    *,
    name_field: str = "name",
    edge_fn: Callable[[Any], list[str]] | None = None,
    payload_fn: Callable[[Any], Any] | None = None,
) -> DAG

Build a DAG from a sequence of records. Works with dicts, dataclasses, and Pydantic BaseModel instances. Each record becomes a node, with the node name extracted from the specified field. Edges and payloads are derived via optional callback functions.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `records` | `Sequence[Any]` | required | Sequence of records to convert. Each record becomes a node in the DAG. Supports dicts (field access via key lookup), dataclasses, and Pydantic models (field access via attribute lookup). |
| `name_field` | `str` | `"name"` | Field name to use as the node name. The field value is converted to a string. |
| `edge_fn` | `Callable[[Any], list[str]] \| None` | `None` | Optional callable that takes a record and returns a list of node names that this record depends on. Edges are created from those nodes to this node. If `None`, no edges are added. |
| `payload_fn` | `Callable[[Any], Any] \| None` | `None` | Optional callable that takes a record and returns the payload to store on the node. If `None`, the entire record is stored as the payload. |
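As the table notes, field access depends on the record type. A minimal sketch of that dispatch in plain Python (`read_field` is a hypothetical helper name, not part of the dagron API):

```python
from dataclasses import dataclass

def read_field(record, field):
    # Dicts are read by key lookup; dataclasses and Pydantic
    # models are read by attribute lookup.
    if isinstance(record, dict):
        return record[field]
    return getattr(record, field)

@dataclass
class Task:
    name: str

print(read_field({"name": "extract"}, "name"))  # extract
print(read_field(Task(name="load"), "name"))    # load
```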

Returns: DAG -- A new DAG built from the records.

Raises:

  • DuplicateNodeError -- If any records share the same name.
  • NodeNotFoundError -- If edge_fn references a node that does not exist.
  • CycleError -- If the edges derived from edge_fn would create a cycle.
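The checks behind these errors can be sketched without the library. The following is a hypothetical stand-in (`validate` and plain `ValueError`s replace dagron's internals and exception types) that mirrors when each error would fire, using Kahn's algorithm for the cycle check:

```python
from collections import deque

def validate(records, edge_fn, name_field="name"):
    names = [str(r[name_field]) for r in records]
    if len(names) != len(set(names)):
        raise ValueError("duplicate node name")         # cf. DuplicateNodeError
    deps = {n: list(edge_fn(r)) for n, r in zip(names, records)}
    for ds in deps.values():
        for d in ds:
            if d not in deps:
                raise ValueError(f"unknown node: {d}")  # cf. NodeNotFoundError
    # Kahn's algorithm: repeatedly remove nodes with no unmet
    # dependencies; if some node is never freed, the edges form a cycle.
    indegree = {n: len(ds) for n, ds in deps.items()}
    dependents = {n: [] for n in deps}
    for n, ds in deps.items():
        for d in ds:
            dependents[d].append(n)
    queue = deque(n for n, k in indegree.items() if k == 0)
    seen = 0
    while queue:
        n = queue.popleft()
        seen += 1
        for m in dependents[n]:
            indegree[m] -= 1
            if indegree[m] == 0:
                queue.append(m)
    if seen != len(deps):
        raise ValueError("cycle detected")              # cf. CycleError
```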

Building from dicts

The most common use case is converting a list of dictionaries, such as records loaded from a YAML file, JSON API, or database query:

from dagron.integration import from_records

records = [
    {"name": "extract", "depends_on": [], "config": {"source": "s3"}},
    {"name": "transform", "depends_on": ["extract"], "config": {"mode": "batch"}},
    {"name": "load", "depends_on": ["transform"], "config": {"target": "warehouse"}},
]

dag = from_records(
    records,
    edge_fn=lambda r: r["depends_on"],
    payload_fn=lambda r: r["config"],
)

print(dag.node_count()) # 3
print(dag.edge_count()) # 2
print(dag.get_payload("extract")) # {"source": "s3"}

Building from dataclasses

from dataclasses import dataclass, field
from dagron.integration import from_records

@dataclass
class TaskSpec:
    name: str
    dependencies: list[str] = field(default_factory=list)
    timeout: float = 30.0

specs = [
    TaskSpec("fetch", timeout=60.0),
    TaskSpec("parse", dependencies=["fetch"], timeout=10.0),
    TaskSpec("validate", dependencies=["parse"]),
    TaskSpec("store", dependencies=["validate"]),
]

dag = from_records(
    specs,
    edge_fn=lambda s: s.dependencies,
    payload_fn=lambda s: {"timeout": s.timeout},
)

print(list(dag.nodes())) # ['fetch', 'parse', 'validate', 'store']

Building from Pydantic models

from pydantic import BaseModel
from dagron.integration import from_records

class PipelineStep(BaseModel):
    name: str
    depends_on: list[str] = []
    retries: int = 3

steps = [
    PipelineStep(name="ingest", retries=5),
    PipelineStep(name="clean", depends_on=["ingest"]),
    PipelineStep(name="publish", depends_on=["clean"], retries=1),
]

dag = from_records(
    steps,
    edge_fn=lambda s: s.depends_on,
    payload_fn=lambda s: {"retries": s.retries},
)

Custom name fields

Use the name_field parameter when your records use a different field for the node identifier:

records = [
    {"id": "step_1", "after": []},
    {"id": "step_2", "after": ["step_1"]},
    {"id": "step_3", "after": ["step_2"]},
]

dag = from_records(
    records,
    name_field="id",
    edge_fn=lambda r: r["after"],
)

print(list(dag.nodes())) # ['step_1', 'step_2', 'step_3']

No edges

When edge_fn is not provided, the resulting DAG contains nodes but no edges. This is useful when you want to add edges separately:

records = [{"name": "a"}, {"name": "b"}, {"name": "c"}]

dag = from_records(records)
print(dag.node_count()) # 3
print(dag.edge_count()) # 0

# Add edges manually
dag.add_edge("a", "b")
dag.add_edge("b", "c")

Default payloads

When payload_fn is not provided, the entire record is stored as the payload:

records = [
    {"name": "extract", "source": "api", "timeout": 30},
    {"name": "load", "target": "db", "batch_size": 100},
]
dag = from_records(records)
print(dag.get_payload("extract"))
# {"name": "extract", "source": "api", "timeout": 30}

Complete example

import json
from dagron.integration import from_records

# Simulate loading pipeline config from JSON
config_json = """
[
    {"name": "raw_data", "deps": [], "type": "source", "params": {"path": "/data/raw"}},
    {"name": "clean", "deps": ["raw_data"], "type": "transform", "params": {"drop_nulls": true}},
    {"name": "features", "deps": ["clean"], "type": "transform", "params": {"method": "tfidf"}},
    {"name": "train", "deps": ["features"], "type": "model", "params": {"epochs": 50}},
    {"name": "evaluate", "deps": ["train"], "type": "model", "params": {"metrics": ["acc", "f1"]}},
    {"name": "deploy", "deps": ["evaluate"], "type": "deploy", "params": {"env": "prod"}}
]
"""

records = json.loads(config_json)

# Build DAG with structured payloads
dag = from_records(
    records,
    edge_fn=lambda r: r["deps"],
    payload_fn=lambda r: {"type": r["type"], "params": r["params"]},
)

print(f"Pipeline: {dag.node_count()} nodes, {dag.edge_count()} edges")
# Pipeline: 6 nodes, 5 edges

# Inspect the structure
for level_idx, level in enumerate(dag.topological_levels()):
    names = [n.name for n in level]
    print(f"Level {level_idx}: {names}")

# Access payload metadata
for name in ["raw_data", "train", "deploy"]:
    payload = dag.get_payload(name)
    print(f"{name}: type={payload['type']}, params={payload['params']}")

# Execute with task functions
import dagron

# Bind name as a default argument so each lambda captures its own value
tasks = {name: (lambda name=name: f"completed {name}") for name in dag.nodes()}
executor = dagron.DAGExecutor(dag)
result = executor.execute(tasks)
print(f"Succeeded: {result.succeeded}/{dag.node_count()}")
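One Python pitfall worth noting when building a task dictionary like the one above: lambdas created in a comprehension close over the loop variable itself, not its value at each iteration, so every lambda ends up seeing the final value. Binding the value through a default argument captures it at definition time. A self-contained illustration (no dagron required):

```python
# All three lambdas share one cell for n, which ends up holding "c".
funcs_shared = {n: (lambda: n) for n in ["a", "b", "c"]}
print(funcs_shared["a"]())  # c

# A default argument evaluates n once per iteration, freezing the value.
funcs_bound = {n: (lambda n=n: n) for n in ["a", "b", "c"]}
print(funcs_bound["a"]())   # a
```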

See also