Templates
Many organizations run the same pipeline structure against different environments, datasets, or configurations. Instead of building a separate DAG for each variant, dagron lets you define a DAGTemplate with {{placeholder}} syntax and render concrete DAGs by supplying parameter values.
Templates enforce type safety and support custom validators, so invalid parameter combinations are caught before the graph is ever built.
A single template renders into different concrete DAGs depending on the parameter values.
Quick Start
from dagron.template import DAGTemplate
# 1. Define the template
template = DAGTemplate(
    params={"env": str, "batch_size": int},
    defaults={"env": "staging", "batch_size": 1000},
)
# 2. Add templated nodes and edges
template.add_node("extract_{{env}}")
template.add_node("transform_{{env}}")
template.add_node("load_{{env}}")
template.add_edge("extract_{{env}}", "transform_{{env}}")
template.add_edge("transform_{{env}}", "load_{{env}}")
# 3. Render a concrete DAG
dag = template.render(env="prod", batch_size=5000)
# The DAG now has nodes: extract_prod, transform_prod, load_prod
print([n.name for n in dag.topological_sort()])
# ['extract_prod', 'transform_prod', 'load_prod']
Template Parameters
Declaring Parameters
Parameters are declared with their types when constructing the template:
template = DAGTemplate(
    params={
        "env": str,
        "replicas": int,
        "gpu_enabled": bool,
    },
    defaults={
        "env": "staging",
        "replicas": 1,
    },
    descriptions={
        "env": "Target deployment environment",
        "replicas": "Number of parallel workers",
        "gpu_enabled": "Whether to use GPU acceleration",
    },
)
Parameters without a default are required -- render() will raise a TemplateError if they are missing.
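The required-parameter rule can be sketched in plain Python. This is an illustration of the behavior described above, not dagron's implementation; the `check_required` helper and its exact message format are hypothetical.

```python
class TemplateError(Exception):
    """Stand-in for dagron's TemplateError (illustrative only)."""

def check_required(params, defaults, supplied):
    """Merge defaults with supplied values; raise if any declared
    parameter ends up with no value at all."""
    values = {**defaults, **supplied}
    missing = sorted(name for name in params if name not in values)
    if missing:
        raise TemplateError(f"Missing required parameters: {', '.join(missing)}")
    return values

# "env" has a default, so only "replicas" is required
params = {"env": str, "replicas": int}
defaults = {"env": "staging"}
print(check_required(params, defaults, {"replicas": 3}))
# {'env': 'staging', 'replicas': 3}
```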
TemplateParam
Under the hood, each parameter is a TemplateParam dataclass:
from dagron.template import TemplateParam
param = TemplateParam(
    name="env",
    type=str,
    default="staging",
    description="Target deployment environment",
    validator=lambda v: v in ("dev", "staging", "prod"),
)
You can access all parameter specs through the template:
for name, param in template.params.items():
    print(f"  {name}: {param.type.__name__}, default={param.default}")
    print(f"  {param.description}")
Placeholder Syntax
Default Delimiters
By default, placeholders use double-brace syntax: {{param_name}}. You can place them anywhere in a node name or edge label:
template.add_node("train_{{model}}_{{env}}")
template.add_edge("data_{{env}}", "train_{{model}}_{{env}}")
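To make the substitution concrete, here is a minimal sketch of how brace-delimited placeholders can be expanded with a regular expression. This is illustrative only, and the `substitute` helper is an assumption, not part of dagron's API:

```python
import re

def substitute(text, values, delimiters=("{{", "}}")):
    """Replace every delimited placeholder in text with str() of its value."""
    open_d, close_d = delimiters
    pattern = re.escape(open_d) + r"(\w+)" + re.escape(close_d)
    return re.sub(pattern, lambda m: str(values[m.group(1)]), text)

print(substitute("train_{{model}}_{{env}}", {"model": "resnet", "env": "prod"}))
# train_resnet_prod
```

Because the delimiters are escaped before being compiled into the pattern, the same function handles shell-style `${...}` delimiters as well.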
Custom Delimiters
If double braces conflict with your naming conventions, specify custom delimiters:
template = DAGTemplate(
    params={"env": str},
    delimiters=("${", "}"),  # shell-style
)
template.add_node("extract_${env}")
Type-Preserving Substitution
If an entire node name is a single placeholder (e.g., "{{replicas}}"), dagron returns the raw Python value instead of stringifying it. This is useful for metadata:
template = DAGTemplate(params={"replicas": int})
template.add_node("worker", metadata="{{replicas}}")
dag = template.render(replicas=4)
# The metadata is the integer 4, not the string "4"
When a placeholder is part of a larger string (e.g., "worker_{{env}}"), values are converted to strings via str().
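The whole-placeholder rule can be sketched as follows. This is a hypothetical illustration of the behavior described above, not dagron's source: if the entire string matches one placeholder, the raw value is returned; otherwise values are stringified into the surrounding text.

```python
import re

def render_value(text, values, delimiters=("{{", "}}")):
    """Return the raw value for a whole-string placeholder,
    else substitute placeholders as strings."""
    open_d, close_d = delimiters
    pattern = re.escape(open_d) + r"(\w+)" + re.escape(close_d)
    whole = re.fullmatch(pattern, text)
    if whole:
        return values[whole.group(1)]  # raw Python value, type preserved
    return re.sub(pattern, lambda m: str(values[m.group(1)]), text)

print(render_value("{{replicas}}", {"replicas": 4}))         # 4 (an int)
print(render_value("worker_{{replicas}}", {"replicas": 4}))  # worker_4 (a str)
```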
Validation
Automatic Type Checking
Parameters are validated against their declared types at render time:
from dagron.template import DAGTemplate, TemplateError

template = DAGTemplate(params={"replicas": int})
try:
    template.render(replicas="three")  # str is not int
except TemplateError as e:
    print(e)
# "Parameter 'replicas' expects int, got str"
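A type check like this typically reduces to an `isinstance` test per parameter. The sketch below is an assumption modeled on the error message shown above, not dagron's implementation:

```python
class TemplateError(Exception):
    """Stand-in for dagron's TemplateError (illustrative only)."""

def check_types(params, values):
    """Raise if any supplied value does not match its declared type.
    Note: in Python, isinstance(True, int) is True, so a real
    implementation may need extra handling for bool vs int."""
    for name, expected in params.items():
        value = values[name]
        if not isinstance(value, expected):
            raise TemplateError(
                f"Parameter '{name}' expects {expected.__name__}, "
                f"got {type(value).__name__}"
            )

try:
    check_types({"replicas": int}, {"replicas": "three"})
except TemplateError as e:
    print(e)
# Parameter 'replicas' expects int, got str
```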
Custom Validators
Supply a validator function for each parameter to enforce domain-specific constraints:
template = DAGTemplate(
    params={"env": str, "replicas": int},
    validators={
        "env": lambda v: v in ("dev", "staging", "prod"),
        "replicas": lambda v: 1 <= v <= 100,
    },
)
try:
    template.render(env="banana", replicas=1)
except TemplateError as e:
    print(e)
# "Parameter 'env' failed custom validation"
Pre-Validation
Use validate_params() to check parameters without rendering. This returns a list of error messages instead of raising:
errors = template.validate_params(env="prod", replicas=-1)
for error in errors:
    print(f" - {error}")
# - Parameter 'replicas' failed custom validation

errors = template.validate_params()  # missing required params
# - Missing required parameter: 'replicas'
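The error-collecting behavior can be sketched like this. The function below is a hypothetical stand-in for `validate_params()`, with messages modeled on the output shown above; dagron's internals may differ.

```python
def validate_params(params, defaults, validators, **supplied):
    """Collect every validation problem into a list instead of
    raising on the first one."""
    errors = []
    values = {**defaults, **supplied}
    for name, expected in params.items():
        if name not in values:
            errors.append(f"Missing required parameter: '{name}'")
            continue
        value = values[name]
        if not isinstance(value, expected):
            errors.append(
                f"Parameter '{name}' expects {expected.__name__}, "
                f"got {type(value).__name__}"
            )
        elif name in validators and not validators[name](value):
            errors.append(f"Parameter '{name}' failed custom validation")
    return errors

params = {"env": str, "replicas": int}
validators = {"replicas": lambda v: 1 <= v <= 100}
print(validate_params(params, {}, validators, env="prod", replicas=-1))
# ["Parameter 'replicas' failed custom validation"]
```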
Unknown Parameters
Passing parameters not declared in the template is an error:
try:
    template.render(env="prod", replicas=3, color="blue")
except TemplateError as e:
    print(e)
# "Unknown parameters: color"
Rendering Methods
render() -- Direct DAG
The simplest rendering method produces a finalized DAG:
dag = template.render(env="prod", replicas=3)
# dag is a dagron.DAG, ready for execution
render_builder() -- DAGBuilder for Further Modification
If you need to add extra nodes or edges after rendering, use render_builder() to get a DAGBuilder:
builder = template.render_builder(env="prod", replicas=3)
# Add extra nodes beyond what the template defines
builder.add_node("monitoring")
builder.add_edge("load_prod", "monitoring")
dag = builder.build()
This is useful when you have a standard template but need per-deployment customizations.
render_pipeline() -- Pipeline
Render into a Pipeline for use with the @task decorator workflow:
pipeline = template.render_pipeline(env="prod", replicas=3)
Fluent API
add_node() and add_edge() return self, so you can chain calls:
template = DAGTemplate(params={"env": str}, defaults={"env": "dev"})
template = (
    template
    .add_node("extract_{{env}}")
    .add_node("transform_{{env}}")
    .add_node("load_{{env}}")
    .add_edge("extract_{{env}}", "transform_{{env}}")
    .add_edge("transform_{{env}}", "load_{{env}}")
)
dag = template.render(env="prod")
Parameterized ETL Example
Here is a realistic ETL pipeline template that generates environment-specific DAGs:
from dagron.template import DAGTemplate
def create_etl_template():
    """Create a reusable ETL pipeline template."""
    template = DAGTemplate(
        params={
            "env": str,
            "source_table": str,
            "target_table": str,
            "batch_size": int,
            "validate": bool,
        },
        defaults={
            "batch_size": 10000,
            "validate": True,
        },
        descriptions={
            "env": "Target environment (dev, staging, prod)",
            "source_table": "Source database table name",
            "target_table": "Target data warehouse table",
            "batch_size": "Number of rows per batch",
            "validate": "Whether to run data validation",
        },
        validators={
            "env": lambda v: v in ("dev", "staging", "prod"),
            "batch_size": lambda v: 100 <= v <= 1_000_000,
        },
    )
    # Core ETL nodes
    (
        template
        .add_node("extract_{{source_table}}_{{env}}")
        .add_node("validate_{{source_table}}_{{env}}")
        .add_node("transform_{{source_table}}_{{env}}")
        .add_node("load_{{target_table}}_{{env}}")
        .add_node("verify_{{target_table}}_{{env}}")
        .add_edge("extract_{{source_table}}_{{env}}", "validate_{{source_table}}_{{env}}")
        .add_edge("validate_{{source_table}}_{{env}}", "transform_{{source_table}}_{{env}}")
        .add_edge("transform_{{source_table}}_{{env}}", "load_{{target_table}}_{{env}}")
        .add_edge("load_{{target_table}}_{{env}}", "verify_{{target_table}}_{{env}}")
    )
    return template

# Create the template once
etl_template = create_etl_template()

# Render for different environments
dev_dag = etl_template.render(
    env="dev",
    source_table="users",
    target_table="dim_users",
)
prod_dag = etl_template.render(
    env="prod",
    source_table="users",
    target_table="dim_users",
    batch_size=100000,
)
print(f"Dev nodes: {[n.name for n in dev_dag.topological_sort()]}")
# ['extract_users_dev', 'validate_users_dev', 'transform_users_dev',
# 'load_dim_users_dev', 'verify_dim_users_dev']
print(f"Prod nodes: {[n.name for n in prod_dag.topological_sort()]}")
# ['extract_users_prod', 'validate_users_prod', 'transform_users_prod',
# 'load_dim_users_prod', 'verify_dim_users_prod']
Multi-Tenant Pipeline Generation
Templates are powerful for generating per-tenant pipelines:
template = DAGTemplate(
    params={"tenant": str, "region": str},
    defaults={"region": "us-east-1"},
)
(
    template
    .add_node("ingest_{{tenant}}_{{region}}")
    .add_node("process_{{tenant}}_{{region}}")
    .add_node("deliver_{{tenant}}_{{region}}")
    .add_edge("ingest_{{tenant}}_{{region}}", "process_{{tenant}}_{{region}}")
    .add_edge("process_{{tenant}}_{{region}}", "deliver_{{tenant}}_{{region}}")
)

tenants = ["acme", "globex", "initech"]
dags = {
    tenant: template.render(tenant=tenant, region="eu-west-1")
    for tenant in tenants
}
for tenant, dag in dags.items():
    nodes = [n.name for n in dag.topological_sort()]
    print(f"{tenant}: {nodes}")
Template Composition with render_builder
Use render_builder() to compose a base template with per-use customizations:
# Base template: standard ML training pipeline
base = DAGTemplate(
    params={"model": str, "dataset": str},
)
(
    base
    .add_node("load_{{dataset}}")
    .add_node("preprocess_{{dataset}}")
    .add_node("train_{{model}}")
    .add_node("evaluate_{{model}}")
    .add_edge("load_{{dataset}}", "preprocess_{{dataset}}")
    .add_edge("preprocess_{{dataset}}", "train_{{model}}")
    .add_edge("train_{{model}}", "evaluate_{{model}}")
)

# Render with customization
builder = base.render_builder(model="resnet50", dataset="imagenet")
# Only production builds get a deploy step
builder.add_node("deploy_resnet50")
builder.add_edge("evaluate_resnet50", "deploy_resnet50")
dag = builder.build()
Repr and Debugging
Templates have a helpful repr:
print(template)
# DAGTemplate(params=[batch_size, env, source_table, target_table, validate],
# nodes=5, edges=4)
Best Practices
- Define templates as factory functions. Return a DAGTemplate from a function so that the template definition is reusable and testable.
- Use validators for all string parameters. Catch typos like env="prodd" at render time instead of at execution time.
- Provide defaults for optional parameters. This makes the most common usage concise while still allowing customization.
- Use validate_params() in CI. Run parameter validation in your test suite to catch invalid configurations early.
- Prefer render_builder() when composing. It gives you flexibility to add environment-specific nodes without modifying the base template.
Related
- API Reference: Templates -- full API documentation.
- Building DAGs -- the DAGBuilder that templates render into.
- Versioning -- version-tracking the rendered DAGs.
- Contracts -- type-checking the rendered DAG's edges.