
Templates

Many organizations run the same pipeline structure against different environments, datasets, or configurations. Instead of building a separate DAG for each variant, dagron lets you define a DAGTemplate with {{placeholder}} syntax and render concrete DAGs by supplying parameter values.

Templates enforce type safety and support custom validators, so invalid parameter combinations are caught before the graph is ever built.

A single template renders into different concrete DAGs depending on the parameter values.


Quick Start

from dagron.template import DAGTemplate

# 1. Define the template
template = DAGTemplate(
    params={"env": str, "batch_size": int},
    defaults={"env": "staging", "batch_size": 1000},
)

# 2. Add templated nodes and edges
template.add_node("extract_{{env}}")
template.add_node("transform_{{env}}")
template.add_node("load_{{env}}")
template.add_edge("extract_{{env}}", "transform_{{env}}")
template.add_edge("transform_{{env}}", "load_{{env}}")

# 3. Render a concrete DAG
dag = template.render(env="prod", batch_size=5000)

# The DAG now has nodes: extract_prod, transform_prod, load_prod
print([n.name for n in dag.topological_sort()])
# ['extract_prod', 'transform_prod', 'load_prod']

Template Parameters

Declaring Parameters

Parameters are declared with their types when constructing the template:

template = DAGTemplate(
    params={
        "env": str,
        "replicas": int,
        "gpu_enabled": bool,
    },
    defaults={
        "env": "staging",
        "replicas": 1,
    },
    descriptions={
        "env": "Target deployment environment",
        "replicas": "Number of parallel workers",
        "gpu_enabled": "Whether to use GPU acceleration",
    },
)

Parameters without a default are required -- render() will raise a TemplateError if they are missing.
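
Conceptually, this resolution step merges the supplied values over the defaults and rejects anything missing or unrecognized. Here is a stand-alone sketch of that behavior in plain Python (an illustration only, not dagron's implementation; `resolve_params` is a hypothetical helper):

```python
def resolve_params(declared, defaults, supplied):
    """Merge supplied values over defaults; reject unknown or missing names."""
    unknown = sorted(set(supplied) - set(declared))
    if unknown:
        raise ValueError(f"Unknown parameters: {', '.join(unknown)}")
    resolved = {**defaults, **supplied}  # supplied values win over defaults
    missing = sorted(set(declared) - set(resolved))
    if missing:
        raise ValueError(f"Missing required parameter: {missing[0]!r}")
    return resolved

# "batch_size" falls back to its default; "env" has no default, so it must be supplied
print(resolve_params({"env", "batch_size"}, {"batch_size": 1000}, {"env": "prod"}))
# {'batch_size': 1000, 'env': 'prod'}
```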

TemplateParam

Under the hood, each parameter is a TemplateParam dataclass:

from dagron.template import TemplateParam

param = TemplateParam(
    name="env",
    type=str,
    default="staging",
    description="Target deployment environment",
    validator=lambda v: v in ("dev", "staging", "prod"),
)

You can access all parameter specs through the template:

for name, param in template.params.items():
    print(f"{name}: {param.type.__name__}, default={param.default}")
    print(f"  {param.description}")

Placeholder Syntax

Default Delimiters

By default, placeholders use double-brace syntax: {{param_name}}. You can place them anywhere in a node name or edge label:

template.add_node("train_{{model}}_{{env}}")
template.add_edge("data_{{env}}", "train_{{model}}_{{env}}")

Custom Delimiters

If double braces conflict with your naming conventions, specify custom delimiters:

template = DAGTemplate(
    params={"env": str},
    delimiters=("${", "}"),  # shell-style
)

template.add_node("extract_${env}")

Type-Preserving Substitution

If an entire node name is a single placeholder (e.g., "{{replicas}}"), dagron returns the raw Python value instead of stringifying it. This is useful for metadata:

template = DAGTemplate(params={"replicas": int})
template.add_node("worker", metadata="{{replicas}}")

dag = template.render(replicas=4)
# The metadata is the integer 4, not the string "4"

When a placeholder is part of a larger string (e.g., "worker_{{env}}"), values are converted to strings via str().
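
The distinction can be illustrated with a small self-contained sketch (again, an illustration rather than dagron's implementation; `substitute` is a hypothetical helper using the default `{{...}}` delimiters):

```python
import re

PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")

def substitute(template_str, params):
    """Return the raw value for a whole-string placeholder; otherwise interpolate as str."""
    whole = PLACEHOLDER.fullmatch(template_str)
    if whole:
        return params[whole.group(1)]  # entire string is one placeholder: type preserved
    return PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), template_str)

print(substitute("{{replicas}}", {"replicas": 4}))    # 4 (an int, not "4")
print(substitute("worker_{{env}}", {"env": "prod"}))  # worker_prod
```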


Validation

Automatic Type Checking

Parameters are validated against their declared types at render time:

from dagron.template import DAGTemplate, TemplateError

template = DAGTemplate(params={"replicas": int})

try:
    template.render(replicas="three")  # str is not int
except TemplateError as e:
    print(e)
    # "Parameter 'replicas' expects int, got str"

Custom Validators

Supply a validator function for each parameter to enforce domain-specific constraints:

template = DAGTemplate(
    params={"env": str, "replicas": int},
    validators={
        "env": lambda v: v in ("dev", "staging", "prod"),
        "replicas": lambda v: 1 <= v <= 100,
    },
)

try:
    template.render(env="banana", replicas=1)
except TemplateError as e:
    print(e)
    # "Parameter 'env' failed custom validation"
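
The type check and the custom validator compose naturally: the type is checked first, then the validator runs on a value of the right type. A simplified stand-alone sketch of how a single parameter might be checked (illustrative only; `check_param` is a hypothetical helper, and note that in Python `bool` is a subclass of `int`):

```python
def check_param(name, value, expected_type, validator=None):
    """Return an error message for an invalid value, or None if it passes."""
    if not isinstance(value, expected_type):
        return (f"Parameter {name!r} expects {expected_type.__name__}, "
                f"got {type(value).__name__}")
    if validator is not None and not validator(value):
        return f"Parameter {name!r} failed custom validation"
    return None

print(check_param("replicas", "three", int))
# Parameter 'replicas' expects int, got str
print(check_param("env", "banana", str, lambda v: v in ("dev", "staging", "prod")))
# Parameter 'env' failed custom validation
print(check_param("env", "prod", str, lambda v: v in ("dev", "staging", "prod")))
# None
```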

Pre-Validation

Use validate_params() to check parameters without rendering. This returns a list of error messages instead of raising:

errors = template.validate_params(env="prod", replicas=-1)
for error in errors:
    print(f" - {error}")
# - Parameter 'replicas' failed custom validation

errors = template.validate_params()  # missing required params
# - Missing required parameter: 'env'
# - Missing required parameter: 'replicas'

Unknown Parameters

Passing parameters not declared in the template is an error:

try:
    template.render(env="prod", replicas=3, color="blue")
except TemplateError as e:
    print(e)
    # "Unknown parameters: color"

Rendering Methods

render() -- Direct DAG

The simplest rendering method produces a finalized DAG:

dag = template.render(env="prod", replicas=3)
# dag is a dagron.DAG, ready for execution

render_builder() -- DAGBuilder for Further Modification

If you need to add extra nodes or edges after rendering, use render_builder() to get a DAGBuilder:

builder = template.render_builder(env="prod", replicas=3)

# Add extra nodes beyond what the template defines
builder.add_node("monitoring")
builder.add_edge("load_prod", "monitoring")

dag = builder.build()

This is useful when you have a standard template but need per-deployment customizations.

render_pipeline() -- Pipeline

Render into a Pipeline for use with the @task decorator workflow:

pipeline = template.render_pipeline(env="prod", replicas=3)

Fluent API

add_node() and add_edge() return self, so you can chain calls:

template = DAGTemplate(params={"env": str}, defaults={"env": "dev"})

template = (
    template
    .add_node("extract_{{env}}")
    .add_node("transform_{{env}}")
    .add_node("load_{{env}}")
    .add_edge("extract_{{env}}", "transform_{{env}}")
    .add_edge("transform_{{env}}", "load_{{env}}")
)

dag = template.render(env="prod")

Parameterized ETL Example

Here is a realistic ETL pipeline template that generates environment-specific DAGs:

from dagron.template import DAGTemplate

def create_etl_template():
    """Create a reusable ETL pipeline template."""
    template = DAGTemplate(
        params={
            "env": str,
            "source_table": str,
            "target_table": str,
            "batch_size": int,
            "validate": bool,
        },
        defaults={
            "batch_size": 10000,
            "validate": True,
        },
        descriptions={
            "env": "Target environment (dev, staging, prod)",
            "source_table": "Source database table name",
            "target_table": "Target data warehouse table",
            "batch_size": "Number of rows per batch",
            "validate": "Whether to run data validation",
        },
        validators={
            "env": lambda v: v in ("dev", "staging", "prod"),
            "batch_size": lambda v: 100 <= v <= 1_000_000,
        },
    )

    # Core ETL nodes
    (
        template
        .add_node("extract_{{source_table}}_{{env}}")
        .add_node("validate_{{source_table}}_{{env}}")
        .add_node("transform_{{source_table}}_{{env}}")
        .add_node("load_{{target_table}}_{{env}}")
        .add_node("verify_{{target_table}}_{{env}}")
        .add_edge("extract_{{source_table}}_{{env}}", "validate_{{source_table}}_{{env}}")
        .add_edge("validate_{{source_table}}_{{env}}", "transform_{{source_table}}_{{env}}")
        .add_edge("transform_{{source_table}}_{{env}}", "load_{{target_table}}_{{env}}")
        .add_edge("load_{{target_table}}_{{env}}", "verify_{{target_table}}_{{env}}")
    )

    return template

# Create the template once
etl_template = create_etl_template()

# Render for different environments
dev_dag = etl_template.render(
    env="dev",
    source_table="users",
    target_table="dim_users",
)

prod_dag = etl_template.render(
    env="prod",
    source_table="users",
    target_table="dim_users",
    batch_size=100000,
)

print(f"Dev nodes: {[n.name for n in dev_dag.topological_sort()]}")
# ['extract_users_dev', 'validate_users_dev', 'transform_users_dev',
# 'load_dim_users_dev', 'verify_dim_users_dev']

print(f"Prod nodes: {[n.name for n in prod_dag.topological_sort()]}")
# ['extract_users_prod', 'validate_users_prod', 'transform_users_prod',
# 'load_dim_users_prod', 'verify_dim_users_prod']

Multi-Tenant Pipeline Generation

Templates are powerful for generating per-tenant pipelines:

template = DAGTemplate(
    params={"tenant": str, "region": str},
    defaults={"region": "us-east-1"},
)

(
    template
    .add_node("ingest_{{tenant}}_{{region}}")
    .add_node("process_{{tenant}}_{{region}}")
    .add_node("deliver_{{tenant}}_{{region}}")
    .add_edge("ingest_{{tenant}}_{{region}}", "process_{{tenant}}_{{region}}")
    .add_edge("process_{{tenant}}_{{region}}", "deliver_{{tenant}}_{{region}}")
)

tenants = ["acme", "globex", "initech"]
dags = {
    tenant: template.render(tenant=tenant, region="eu-west-1")
    for tenant in tenants
}

for tenant, dag in dags.items():
    nodes = [n.name for n in dag.topological_sort()]
    print(f"{tenant}: {nodes}")

Template Composition with render_builder

Use render_builder() to compose a base template with per-use customizations:

# Base template: standard ML training pipeline
base = DAGTemplate(
    params={"model": str, "dataset": str},
)
(
    base
    .add_node("load_{{dataset}}")
    .add_node("preprocess_{{dataset}}")
    .add_node("train_{{model}}")
    .add_node("evaluate_{{model}}")
    .add_edge("load_{{dataset}}", "preprocess_{{dataset}}")
    .add_edge("preprocess_{{dataset}}", "train_{{model}}")
    .add_edge("train_{{model}}", "evaluate_{{model}}")
)

# Render with customization: only production builds get a deploy step
builder = base.render_builder(model="resnet50", dataset="imagenet")
builder.add_node("deploy_resnet50")
builder.add_edge("evaluate_resnet50", "deploy_resnet50")

dag = builder.build()

Repr and Debugging

Templates have a helpful repr:

print(template)
# DAGTemplate(params=[batch_size, env, source_table, target_table, validate],
#             nodes=5, edges=4)

Best Practices

  1. Define templates as factory functions. Return a DAGTemplate from a function so that the template definition is reusable and testable.

  2. Use validators for all string parameters. Catch typos like env="prodd" at render time instead of at execution time.

  3. Provide defaults for optional parameters. This makes the most common usage concise while still allowing customization.

  4. Use validate_params() in CI. Run parameter validation in your test suite to catch invalid configurations early.

  5. Prefer render_builder() when composing. It gives you flexibility to add environment-specific nodes without modifying the base template.