Skip to main content

Templates

The template module provides parameterized DAG construction. Define a DAG shape with placeholder values in node names (and optionally in payloads and metadata), then render concrete DAGs by supplying parameter values. Templates support type checking, custom validators, default values, and configurable delimiters.

from dagron.template import DAGTemplate, TemplateParam, TemplateError

DAGTemplate

DAGTemplate
class DAGTemplate:
def __init__(
self,
params: dict[str, type] | None = None,
defaults: dict[str, Any] | None = None,
descriptions: dict[str, str] | None = None,
validators: dict[str, Any] | None = None,
delimiters: tuple[str, str] = ("{{", "}}"),
) -> None: ...

A parameterized DAG template that renders concrete DAGs by substituting placeholder values. Placeholders in node names, payloads, and edge labels are replaced at render time.

ParameterTypeDefaultDescription
paramsdict[str, type] | NoneNoneParameter definitions mapping parameter name to expected type.
defaultsdict[str, Any] | NoneNoneDefault values for parameters. Parameters with defaults are optional at render time.
descriptionsdict[str, str] | NoneNoneHuman-readable descriptions for each parameter.
validatorsdict[str, Any] | NoneNoneCustom validator callables for each parameter. Each validator takes a value and returns True if valid.
delimiterstuple[str, str]("{{", "}}")Opening and closing delimiter pair for placeholders.
from dagron.template import DAGTemplate

template = DAGTemplate(
params={"env": str, "replicas": int},
defaults={"env": "staging"},
descriptions={"env": "Target environment", "replicas": "Number of replicas"},
validators={"replicas": lambda n: 1 <= n <= 10},
)

template.add_node("extract_{{env}}")
template.add_node("load_{{env}}")
template.add_edge("extract_{{env}}", "load_{{env}}")

dag = template.render(env="prod", replicas=3)
print(list(dag.nodes())) # ['extract_prod', 'load_prod']

Properties


DAGTemplate.params

DAGTemplate.params
@property
def params(self) -> dict[str, TemplateParam]

Return a copy of the template parameters.

Returns: dict[str, TemplateParam] -- Mapping of parameter names to their TemplateParam specifications.

for name, param in template.params.items():
print(f"{name}: type={param.type.__name__}, default={param.default}")

Methods


DAGTemplate.add_node

DAGTemplate.add_node
def add_node(
self,
name: str,
*,
payload: object = None,
metadata: object = None,
) -> DAGTemplate

Add a templated node. The name may contain {{param}} placeholders that are resolved at render time. Returns self for method chaining.

ParameterTypeDefaultDescription
namestrrequiredNode name template. May contain placeholder expressions like {{env}}.
payloadobjectNoneOptional payload to attach. String payloads also undergo placeholder substitution.
metadataobjectNoneOptional metadata to attach. String metadata also undergoes substitution.

Returns: DAGTemplate -- Self, for fluent chaining.

template.add_node("worker_{{env}}_{{replicas}}", payload={"region": "{{env}}"})

DAGTemplate.add_edge

DAGTemplate.add_edge
def add_edge(
self,
from_node: str,
to_node: str,
*,
weight: float | None = None,
label: str | None = None,
) -> DAGTemplate

Add a templated edge. Both node names may contain placeholders. Returns self for method chaining.

ParameterTypeDefaultDescription
from_nodestrrequiredSource node name template.
to_nodestrrequiredTarget node name template.
weightfloat | NoneNoneOptional edge weight.
labelstr | NoneNoneOptional edge label (supports placeholder substitution).

Returns: DAGTemplate -- Self, for fluent chaining.

template.add_edge("extract_{{env}}", "transform_{{env}}", label="{{env}}_pipeline")

DAGTemplate.validate_params

DAGTemplate.validate_params
def validate_params(self, **kwargs: Any) -> list[str]

Validate parameters without rendering. Checks for unknown parameters, missing required parameters, type mismatches, and custom validator failures.

Returns: list[str] -- List of error messages. Empty means all parameters are valid.

errors = template.validate_params(env="prod", replicas="three")
for err in errors:
print(err)
# Parameter 'replicas' expects int, got str

DAGTemplate.render

DAGTemplate.render
def render(self, **kwargs: Any) -> DAG

Render the template into a concrete DAG. All placeholders are resolved with the provided parameter values (merged with defaults).

Returns: DAG -- A new DAG instance with all placeholders resolved.

Raises: TemplateError -- If parameters are missing, wrong type, or fail custom validation.

dag = template.render(env="production", replicas=5)
print(dag.node_count())

Type-preserving substitution

If the entire string is a single placeholder (e.g., "{{replicas}}"), the raw Python value is returned rather than a stringified version. This means integer and other non-string parameters pass through as their original type when they are the sole content of a placeholder.


DAGTemplate.render_builder

DAGTemplate.render_builder
def render_builder(self, **kwargs: Any) -> DAGBuilder

Render the template into a pre-populated DAGBuilder. This allows you to add additional nodes and edges before calling .build().

Returns: DAGBuilder -- A builder with all templated nodes and edges added.

Raises: TemplateError -- If parameter validation fails.

builder = template.render_builder(env="staging", replicas=2)
# Add extra nodes to the rendered template
builder.add_node("monitoring")
builder.add_edge("load_staging", "monitoring")
dag = builder.build()

DAGTemplate.render_pipeline

DAGTemplate.render_pipeline
def render_pipeline(
self,
tasks: list[Any] | None = None,
**kwargs: Any,
) -> Pipeline

Render the template into a Pipeline.

ParameterTypeDefaultDescription
taskslist[Any] | NoneNoneOptional list of @task-decorated functions.
**kwargsAnyrequiredTemplate parameters.

Returns: Pipeline -- A Pipeline wrapping the rendered DAG.


TemplateParam

TemplateParam
@dataclass(frozen=True)
class TemplateParam:
name: str
type: type = str
default: Any = None
description: str = ""
validator: Any = None

Specification for a single template parameter. Frozen dataclass.

ParameterTypeDefaultDescription
namestrrequiredThe parameter name.
typetypestrExpected Python type for the parameter value.
defaultAnyNoneDefault value. If None, the parameter is required.
descriptionstr""Human-readable description.
validatorCallable[[Any], bool] | NoneNoneCustom validator function. Takes the value and returns True if valid.

Methods


TemplateParam.validate

TemplateParam.validate
def validate(self, value: Any) -> None

Validate a value against this parameter's type and optional custom validator.

ParameterTypeDefaultDescription
valueAnyrequiredThe value to validate.

Raises: TemplateError -- If the value has the wrong type or fails the custom validator.

from dagron.template import TemplateParam, TemplateError

param = TemplateParam(
name="replicas",
type=int,
validator=lambda n: 1 <= n <= 10,
)

param.validate(5) # OK

try:
param.validate(15) # Fails validator
except TemplateError as e:
print(e) # Parameter 'replicas' failed custom validation

try:
param.validate("five") # Wrong type
except TemplateError as e:
print(e) # Parameter 'replicas' expects int, got str

TemplateError

TemplateError
class TemplateError(Exception): ...

Raised when template validation or rendering fails. This includes missing parameters, type mismatches, unknown parameters, and custom validator failures.

from dagron.template import DAGTemplate, TemplateError

template = DAGTemplate(params={"env": str})
template.add_node("node_{{env}}")

try:
dag = template.render() # Missing required parameter
except TemplateError as e:
print(e) # Missing required parameter: 'env'

Complete example

from dagron.template import DAGTemplate

# Define a reusable ETL template
etl_template = DAGTemplate(
params={
"source": str,
"target": str,
"parallelism": int,
},
defaults={"parallelism": 4},
descriptions={
"source": "Data source identifier",
"target": "Destination database",
"parallelism": "Number of parallel transform workers",
},
validators={
"parallelism": lambda n: 1 <= n <= 32,
},
)

# Define the template shape
etl_template.add_node("extract_{{source}}")
for i in range(4): # max parallelism slots
etl_template.add_node(f"transform_{{source}}_{i}")
etl_template.add_edge(f"extract_{{{{source}}}}", f"transform_{{{{source}}}}_{i}")
etl_template.add_node("load_{{target}}")

# Validate before rendering
errors = etl_template.validate_params(source="api", target="warehouse")
assert not errors

# Render multiple concrete DAGs
api_dag = etl_template.render(source="api", target="warehouse")
db_dag = etl_template.render(source="db", target="lake", parallelism=2)

# Use render_builder for customization
builder = etl_template.render_builder(source="s3", target="redshift")
builder.add_node("notify")
builder.add_edge("load_redshift", "notify")
custom_dag = builder.build()

# Inspect template parameters
for name, param in etl_template.params.items():
print(f" {name} ({param.type.__name__}): {param.description}")
if param.default is not None:
print(f" default: {param.default}")

See also