Conditional Execution
The conditional execution module lets edges in your DAG carry predicate
functions. At runtime, the executor evaluates each edge's condition before
traversing it. If the condition returns False, the edge is not traversed, and
the downstream node (and its subtree) is skipped unless another incoming edge
still activates it. This enables branching, feature flags, and data-dependent
routing without modifying the graph structure.
See the Conditional Execution guide for patterns including if/else branches, switch-case routing, and dynamic feature flags.
ConditionalDAGBuilder
class ConditionalDAGBuilder()
A specialized builder that supports conditional edges. Similar to
DAGBuilder but with an additional condition parameter on
add_edge(). The builder produces both a DAG and a conditions dictionary that
the ConditionalExecutor uses at runtime.
import dagron
builder = dagron.ConditionalDAGBuilder()
add_node
def add_node(
    name: str,
    payload: Any = None,
    metadata: dict | None = None,
) -> ConditionalDAGBuilder
Add a node to the graph. Returns self for chaining.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Unique name for the node. |
| `payload` | `Any` | `None` | Arbitrary Python object attached to the node. |
| `metadata` | `dict \| None` | `None` | Optional metadata dictionary. |
add_edge
def add_edge(
    from_node: str,
    to_node: str,
    weight: float | None = None,
    label: str | None = None,
    condition: Callable[[], bool] | None = None,
) -> ConditionalDAGBuilder
Add a directed edge with an optional condition predicate. If condition is
provided, the edge is only traversed at runtime when condition() returns
True. If condition is None, the edge is unconditional (always traversed).
| Parameter | Type | Default | Description |
|---|---|---|---|
| `from_node` | `str` | required | Name of the source node. |
| `to_node` | `str` | required | Name of the target node. |
| `weight` | `float \| None` | `None` | Optional numeric weight. |
| `label` | `str \| None` | `None` | Optional label for visualization. |
| `condition` | `Callable[[], bool] \| None` | `None` | Predicate function evaluated at runtime. The edge is traversed only if it returns `True`. |
build
def build() -> tuple[DAG, dict[tuple[str, str], Callable[[], bool]]]
Finalize and validate the graph. Returns a tuple of the
DAG and a dictionary mapping (from_node, to_node) pairs to
their condition predicates. Pass both to the
ConditionalExecutor.
Returns: tuple[DAG, dict] — the validated DAG and the conditions map.
Raises:
- CycleError: if the graph contains a cycle.
- NodeNotFoundError: if an edge references a non-existent node.
import dagron

use_gpu = True  # runtime flag

dag, conditions = (
    dagron.ConditionalDAGBuilder()
    .add_node("preprocess")
    .add_node("cpu_train")
    .add_node("gpu_train")
    .add_node("evaluate")
    .add_edge("preprocess", "cpu_train", condition=lambda: not use_gpu)
    .add_edge("preprocess", "gpu_train", condition=lambda: use_gpu)
    .add_edge("cpu_train", "evaluate")
    .add_edge("gpu_train", "evaluate")
    .build()
)

print(dag.node_count())  # 4
print(len(conditions))   # 2 (only conditional edges)
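To make the shape of the conditions map concrete, the sketch below writes an equivalent dictionary by hand and evaluates each predicate. It does not call dagron; the dictionary literal is an illustrative stand-in for what `build()` is documented to return in the example above, not output captured from the library.

```python
from typing import Callable

use_gpu = True  # runtime flag, as in the example above

# Hand-written stand-in for the conditions map: keys are (from_node, to_node)
# pairs, values are zero-argument predicates. Unconditional edges are absent.
conditions: dict[tuple[str, str], Callable[[], bool]] = {
    ("preprocess", "cpu_train"): lambda: not use_gpu,
    ("preprocess", "gpu_train"): lambda: use_gpu,
}

# Evaluate every predicate to see which conditional edges are currently active.
active_edges = sorted(edge for edge, predicate in conditions.items() if predicate())

print(active_edges)  # [('preprocess', 'gpu_train')]
```

Looping over the map this way can be a convenient debugging aid before handing it to an executor.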
ConditionalEdge
class ConditionalEdge(
    from_node: str,
    to_node: str,
    condition: Callable[[], bool],
    label: str | None = None,
)
A data class representing a conditional edge. Returned by
ConditionalDAGBuilder internals and useful for introspection.
| Property | Type | Description |
|---|---|---|
| `from_node` | `str` | The source node name. |
| `to_node` | `str` | The target node name. |
| `condition` | `Callable[[], bool]` | The predicate function. |
| `label` | `str \| None` | Optional label for visualization. |
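For readers who want a mental model of this record, here is a minimal sketch of a data class with the same four fields. `ConditionalEdgeSketch` and its `is_active()` helper are hypothetical names introduced for illustration; they are not part of dagron, and the real class may differ.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ConditionalEdgeSketch:
    """Hypothetical stand-in mirroring the documented ConditionalEdge fields."""

    from_node: str
    to_node: str
    condition: Callable[[], bool]
    label: Optional[str] = None

    def is_active(self) -> bool:
        # Hypothetical helper (not in the documented API): evaluate the predicate.
        return bool(self.condition())


edge = ConditionalEdgeSketch("preprocess", "gpu_train", condition=lambda: True, label="gpu")
print(edge.from_node, "->", edge.to_node, edge.is_active())  # preprocess -> gpu_train True
```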
ConditionalExecutor
class ConditionalExecutor(
    dag: DAG,
    conditions: dict[tuple[str, str], Callable[[], bool]],
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
An executor that evaluates edge conditions at runtime. Before dispatching a
node, the executor checks the node's incoming edges. The node is executed when
at least one incoming conditional edge evaluates to True, or when it has at
least one unconditional incoming edge whose source completed. If no incoming
edge qualifies, the node is skipped.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `dag` | `DAG` | required | The DAG whose topology drives execution order. |
| `conditions` | `dict[tuple[str, str], Callable[[], bool]]` | required | Map of `(from, to)` edge pairs to condition predicates. Produced by `ConditionalDAGBuilder.build()`. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream nodes when any node fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |
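The dispatch rule described above can be approximated in plain Python. This is a minimal sketch, not dagron's implementation: `run_conditionally` is a hypothetical function, it runs tasks sequentially, ignores failures and callbacks, and assumes that a conditional edge only counts when its source node completed.

```python
from graphlib import TopologicalSorter
from typing import Callable

Edge = tuple[str, str]


def run_conditionally(
    edges: list[Edge],
    conditions: dict[Edge, Callable[[], bool]],
    tasks: dict[str, Callable[[], object]],
) -> dict[str, str]:
    """Return a node -> status map ("COMPLETED" or "SKIPPED")."""
    # Map each node to the set of its direct predecessors.
    predecessors: dict[str, set[str]] = {name: set() for name in tasks}
    for src, dst in edges:
        predecessors[dst].add(src)

    status: dict[str, str] = {}
    # static_order() yields every node after all of its predecessors.
    for node in TopologicalSorter(predecessors).static_order():
        incoming = predecessors[node]
        # An incoming edge is live when its source completed and its
        # condition (if it has one) returns True.
        live = any(
            status[src] == "COMPLETED" and conditions.get((src, node), lambda: True)()
            for src in incoming
        )
        if incoming and not live:
            status[node] = "SKIPPED"
            continue
        tasks[node]()
        status[node] = "COMPLETED"
    return status


use_gpu = True
edges = [
    ("preprocess", "cpu_train"),
    ("preprocess", "gpu_train"),
    ("cpu_train", "evaluate"),
    ("gpu_train", "evaluate"),
]
conditions = {
    ("preprocess", "cpu_train"): lambda: not use_gpu,
    ("preprocess", "gpu_train"): lambda: use_gpu,
}
tasks = {name: (lambda: None) for name in ("preprocess", "cpu_train", "gpu_train", "evaluate")}

status = run_conditionally(edges, conditions, tasks)
print(status["cpu_train"], status["evaluate"])  # SKIPPED COMPLETED
```

Note that `evaluate` still runs: its edge from `gpu_train` is unconditional and `gpu_train` completed, so one live incoming edge is enough.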
execute
def execute(
    tasks: dict[str, Callable],
) -> ExecutionResult
Execute tasks, evaluating conditions on each edge before dispatching.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `tasks` | `dict[str, Callable]` | required | Map of node names to callable tasks. |
Returns: ExecutionResult
import dagron

use_gpu = True

dag, conditions = (
    dagron.ConditionalDAGBuilder()
    .add_node("preprocess")
    .add_node("cpu_train")
    .add_node("gpu_train")
    .add_node("evaluate")
    .add_edge("preprocess", "cpu_train", condition=lambda: not use_gpu)
    .add_edge("preprocess", "gpu_train", condition=lambda: use_gpu)
    .add_edge("cpu_train", "evaluate")
    .add_edge("gpu_train", "evaluate")
    .build()
)

executor = dagron.ConditionalExecutor(dag, conditions)
result = executor.execute({
    "preprocess": lambda: "data ready",
    "cpu_train": lambda: "trained on CPU",
    "gpu_train": lambda: "trained on GPU",
    "evaluate": lambda: "accuracy: 0.95",
})

print(result.succeeded)  # 3
print(result.skipped)    # 1 (cpu_train skipped)
print(result.node_results["gpu_train"].status)  # COMPLETED
print(result.node_results["cpu_train"].status)  # SKIPPED
Patterns
If/Else Branch
import dagron

flag = True  # read when the executor evaluates the conditions

dag, conditions = (
    dagron.ConditionalDAGBuilder()
    .add_node("check")
    .add_node("branch_true")
    .add_node("branch_false")
    .add_node("merge")
    .add_edge("check", "branch_true", condition=lambda: flag)
    .add_edge("check", "branch_false", condition=lambda: not flag)
    .add_edge("branch_true", "merge")
    .add_edge("branch_false", "merge")
    .build()
)
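Because each condition is an ordinary Python closure, it reads `flag` when the predicate is called, not when the edge is added. The sketch below shows the difference with plain lambdas and no dagron calls; `live` and `snapshot` are illustrative names, and the only assumption is that predicates are invoked at execution time, as described for the executor.

```python
flag = True

# Reads the name `flag` each time it is called (late binding).
live = lambda: flag
# The default argument is evaluated once, here, so the value is frozen.
snapshot = lambda value=flag: value

flag = False  # flipped after the predicates were created

print(live())      # False
print(snapshot())  # True
```

Use the late-binding form when the branch should follow the current value at run time, and the default-argument form when the decision should be fixed at build time.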
Feature Flags
import os

import dagron

dag, conditions = (
    dagron.ConditionalDAGBuilder()
    .add_node("fetch")
    .add_node("cache_result")
    .add_node("process")
    # cache_result runs only when ENABLE_CACHE is set to "1"
    .add_edge("fetch", "cache_result",
              condition=lambda: os.getenv("ENABLE_CACHE") == "1")
    .add_edge("fetch", "process")
    .add_edge("cache_result", "process")
    .build()
)
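When several edges are gated by environment variables, a small predicate factory can keep the parsing in one place. The sketch below is one possible approach: `env_flag` is a hypothetical helper, not a dagron function, and unlike the example above (which accepts only `"1"`) it treats a few common truthy spellings as enabled.

```python
import os
from typing import Callable

_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False) -> Callable[[], bool]:
    """Return a zero-argument predicate that reads the variable on every call."""

    def predicate() -> bool:
        raw = os.getenv(name)
        if raw is None:
            return default
        return raw.strip().lower() in _TRUTHY

    return predicate


cache_enabled = env_flag("ENABLE_CACHE")

os.environ["ENABLE_CACHE"] = "1"
print(cache_enabled())  # True

os.environ["ENABLE_CACHE"] = "off"
print(cache_enabled())  # False
```

The returned predicate takes no arguments, so it matches the `Callable[[], bool]` type expected by the `condition` parameter.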
Data-Dependent Routing
Conditions can inspect shared state that is updated by upstream tasks:
import dagron

shared = {}

def classify():
    # Upstream task: record the category so downstream conditions can read it.
    shared["category"] = "premium"
    return shared["category"]

dag, conditions = (
    dagron.ConditionalDAGBuilder()
    .add_node("classify")
    .add_node("premium_flow")
    .add_node("standard_flow")
    .add_node("finalize")
    .add_edge("classify", "premium_flow",
              condition=lambda: shared.get("category") == "premium")
    .add_edge("classify", "standard_flow",
              condition=lambda: shared.get("category") != "premium")
    .add_edge("premium_flow", "finalize")
    .add_edge("standard_flow", "finalize")
    .build()
)
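The same idea extends to switch-case routing with more than two branches. The sketch below builds one predicate per route from a factory function; it uses a plain dictionary in place of the conditions map and does not call dagron. `category_is` and the `"trial"` route are hypothetical names added for illustration.

```python
from typing import Callable

shared: dict[str, str] = {}


def category_is(expected: str) -> Callable[[], bool]:
    """Build a predicate that is True when the shared category equals `expected`."""
    return lambda: shared.get("category") == expected


# One predicate per route. The factory binds `expected` per call, which avoids
# the late-binding pitfall of writing the lambda directly inside the loop.
routes = ["premium", "standard", "trial"]
conditions = {("classify", f"{name}_flow"): category_is(name) for name in routes}

shared["category"] = "standard"  # what an upstream classify task might write
chosen = [dst for (_, dst), predicate in conditions.items() if predicate()]
print(chosen)  # ['standard_flow']
```

This relies on the upstream task writing to `shared` before the outgoing edge conditions are evaluated, which is the ordering the pattern above depends on.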
Related
- DAGExecutor — the base executor without conditions.
- DAGBuilder — the standard builder without conditional edges.
- Dynamic Execution — modify the graph at runtime instead of skipping edges.
- Approval Gates — human-in-the-loop pause/resume.
- Conditional Execution guide — patterns and best practices.