Composition
The compose module provides multi-DAG composition with automatic namespace prefixing. It combines independent DAGs into a single unified graph, preserving node payloads and metadata, and optionally adds cross-namespace connections between them.
from dagron.compose import compose
compose
def compose(
dags: dict[str, DAG],
connections: list[tuple[str, str]] | None = None,
*,
separator: str = "/",
) -> DAG
Compose multiple DAGs into one with namespace prefixes. Each DAG's nodes
are prefixed with its namespace key (e.g., a node "load" in namespace
"etl" becomes "etl/load"). Internal edges within each DAG are preserved
with the same prefixing. Cross-namespace edges can be added via the
connections parameter.
| Parameter | Type | Default | Description |
|---|---|---|---|
dags | dict[str, DAG] | required | Mapping of namespace names to DAG objects. Each key becomes the prefix for that DAG's nodes. |
connections | list[tuple[str, str]] | None | None | List of (from, to) tuples using fully namespaced names (e.g., ("etl/load", "ml/train")). These create edges between nodes in different namespaces. Names must be written with the configured separator (see the separator parameter). |
separator | str | "/" | Separator between namespace prefix and node name. |
Returns: DAG -- A new DAG containing all nodes and edges from all input DAGs with namespaces applied, plus any cross-namespace connections.
Node prefixing
Every node in the resulting DAG has a name of the form
{namespace}{separator}{original_name}. Payloads and metadata from the
original nodes are preserved on the prefixed nodes.
Edge preservation
All edges within each input DAG are preserved with prefixed names. For
example, an edge ("a", "b") in namespace "etl" becomes
("etl/a", "etl/b") in the composed DAG.
import dagron
from dagron.compose import compose
# Build individual DAGs
etl_dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)
ml_dag = (
dagron.DAG.builder()
.add_edge("train", "evaluate")
.add_edge("evaluate", "deploy")
.build()
)
# Compose with a cross-namespace connection
combined = compose(
dags={"etl": etl_dag, "ml": ml_dag},
connections=[("etl/load", "ml/train")],
)
print(list(combined.nodes()))
# ['etl/extract', 'etl/transform', 'etl/load',
# 'ml/train', 'ml/evaluate', 'ml/deploy']
print(combined.edge_count())
# 5 (2 from etl + 2 from ml + 1 cross-namespace)
print(combined.roots()) # ['etl/extract']
print(combined.leaves()) # ['ml/deploy']
Custom separators
Use the separator parameter to change the namespace delimiter:
combined = compose(
dags={"etl": etl_dag, "ml": ml_dag},
connections=[("etl.load", "ml.train")],
separator=".",
)
print(list(combined.nodes()))
# ['etl.extract', 'etl.transform', 'etl.load',
# 'ml.train', 'ml.evaluate', 'ml.deploy']
Composing many DAGs
The function accepts any number of DAGs:
ingestion = dagron.DAG.builder().add_edge("fetch", "parse").build()
validation = dagron.DAG.builder().add_edge("check_schema", "check_values").build()
storage = dagron.DAG.builder().add_edge("write_db", "write_cache").build()
pipeline = compose(
dags={
"ingest": ingestion,
"validate": validation,
"store": storage,
},
connections=[
("ingest/parse", "validate/check_schema"),
("validate/check_values", "store/write_db"),
],
)
print(pipeline.node_count()) # 6
print(pipeline.edge_count()) # 5
Preserving payloads and metadata
Payloads and metadata from the original DAGs are carried over to the composed DAG:
dag_with_payloads = dagron.DAG()
dag_with_payloads.add_node("train", payload={"epochs": 10, "lr": 0.001})
dag_with_payloads.add_node("evaluate", payload={"metrics": ["acc", "f1"]})
dag_with_payloads.add_edge("train", "evaluate")
combined = compose(dags={"ml": dag_with_payloads})
# Payloads are accessible via prefixed names
print(combined.get_payload("ml/train"))
# {"epochs": 10, "lr": 0.001}
Complete example
import dagron
from dagron.compose import compose
# === Build sub-DAGs for different teams ===
# Data Engineering team
data_eng = (
dagron.DAG.builder()
.add_node("raw_ingest", payload={"source": "s3://bucket/raw"})
.add_node("clean")
.add_node("feature_store")
.add_edge("raw_ingest", "clean")
.add_edge("clean", "feature_store")
.build()
)
# ML team
ml = (
dagron.DAG.builder()
.add_node("train", payload={"model": "xgboost"})
.add_node("evaluate")
.add_node("register")
.add_edge("train", "evaluate")
.add_edge("evaluate", "register")
.build()
)
# Platform team
platform = (
dagron.DAG.builder()
.add_node("deploy_staging")
.add_node("integration_test")
.add_node("deploy_prod")
.add_edge("deploy_staging", "integration_test")
.add_edge("integration_test", "deploy_prod")
.build()
)
# === Compose into a unified pipeline ===
full_pipeline = compose(
dags={
"data": data_eng,
"ml": ml,
"platform": platform,
},
connections=[
("data/feature_store", "ml/train"),
("ml/register", "platform/deploy_staging"),
],
)
print(f"Full pipeline: {full_pipeline.node_count()} nodes, {full_pipeline.edge_count()} edges")
# Full pipeline: 9 nodes, 8 edges
# Inspect the composed structure
for level_idx, level in enumerate(full_pipeline.topological_levels()):
names = [n.name for n in level]
print(f"Level {level_idx}: {names}")
# Level 0: ['data/raw_ingest']
# Level 1: ['data/clean']
# Level 2: ['data/feature_store']
# Level 3: ['ml/train']
# Level 4: ['ml/evaluate']
# Level 5: ['ml/register']
# Level 6: ['platform/deploy_staging']
# Level 7: ['platform/integration_test']
# Level 8: ['platform/deploy_prod']
# Execute the composed DAG
tasks = {
"data/raw_ingest": lambda: "raw data",
"data/clean": lambda: "clean data",
"data/feature_store": lambda: "features",
"ml/train": lambda: "model",
"ml/evaluate": lambda: {"accuracy": 0.95},
"ml/register": lambda: "model_v1",
"platform/deploy_staging": lambda: "staging OK",
"platform/integration_test": lambda: "tests pass",
"platform/deploy_prod": lambda: "deployed",
}
executor = dagron.DAGExecutor(full_pipeline, max_workers=4)
result = executor.execute(tasks)
print(f"Succeeded: {result.succeeded}/{full_pipeline.node_count()}")
See also
- DAG -- the core graph class.
- Templates -- parameterized DAG construction.
- Versioning -- tracking changes to composed DAGs.
- Building DAGs guide -- construction patterns and composition walkthrough.
- Graph Transforms guide -- filtering, merging, and reshaping DAGs.