Conditional Execution

Not every branch in a pipeline should always run. Sometimes you want to skip expensive model training if validation fails, route data to different processors based on its type, or gate a deployment step on quality metrics. dagron's conditional execution system lets you attach predicates to edges that are evaluated at runtime, dynamically gating which branches execute.

Concepts

Conditional edges

A conditional edge is an edge with an attached predicate — a Python function that receives the upstream node's result and returns True (execute the downstream node) or False (skip it).

condition: lambda result: result["score"] > 0.9

When a conditional edge evaluates to False, the downstream node and all of its descendants are skipped.

ConditionalDAGBuilder

The ConditionalDAGBuilder extends the standard builder with support for condition predicates on edges.

ConditionalExecutor

The ConditionalExecutor evaluates conditions at runtime and only dispatches nodes whose incoming conditions are satisfied.

Building a conditional DAG

import dagron

builder = dagron.ConditionalDAGBuilder()

builder.add_node("validate")
builder.add_node("fast_path")
builder.add_node("slow_path")
builder.add_node("merge")

# Conditional edges: route based on payload size
builder.add_edge("validate", "fast_path", condition=lambda r: r["size"] < 1000)
builder.add_edge("validate", "slow_path", condition=lambda r: r["size"] >= 1000)

# Unconditional edges
builder.add_edge("fast_path", "merge")
builder.add_edge("slow_path", "merge")

dag, conditions = builder.build()

The .build() method returns a tuple: the DAG and a dictionary of conditions keyed by (from_node, to_node) tuples.

Conditional branching. Only one path executes based on the validation result.
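
For the graph above, the returned conditions mapping would have roughly this shape. This is a plain-Python illustration of the documented structure, not actual dagron output:

```python
# Illustrative shape of the conditions dictionary returned by .build():
# predicates keyed by (from_node, to_node) tuples.
conditions = {
    ("validate", "fast_path"): lambda r: r["size"] < 1000,
    ("validate", "slow_path"): lambda r: r["size"] >= 1000,
}

print(conditions[("validate", "fast_path")]({"size": 500}))  # True
print(conditions[("validate", "slow_path")]({"size": 500}))  # False
```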

Executing with conditions

tasks = {
    "validate": lambda: {"size": 500, "valid": True},
    "fast_path": lambda: "processed quickly",
    "slow_path": lambda: "processed with full pipeline",
    "merge": lambda: "done",
}

executor = dagron.ConditionalExecutor(dag, conditions)
result = executor.execute(tasks)

# fast_path runs (size=500 < 1000), slow_path is skipped
print(result.node_results["fast_path"].status) # COMPLETED
print(result.node_results["slow_path"].status) # SKIPPED
print(result.node_results["merge"].status) # COMPLETED

Node       Status     Why
validate   completed  Always runs (root)
fast_path  completed  Condition size < 1000 is True
slow_path  skipped    Condition size >= 1000 is False
merge      completed  At least one predecessor completed

What if the data is large?

tasks["validate"] = lambda: {"size": 5000, "valid": True}

result = executor.execute(tasks)
print(result.node_results["fast_path"].status) # SKIPPED
print(result.node_results["slow_path"].status) # COMPLETED

With size=5000, slow_path executes and fast_path is skipped.

Condition predicates

Conditions are plain Python callables that receive the predecessor's return value:

# Simple threshold
condition=lambda result: result > 0.9

# Dictionary access
condition=lambda result: result["status"] == "ok"

# Complex logic
def should_retrain(result):
    return (
        result["accuracy"] < 0.95
        or result["data_drift"] > 0.1
        or result["days_since_last_train"] > 7
    )

builder.add_edge("evaluate", "retrain", condition=should_retrain)
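
Because predicates are plain callables, you can sanity-check them standalone before wiring them into a graph. Here the definition is repeated so the snippet is self-contained, with hypothetical input dictionaries:

```python
def should_retrain(result):
    return (
        result["accuracy"] < 0.95
        or result["data_drift"] > 0.1
        or result["days_since_last_train"] > 7
    )

# Hypothetical evaluation results exercising each clause.
healthy = {"accuracy": 0.97, "data_drift": 0.02, "days_since_last_train": 1}
drifted = {"accuracy": 0.97, "data_drift": 0.30, "days_since_last_train": 1}
stale = {"accuracy": 0.97, "data_drift": 0.02, "days_since_last_train": 10}

print(should_retrain(healthy))  # False
print(should_retrain(drifted))  # True
print(should_retrain(stale))    # True
```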

Multi-input conditions

When a node has multiple conditional predecessors, the executor evaluates each incoming edge independently. The node runs if at least one incoming conditional edge evaluates to True:

builder = dagron.ConditionalDAGBuilder()
builder.add_node("source_a")
builder.add_node("source_b")
builder.add_node("process")

builder.add_edge("source_a", "process", condition=lambda r: r is not None)
builder.add_edge("source_b", "process", condition=lambda r: r is not None)

dag, conditions = builder.build()

tasks = {
    "source_a": lambda: None,    # condition False
    "source_b": lambda: "data",  # condition True
    "process": lambda: "processed",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["process"].status) # COMPLETED (source_b passed)
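
The "at least one incoming condition passes" rule can be sketched in plain Python. This mirrors the documented behavior, not dagron's actual internals:

```python
# Each incoming conditional edge pairs a source node with a predicate;
# the downstream node runs if any predicate passes on its source's result.
edge_conditions = {
    "source_a": lambda r: r is not None,
    "source_b": lambda r: r is not None,
}
upstream_results = {"source_a": None, "source_b": "data"}

should_run = any(
    pred(upstream_results[src]) for src, pred in edge_conditions.items()
)
print(should_run)  # True: source_b's condition passed

# If every incoming condition fails, the node is skipped.
should_run_all_none = any(pred(None) for pred in edge_conditions.values())
print(should_run_all_none)  # False
```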

Real-world example: ML pipeline with quality gates

A common pattern is gating deployment on model quality:

import dagron

builder = dagron.ConditionalDAGBuilder()

# Pipeline stages
builder.add_node("load_data")
builder.add_node("train")
builder.add_node("evaluate")
builder.add_node("deploy_prod")
builder.add_node("deploy_staging")
builder.add_node("alert_team")

# Edges with conditions
builder.add_edge("load_data", "train")
builder.add_edge("train", "evaluate")

# Gate: deploy to prod only if accuracy >= 0.95
builder.add_edge("evaluate", "deploy_prod",
                 condition=lambda r: r["accuracy"] >= 0.95)

# Gate: deploy to staging if accuracy between 0.85 and 0.95
builder.add_edge("evaluate", "deploy_staging",
                 condition=lambda r: 0.85 <= r["accuracy"] < 0.95)

# Gate: alert team if accuracy < 0.85
builder.add_edge("evaluate", "alert_team",
                 condition=lambda r: r["accuracy"] < 0.85)

dag, conditions = builder.build()

ML pipeline with three quality gates. Exactly one downstream path activates.

# Scenario 1: Great model
tasks = {
    "load_data": lambda: "loaded",
    "train": lambda: "trained",
    "evaluate": lambda: {"accuracy": 0.97, "f1": 0.96},
    "deploy_prod": lambda: "deployed to production!",
    "deploy_staging": lambda: "deployed to staging",
    "alert_team": lambda: "alert sent",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["deploy_prod"].status) # COMPLETED
print(result.node_results["deploy_staging"].status) # SKIPPED
print(result.node_results["alert_team"].status) # SKIPPED

# Scenario 2: Mediocre model
tasks["evaluate"] = lambda: {"accuracy": 0.90, "f1": 0.88}
result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["deploy_prod"].status) # SKIPPED
print(result.node_results["deploy_staging"].status) # COMPLETED
print(result.node_results["alert_team"].status) # SKIPPED

# Scenario 3: Bad model
tasks["evaluate"] = lambda: {"accuracy": 0.70, "f1": 0.65}
result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["deploy_prod"].status) # SKIPPED
print(result.node_results["deploy_staging"].status) # SKIPPED
print(result.node_results["alert_team"].status) # COMPLETED
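
The three gate predicates partition the accuracy range, so exactly one branch fires for any score. A quick standalone check, in plain Python with no dagron required:

```python
# The same three gate predicates as above, keyed by their target node.
gates = {
    "deploy_prod": lambda r: r["accuracy"] >= 0.95,
    "deploy_staging": lambda r: 0.85 <= r["accuracy"] < 0.95,
    "alert_team": lambda r: r["accuracy"] < 0.85,
}

# Verify exactly one gate fires, including at the 0.95 and 0.85 boundaries.
for accuracy in (0.97, 0.90, 0.70, 0.95, 0.85):
    fired = [name for name, pred in gates.items() if pred({"accuracy": accuracy})]
    assert len(fired) == 1, f"accuracy={accuracy} fired {fired}"
    print(accuracy, "->", fired[0])
```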

Conditional chains

Conditions propagate through the graph. If a conditional edge skips a node, that node's descendants are also skipped:

builder = dagron.ConditionalDAGBuilder()
builder.add_node("check")
builder.add_node("step_1")
builder.add_node("step_2")
builder.add_node("step_3")

builder.add_edge("check", "step_1", condition=lambda r: r["go"])
builder.add_edge("step_1", "step_2") # unconditional
builder.add_edge("step_2", "step_3") # unconditional

dag, conditions = builder.build()

tasks = {
    "check": lambda: {"go": False},
    "step_1": lambda: "1",
    "step_2": lambda: "2",
    "step_3": lambda: "3",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)

Node     Status     Why
check    completed  Root node
step_1   skipped    Condition is False
step_2   skipped    Predecessor skipped
step_3   skipped    Predecessor skipped
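
The propagation rule amounts to a breadth-first walk over the successors of a skipped node. The following is a semantics illustration in plain Python, not dagron's implementation:

```python
from collections import deque

# Adjacency list for the chain above.
successors = {
    "check": ["step_1"],
    "step_1": ["step_2"],
    "step_2": ["step_3"],
    "step_3": [],
}

def skipped_descendants(node):
    """Return every node that is skipped when `node` is skipped."""
    seen, queue = set(), deque([node])
    while queue:
        current = queue.popleft()
        for nxt in successors[current]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen

print(sorted(skipped_descendants("step_1")))  # ['step_2', 'step_3']
```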

Mixing conditional and unconditional edges

You can freely mix conditional and unconditional edges in the same graph:

builder = dagron.ConditionalDAGBuilder()

builder.add_node("extract")
builder.add_node("validate")
builder.add_node("transform")
builder.add_node("quarantine")
builder.add_node("load")

# Unconditional: extract -> validate
builder.add_edge("extract", "validate")

# Conditional: validate -> transform (if valid)
builder.add_edge("validate", "transform",
                 condition=lambda r: r["valid"])

# Conditional: validate -> quarantine (if invalid)
builder.add_edge("validate", "quarantine",
                 condition=lambda r: not r["valid"])

# Unconditional: transform -> load
builder.add_edge("transform", "load")

dag, conditions = builder.build()

Mixed conditional and unconditional edges. Invalid data is quarantined.

Combining conditions with fail-fast

When fail_fast=True (the default in the underlying executor), a failure in any executed node skips its descendants — this combines naturally with conditional skipping:

tasks = {
    "extract": lambda: "data",
    "validate": lambda: {"valid": True},
    # Generator-throw trick: raises RuntimeError from inside a lambda
    "transform": lambda: (_ for _ in ()).throw(RuntimeError("transform error")),
    "quarantine": lambda: "quarantined",
    "load": lambda: "loaded",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)

Node        Status     Why
extract     completed  Root
validate    completed  Unconditional
transform   failed     Raised exception
quarantine  skipped    Condition False (valid=True)
load        skipped    Predecessor failed

Debugging conditions

To understand why a node was skipped, inspect the conditions dictionary:

dag, conditions = builder.build()

# List all conditional edges
for (src, dst), predicate in conditions.items():
    print(f"{src} -> {dst}: {predicate}")

You can also test conditions in isolation:

validate_result = {"valid": True, "size": 500}

for (src, dst), predicate in conditions.items():
    if src == "validate":
        print(f"validate -> {dst}: {predicate(validate_result)}")

Example: data routing pipeline

Route records to different processors based on their type:

import dagron

builder = dagron.ConditionalDAGBuilder()

builder.add_node("classify")
builder.add_node("process_text")
builder.add_node("process_image")
builder.add_node("process_video")
builder.add_node("store")

builder.add_edge("classify", "process_text",
                 condition=lambda r: r["type"] == "text")
builder.add_edge("classify", "process_image",
                 condition=lambda r: r["type"] == "image")
builder.add_edge("classify", "process_video",
                 condition=lambda r: r["type"] == "video")

builder.add_edge("process_text", "store")
builder.add_edge("process_image", "store")
builder.add_edge("process_video", "store")

dag, conditions = builder.build()

tasks = {
    "classify": lambda: {"type": "image", "data": b"..."},
    "process_text": lambda: "text processed",
    "process_image": lambda: "image processed",
    "process_video": lambda: "video processed",
    "store": lambda: "stored",
}

result = dagron.ConditionalExecutor(dag, conditions).execute(tasks)
print(result.node_results["process_image"].status) # COMPLETED
print(result.node_results["process_text"].status) # SKIPPED
print(result.node_results["process_video"].status) # SKIPPED

Data routing. Only the matching processor runs based on the classification result.

Best practices

  1. Keep conditions pure. Conditions should only examine the input result, not produce side effects or access external state.

  2. Handle None results. If a predecessor might return None, guard against it in your condition: condition=lambda r: r is not None and r["ok"].

  3. Use descriptive function names. Named functions are easier to debug than lambdas:

     def is_high_quality(result):
         return result["accuracy"] >= 0.95

     builder.add_edge("evaluate", "deploy", condition=is_high_quality)

  4. Test conditions independently. Unit-test your predicate functions with various inputs before wiring them into the DAG.

  5. Combine with tracing. Enable tracing to see which conditions fired and which branches were taken. See Tracing & Profiling.

API reference

Class / Method         Docs
ConditionalDAGBuilder  Conditions
ConditionalExecutor    Conditions

Next steps