Skip to main content

Composition

The compose module provides multi-DAG composition with automatic namespace prefixing. Combine independent DAGs into a single unified graph, preserving node payloads and metadata, with optional cross-namespace connections.

from dagron.compose import compose

compose

compose
def compose(
    dags: dict[str, DAG],
    connections: list[tuple[str, str]] | None = None,
    *,
    separator: str = "/",
) -> DAG

Compose multiple DAGs into one with namespace prefixes. Each DAG's nodes are prefixed with its namespace key (e.g., a node "load" in namespace "etl" becomes "etl/load"). Internal edges within each DAG are preserved with the same prefixing. Cross-namespace edges can be added via the connections parameter.

Parameters:

- **dags** (`dict[str, DAG]`, required) — Mapping of namespace names to DAG objects. Each key becomes the prefix for that DAG's nodes.
- **connections** (`list[tuple[str, str]] | None`, default `None`) — List of (from, to) tuples using fully namespaced names (e.g., ("etl/load", "ml/train")). These create edges between nodes in different namespaces.
- **separator** (`str`, default `"/"`) — Separator between namespace prefix and node name.

Returns: DAG -- A new DAG containing all nodes and edges from all input DAGs with namespaces applied, plus any cross-namespace connections.

Node prefixing

Every node in the resulting DAG has a name of the form {namespace}{separator}{original_name}. Payloads and metadata from the original nodes are preserved on the prefixed nodes.

Edge preservation

All edges within each input DAG are preserved with prefixed names. For example, an edge ("a", "b") in namespace "etl" becomes ("etl/a", "etl/b") in the composed DAG.

import dagron
from dagron.compose import compose

# Build individual DAGs
etl_dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)

ml_dag = (
dagron.DAG.builder()
.add_edge("train", "evaluate")
.add_edge("evaluate", "deploy")
.build()
)

# Compose with a cross-namespace connection
combined = compose(
dags={"etl": etl_dag, "ml": ml_dag},
connections=[("etl/load", "ml/train")],
)

print(list(combined.nodes()))
# ['etl/extract', 'etl/transform', 'etl/load',
# 'ml/train', 'ml/evaluate', 'ml/deploy']

print(combined.edge_count())
# 5 (2 from etl + 2 from ml + 1 cross-namespace)

print(combined.roots()) # ['etl/extract']
print(combined.leaves()) # ['ml/deploy']

Custom separators

Use the separator parameter to change the namespace delimiter:

combined = compose(
dags={"etl": etl_dag, "ml": ml_dag},
connections=[("etl.load", "ml.train")],
separator=".",
)

print(list(combined.nodes()))
# ['etl.extract', 'etl.transform', 'etl.load',
# 'ml.train', 'ml.evaluate', 'ml.deploy']

Composing many DAGs

The function accepts any number of DAGs:

ingestion = dagron.DAG.builder().add_edge("fetch", "parse").build()
validation = dagron.DAG.builder().add_edge("check_schema", "check_values").build()
storage = dagron.DAG.builder().add_edge("write_db", "write_cache").build()

pipeline = compose(
dags={
"ingest": ingestion,
"validate": validation,
"store": storage,
},
connections=[
("ingest/parse", "validate/check_schema"),
("validate/check_values", "store/write_db"),
],
)

print(pipeline.node_count()) # 6
print(pipeline.edge_count()) # 5

Preserving payloads and metadata

Payloads and metadata from the original DAGs are carried over to the composed DAG:

dag_with_payloads = dagron.DAG()
dag_with_payloads.add_node("train", payload={"epochs": 10, "lr": 0.001})
dag_with_payloads.add_node("evaluate", payload={"metrics": ["acc", "f1"]})
dag_with_payloads.add_edge("train", "evaluate")

combined = compose(dags={"ml": dag_with_payloads})

# Payloads are accessible via prefixed names
print(combined.get_payload("ml/train"))
# {"epochs": 10, "lr": 0.001}

Complete example

import dagron
from dagron.compose import compose

# === Build sub-DAGs for different teams ===

# Data Engineering team
data_eng = (
dagron.DAG.builder()
.add_node("raw_ingest", payload={"source": "s3://bucket/raw"})
.add_node("clean")
.add_node("feature_store")
.add_edge("raw_ingest", "clean")
.add_edge("clean", "feature_store")
.build()
)

# ML team
ml = (
dagron.DAG.builder()
.add_node("train", payload={"model": "xgboost"})
.add_node("evaluate")
.add_node("register")
.add_edge("train", "evaluate")
.add_edge("evaluate", "register")
.build()
)

# Platform team
platform = (
dagron.DAG.builder()
.add_node("deploy_staging")
.add_node("integration_test")
.add_node("deploy_prod")
.add_edge("deploy_staging", "integration_test")
.add_edge("integration_test", "deploy_prod")
.build()
)

# === Compose into a unified pipeline ===
full_pipeline = compose(
dags={
"data": data_eng,
"ml": ml,
"platform": platform,
},
connections=[
("data/feature_store", "ml/train"),
("ml/register", "platform/deploy_staging"),
],
)

print(f"Full pipeline: {full_pipeline.node_count()} nodes, {full_pipeline.edge_count()} edges")
# Full pipeline: 9 nodes, 8 edges

# Inspect the composed structure
for level_idx, level in enumerate(full_pipeline.topological_levels()):
    names = [n.name for n in level]
    print(f"Level {level_idx}: {names}")
# Level 0: ['data/raw_ingest']
# Level 1: ['data/clean']
# Level 2: ['data/feature_store']
# Level 3: ['ml/train']
# Level 4: ['ml/evaluate']
# Level 5: ['ml/register']
# Level 6: ['platform/deploy_staging']
# Level 7: ['platform/integration_test']
# Level 8: ['platform/deploy_prod']

# Execute the composed DAG
tasks = {
"data/raw_ingest": lambda: "raw data",
"data/clean": lambda: "clean data",
"data/feature_store": lambda: "features",
"ml/train": lambda: "model",
"ml/evaluate": lambda: {"accuracy": 0.95},
"ml/register": lambda: "model_v1",
"platform/deploy_staging": lambda: "staging OK",
"platform/integration_test": lambda: "tests pass",
"platform/deploy_prod": lambda: "deployed",
}

executor = dagron.DAGExecutor(full_pipeline, max_workers=4)
result = executor.execute(tasks)
print(f"Succeeded: {result.succeeded}/{full_pipeline.node_count()}")

See also