# Conditional Execution
Not every branch in a pipeline should always run. Sometimes you want to skip expensive model training if validation fails, route data to different processors based on its type, or gate a deployment step on quality metrics. dagron's conditional execution system lets you attach predicates to edges; they are evaluated at runtime to dynamically gate which branches execute.
## Concepts
### Conditional edges

A conditional edge is an edge with an attached predicate — a Python function
that receives the upstream node's result and returns `True` (execute the
downstream node) or `False` (skip it):

```python
condition=lambda result: result["score"] > 0.9
```

When a conditional edge evaluates to `False`, the downstream node and all of
its descendants are skipped.
### ConditionalDAGBuilder

The `ConditionalDAGBuilder` extends the standard builder with support for
`condition` predicates on edges.

### ConditionalExecutor

The `ConditionalExecutor` evaluates conditions at runtime and only dispatches
nodes whose incoming conditions are satisfied.
## Building a conditional DAG
```python
import dagron

builder = dagron.ConditionalDAGBuilder()
builder.add_node("validate")
builder.add_node("fast_path")
builder.add_node("slow_path")
builder.add_node("merge")

# Conditional edges: route on input size
builder.add_edge("validate", "fast_path", condition=lambda r: r["size"] < 1000)
builder.add_edge("validate", "slow_path", condition=lambda r: r["size"] >= 1000)

# Unconditional edges
builder.add_edge("fast_path", "merge")
builder.add_edge("slow_path", "merge")

dag, conditions = builder.build()
```
The `.build()` method returns a tuple: the DAG and a dictionary of conditions keyed by `(from_node, to_node)` tuples.
Conditional branching. Only one path executes based on the validation result.
## Executing with conditions
```python
tasks = {
    "validate": lambda: {"size": 500, "valid": True},
    "fast_path": lambda: "processed quickly",
    "slow_path": lambda: "processed with full pipeline",
    "merge": lambda: "done",
}

executor = dagron.ConditionalExecutor(dag, conditions)
result = executor.execute(tasks)

# fast_path runs (size=500 < 1000), slow_path is skipped
print(result.node_results["fast_path"].status)  # COMPLETED
print(result.node_results["slow_path"].status)  # SKIPPED
print(result.node_results["merge"].status)      # COMPLETED
```
| Node | Status | Why |
|---|---|---|
| validate | completed | Always runs (root) |
| fast_path | completed | Condition size < 1000 is True |
| slow_path | skipped | Condition size >= 1000 is False |
| merge | completed | At least one predecessor completed |
What if the data is large?
```python
tasks["validate"] = lambda: {"size": 5000, "valid": True}
result = executor.execute(tasks)

print(result.node_results["fast_path"].status)  # SKIPPED
print(result.node_results["slow_path"].status)  # COMPLETED
```
With size=5000, slow_path executes and fast_path is skipped.
## Condition predicates
Conditions are plain Python callables that receive the predecessor's return value:
```python
# Simple threshold
condition=lambda result: result > 0.9

# Dictionary access
condition=lambda result: result["status"] == "ok"

# Complex logic
def should_retrain(result):
    return (
        result["accuracy"] < 0.95
        or result["data_drift"] > 0.1
        or result["days_since_last_train"] > 7
    )

builder.add_edge("evaluate", "retrain", condition=should_retrain)
```
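Conditions are ordinary Python, so a predicate that assumes a key exists can raise on malformed input. A defensive variant (plain Python, independent of dagron; the keys here are illustrative) guards against `None` results and missing keys:

```python
def is_healthy(result):
    """Defensive predicate: tolerates None results and missing keys."""
    if result is None:
        return False
    # .get with a failing default means a missing key skips the branch
    return result.get("status") == "ok" and result.get("error_rate", 1.0) < 0.01

print(is_healthy(None))                                   # False
print(is_healthy({}))                                     # False
print(is_healthy({"status": "ok", "error_rate": 0.001}))  # True
```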
## Multi-input conditions
When a node has multiple conditional predecessors, the executor evaluates each
incoming edge independently. The node runs if at least one incoming
conditional edge evaluates to True:
```python
builder = dagron.ConditionalDAGBuilder()
builder.add_node("source_a")
builder.add_node("source_b")
builder.add_node("process")

builder.add_edge("source_a", "process", condition=lambda r: r is not None)
builder.add_edge("source_b", "process", condition=lambda r: r is not None)

dag, conditions = builder.build()

tasks = {
    "source_a": lambda: None,    # condition False
    "source_b": lambda: "data",  # condition True
    "process": lambda: "processed",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["process"].status)  # COMPLETED (source_b passed)
```
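The at-least-one rule can be sketched in plain Python (a hypothetical helper for illustration, not part of dagron's API):

```python
def should_dispatch(incoming):
    """incoming: list of (predicate_or_None, upstream_result) pairs.

    Unconditional edges (predicate is None) always count as satisfied;
    otherwise the node runs if any predicate returns True.
    """
    if not incoming:
        return True  # root nodes always run
    return any(pred is None or pred(result) for pred, result in incoming)

not_none = lambda r: r is not None
print(should_dispatch([(not_none, None), (not_none, "data")]))  # True
print(should_dispatch([(not_none, None), (not_none, None)]))    # False
print(should_dispatch([(None, "anything")]))                    # True
```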
## Real-world example: ML pipeline with quality gates
A common pattern is gating deployment on model quality:
```python
import dagron

builder = dagron.ConditionalDAGBuilder()

# Pipeline stages
builder.add_node("load_data")
builder.add_node("train")
builder.add_node("evaluate")
builder.add_node("deploy_prod")
builder.add_node("deploy_staging")
builder.add_node("alert_team")

# Edges with conditions
builder.add_edge("load_data", "train")
builder.add_edge("train", "evaluate")

# Gate: deploy to prod only if accuracy >= 0.95
builder.add_edge("evaluate", "deploy_prod",
                 condition=lambda r: r["accuracy"] >= 0.95)

# Gate: deploy to staging if accuracy between 0.85 and 0.95
builder.add_edge("evaluate", "deploy_staging",
                 condition=lambda r: 0.85 <= r["accuracy"] < 0.95)

# Gate: alert team if accuracy < 0.85
builder.add_edge("evaluate", "alert_team",
                 condition=lambda r: r["accuracy"] < 0.85)

dag, conditions = builder.build()
```
ML pipeline with three quality gates. Exactly one downstream path activates.
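Because the three gates are meant to be mutually exclusive and exhaustive, a quick plain-Python sanity check (independent of dagron) can confirm that exactly one fires for any accuracy:

```python
gates = {
    "deploy_prod": lambda r: r["accuracy"] >= 0.95,
    "deploy_staging": lambda r: 0.85 <= r["accuracy"] < 0.95,
    "alert_team": lambda r: r["accuracy"] < 0.85,
}

# Sweep values around each boundary; exactly one gate must fire per value
for acc in (0.0, 0.70, 0.849, 0.85, 0.90, 0.949, 0.95, 0.97, 1.0):
    fired = [name for name, pred in gates.items() if pred({"accuracy": acc})]
    assert len(fired) == 1, f"accuracy={acc}: {fired}"
```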
```python
# Scenario 1: Great model
tasks = {
    "load_data": lambda: "loaded",
    "train": lambda: "trained",
    "evaluate": lambda: {"accuracy": 0.97, "f1": 0.96},
    "deploy_prod": lambda: "deployed to production!",
    "deploy_staging": lambda: "deployed to staging",
    "alert_team": lambda: "alert sent",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["deploy_prod"].status)     # COMPLETED
print(result.node_results["deploy_staging"].status)  # SKIPPED
print(result.node_results["alert_team"].status)      # SKIPPED

# Scenario 2: Mediocre model
tasks["evaluate"] = lambda: {"accuracy": 0.90, "f1": 0.88}
result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["deploy_prod"].status)     # SKIPPED
print(result.node_results["deploy_staging"].status)  # COMPLETED
print(result.node_results["alert_team"].status)      # SKIPPED

# Scenario 3: Bad model
tasks["evaluate"] = lambda: {"accuracy": 0.70, "f1": 0.65}
result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["deploy_prod"].status)     # SKIPPED
print(result.node_results["deploy_staging"].status)  # SKIPPED
print(result.node_results["alert_team"].status)      # COMPLETED
```
## Conditional chains
Conditions propagate through the graph. If a conditional edge skips a node, that node's descendants are also skipped:
```python
builder = dagron.ConditionalDAGBuilder()
builder.add_node("check")
builder.add_node("step_1")
builder.add_node("step_2")
builder.add_node("step_3")

builder.add_edge("check", "step_1", condition=lambda r: r["go"])
builder.add_edge("step_1", "step_2")  # unconditional
builder.add_edge("step_2", "step_3")  # unconditional

dag, conditions = builder.build()

tasks = {
    "check": lambda: {"go": False},
    "step_1": lambda: "1",
    "step_2": lambda: "2",
    "step_3": lambda: "3",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
```
| Node | Status | Why |
|---|---|---|
| check | completed | Root node |
| step_1 | skipped | Condition is False |
| step_2 | skipped | Predecessor skipped |
| step_3 | skipped | Predecessor skipped |
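The propagation rule is a reachability computation over the edge relation. A plain-Python sketch (a hypothetical helper for illustration, not dagron's implementation):

```python
from collections import deque

def skipped_descendants(edges, skipped_node):
    """Breadth-first walk collecting every node downstream of a skipped node.

    edges: iterable of (src, dst) pairs describing the DAG.
    """
    children = {}
    for src, dst in edges:
        children.setdefault(src, []).append(dst)

    seen, queue = set(), deque([skipped_node])
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen

edges = [("check", "step_1"), ("step_1", "step_2"), ("step_2", "step_3")]
print(sorted(skipped_descendants(edges, "step_1")))  # ['step_2', 'step_3']
```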
## Mixing conditional and unconditional edges
You can freely mix conditional and unconditional edges in the same graph:
```python
builder = dagron.ConditionalDAGBuilder()
builder.add_node("extract")
builder.add_node("validate")
builder.add_node("transform")
builder.add_node("quarantine")
builder.add_node("load")

# Unconditional: extract -> validate
builder.add_edge("extract", "validate")

# Conditional: validate -> transform (if valid)
builder.add_edge("validate", "transform",
                 condition=lambda r: r["valid"])

# Conditional: validate -> quarantine (if invalid)
builder.add_edge("validate", "quarantine",
                 condition=lambda r: not r["valid"])

# Unconditional: transform -> load
builder.add_edge("transform", "load")

dag, conditions = builder.build()
```
Mixed conditional and unconditional edges. Invalid data is quarantined.
## Combining conditions with fail-fast
When `fail_fast=True` (the default in the underlying executor), a failure in
any executed node skips its descendants — this combines naturally with
conditional skipping:
```python
tasks = {
    "extract": lambda: "data",
    "validate": lambda: {"valid": True},
    # a lambda that raises, via the generator .throw() trick
    "transform": lambda: (_ for _ in ()).throw(RuntimeError("transform error")),
    "quarantine": lambda: "quarantined",
    "load": lambda: "loaded",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
```
| Node | Status | Why |
|---|---|---|
| extract | completed | Root |
| validate | completed | Unconditional |
| transform | failed | Raised exception |
| quarantine | skipped | Condition False (valid=True) |
| load | skipped | Predecessor failed |
## Debugging conditions
To understand why a node was skipped, inspect the conditions dictionary:
```python
dag, conditions = builder.build()

# List all conditional edges
for (src, dst), predicate in conditions.items():
    print(f"{src} -> {dst}: {predicate}")
```
You can also test conditions in isolation:
```python
validate_result = {"valid": True, "size": 500}
for (src, dst), predicate in conditions.items():
    if src == "validate":
        print(f"validate -> {dst}: {predicate(validate_result)}")
```
## Example: data routing pipeline
Route records to different processors based on their type:
```python
import dagron

builder = dagron.ConditionalDAGBuilder()
builder.add_node("classify")
builder.add_node("process_text")
builder.add_node("process_image")
builder.add_node("process_video")
builder.add_node("store")

builder.add_edge("classify", "process_text",
                 condition=lambda r: r["type"] == "text")
builder.add_edge("classify", "process_image",
                 condition=lambda r: r["type"] == "image")
builder.add_edge("classify", "process_video",
                 condition=lambda r: r["type"] == "video")

builder.add_edge("process_text", "store")
builder.add_edge("process_image", "store")
builder.add_edge("process_video", "store")

dag, conditions = builder.build()

tasks = {
    "classify": lambda: {"type": "image", "data": b"..."},
    "process_text": lambda: "text processed",
    "process_image": lambda: "image processed",
    "process_video": lambda: "video processed",
    "store": lambda: "stored",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["process_image"].status)  # COMPLETED
print(result.node_results["process_text"].status)   # SKIPPED
print(result.node_results["process_video"].status)  # SKIPPED
```
Data routing. Only the matching processor runs based on the classification result.
## Best practices
- **Keep conditions pure.** Conditions should only examine the input result, not produce side effects or access external state.
- **Handle `None` results.** If a predecessor might return `None`, guard against it in your condition: `condition=lambda r: r is not None and r["ok"]`.
- **Use descriptive function names.** Named functions are easier to debug than lambdas:

  ```python
  def is_high_quality(result):
      return result["accuracy"] >= 0.95

  builder.add_edge("evaluate", "deploy", condition=is_high_quality)
  ```

- **Test conditions independently.** Unit-test your predicate functions with various inputs before wiring them into the DAG.
- **Combine with tracing.** Enable tracing to see which conditions fired and which branches were taken. See Tracing & Profiling.
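As a sketch of the testing advice above, a named predicate exercised with plain assertions (a test runner such as pytest would collect a function like this; the names are illustrative):

```python
def is_high_quality(result):
    return result["accuracy"] >= 0.95

def test_is_high_quality():
    # exercise both sides of the 0.95 boundary
    assert is_high_quality({"accuracy": 0.95})
    assert is_high_quality({"accuracy": 0.99})
    assert not is_high_quality({"accuracy": 0.949})

test_is_high_quality()
```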
## API reference
| Class / Method | Docs |
|---|---|
| `ConditionalDAGBuilder` | Conditions |
| `ConditionalExecutor` | Conditions |
## Next steps
- Dynamic DAGs — expand the graph at runtime based on node results.
- Incremental Execution — combine conditions with incremental recomputation.
- Checkpointing — resume conditional pipelines after failures.