Templates
The template module provides parameterized DAG construction. Define a DAG shape with placeholder values in node names (and optionally in payloads and metadata), then render concrete DAGs by supplying parameter values. Templates support type checking, custom validators, default values, and configurable delimiters.
from dagron.template import DAGTemplate, TemplateParam, TemplateError
DAGTemplate
class DAGTemplate:
def __init__(
self,
params: dict[str, type] | None = None,
defaults: dict[str, Any] | None = None,
descriptions: dict[str, str] | None = None,
validators: dict[str, Any] | None = None,
delimiters: tuple[str, str] = ("{{", "}}"),
) -> None: ...
A parameterized DAG template that renders concrete DAGs by substituting placeholder values. Placeholders in node names, payloads, and edge labels are replaced at render time.
| Parameter | Type | Default | Description |
|---|---|---|---|
params | dict[str, type] | None | None | Parameter definitions mapping parameter name to expected type. |
defaults | dict[str, Any] | None | None | Default values for parameters. Parameters with defaults are optional at render time. |
descriptions | dict[str, str] | None | None | Human-readable descriptions for each parameter. |
validators | dict[str, Any] | None | None | Custom validator callables for each parameter. Each validator takes a value and returns True if valid. |
delimiters | tuple[str, str] | ("{{", "}}") | Opening and closing delimiter pair for placeholders. |
from dagron.template import DAGTemplate
template = DAGTemplate(
params={"env": str, "replicas": int},
defaults={"env": "staging"},
descriptions={"env": "Target environment", "replicas": "Number of replicas"},
validators={"replicas": lambda n: 1 <= n <= 10},
)
template.add_node("extract_{{env}}")
template.add_node("load_{{env}}")
template.add_edge("extract_{{env}}", "load_{{env}}")
dag = template.render(env="prod", replicas=3)
print(list(dag.nodes())) # ['extract_prod', 'load_prod']
Properties
DAGTemplate.params
@property
def params(self) -> dict[str, TemplateParam]
Return a copy of the template parameters.
Returns: dict[str, TemplateParam] -- Mapping of parameter names to their TemplateParam specifications.
for name, param in template.params.items():
print(f"{name}: type={param.type.__name__}, default={param.default}")
Methods
DAGTemplate.add_node
def add_node(
self,
name: str,
*,
payload: object = None,
metadata: object = None,
) -> DAGTemplate
Add a templated node. The name may contain {{param}} placeholders that
are resolved at render time. Returns self for method chaining.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | Node name template. May contain placeholder expressions like {{env}}. |
payload | object | None | Optional payload to attach. String payloads also undergo placeholder substitution. |
metadata | object | None | Optional metadata to attach. String metadata also undergoes substitution. |
Returns: DAGTemplate -- Self, for fluent chaining.
template.add_node("worker_{{env}}_{{replicas}}", payload={"region": "{{env}}"})
DAGTemplate.add_edge
def add_edge(
self,
from_node: str,
to_node: str,
*,
weight: float | None = None,
label: str | None = None,
) -> DAGTemplate
Add a templated edge. Both node names may contain placeholders. Returns
self for method chaining.
| Parameter | Type | Default | Description |
|---|---|---|---|
from_node | str | required | Source node name template. |
to_node | str | required | Target node name template. |
weight | float | None | None | Optional edge weight. |
label | str | None | None | Optional edge label (supports placeholder substitution). |
Returns: DAGTemplate -- Self, for fluent chaining.
template.add_edge("extract_{{env}}", "transform_{{env}}", label="{{env}}_pipeline")
DAGTemplate.validate_params
def validate_params(self, **kwargs: Any) -> list[str]
Validate parameters without rendering. Checks for unknown parameters, missing required parameters, type mismatches, and custom validator failures.
Returns: list[str] -- List of error messages. Empty means all parameters are valid.
errors = template.validate_params(env="prod", replicas="three")
for err in errors:
print(err)
# Parameter 'replicas' expects int, got str
DAGTemplate.render
def render(self, **kwargs: Any) -> DAG
Render the template into a concrete DAG. All placeholders are resolved with the provided parameter values (merged with defaults).
Returns: DAG -- A new DAG instance with all placeholders resolved.
Raises: TemplateError -- If parameters are missing, wrong type, or fail custom validation.
dag = template.render(env="production", replicas=5)
print(dag.node_count())
Type-preserving substitution
If the entire string is a single placeholder (e.g., "{{replicas}}"), the
raw Python value is returned rather than a stringified version. This means
integer and other non-string parameters pass through as their original type
when they are the sole content of a placeholder.
DAGTemplate.render_builder
def render_builder(self, **kwargs: Any) -> DAGBuilder
Render the template into a pre-populated DAGBuilder. This allows you to
add additional nodes and edges before calling .build().
Returns: DAGBuilder -- A builder with all templated nodes and edges added.
Raises: TemplateError -- If parameter validation fails.
builder = template.render_builder(env="staging", replicas=2)
# Add extra nodes to the rendered template
builder.add_node("monitoring")
builder.add_edge("load_staging", "monitoring")
dag = builder.build()
DAGTemplate.render_pipeline
def render_pipeline(
self,
tasks: list[Any] | None = None,
**kwargs: Any,
) -> Pipeline
Render the template into a Pipeline.
| Parameter | Type | Default | Description |
|---|---|---|---|
tasks | list[Any] | None | None | Optional list of @task-decorated functions. |
**kwargs | Any | required | Template parameters. |
Returns: Pipeline -- A Pipeline wrapping the rendered DAG.
TemplateParam
@dataclass(frozen=True)
class TemplateParam:
name: str
type: type = str
default: Any = None
description: str = ""
validator: Any = None
Specification for a single template parameter. Frozen dataclass.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | The parameter name. |
type | type | str | Expected Python type for the parameter value. |
default | Any | None | Default value. If None, the parameter is required. |
description | str | "" | Human-readable description. |
validator | Callable[[Any], bool] | None | None | Custom validator function. Takes the value and returns True if valid. |
Methods
TemplateParam.validate
def validate(self, value: Any) -> None
Validate a value against this parameter's type and optional custom validator.
| Parameter | Type | Default | Description |
|---|---|---|---|
value | Any | required | The value to validate. |
Raises: TemplateError -- If the value has the wrong type or fails the custom validator.
from dagron.template import TemplateParam, TemplateError
param = TemplateParam(
name="replicas",
type=int,
validator=lambda n: 1 <= n <= 10,
)
param.validate(5) # OK
try:
param.validate(15) # Fails validator
except TemplateError as e:
print(e) # Parameter 'replicas' failed custom validation
try:
param.validate("five") # Wrong type
except TemplateError as e:
print(e) # Parameter 'replicas' expects int, got str
TemplateError
class TemplateError(Exception): ...
Raised when template validation or rendering fails. This includes missing parameters, type mismatches, unknown parameters, and custom validator failures.
from dagron.template import DAGTemplate, TemplateError
template = DAGTemplate(params={"env": str})
template.add_node("node_{{env}}")
try:
dag = template.render() # Missing required parameter
except TemplateError as e:
print(e) # Missing required parameter: 'env'
Complete example
from dagron.template import DAGTemplate
# Define a reusable ETL template
etl_template = DAGTemplate(
params={
"source": str,
"target": str,
"parallelism": int,
},
defaults={"parallelism": 4},
descriptions={
"source": "Data source identifier",
"target": "Destination database",
"parallelism": "Number of parallel transform workers",
},
validators={
"parallelism": lambda n: 1 <= n <= 32,
},
)
# Define the template shape
etl_template.add_node("extract_{{source}}")
for i in range(4): # max parallelism slots
etl_template.add_node(f"transform_{{source}}_{i}")
etl_template.add_edge(f"extract_{{{{source}}}}", f"transform_{{{{source}}}}_{i}")
etl_template.add_node("load_{{target}}")
# Validate before rendering
errors = etl_template.validate_params(source="api", target="warehouse")
assert not errors
# Render multiple concrete DAGs
api_dag = etl_template.render(source="api", target="warehouse")
db_dag = etl_template.render(source="db", target="lake", parallelism=2)
# Use render_builder for customization
builder = etl_template.render_builder(source="s3", target="redshift")
builder.add_node("notify")
builder.add_edge("load_redshift", "notify")
custom_dag = builder.build()
# Inspect template parameters
for name, param in etl_template.params.items():
print(f" {name} ({param.type.__name__}): {param.description}")
if param.default is not None:
print(f" default: {param.default}")
See also
- DAGBuilder -- the builder returned by
render_builder(). - Composition -- combining multiple rendered DAGs.
- Building DAGs guide -- construction patterns overview.