Skip to main content

Conditional Execution

The conditional execution module allows edges in your DAG to carry predicate functions. At runtime, the executor evaluates each condition before traversing the edge. If the condition returns False, the downstream node (and its subtree) is skipped. This enables branching, feature flags, and data-dependent routing without modifying the graph structure.

See the Conditional Execution guide for patterns including if/else branches, switch-case routing, and dynamic feature flags.


ConditionalDAGBuilder

ConditionalDAGBuilder
class ConditionalDAGBuilder()

A specialized builder that supports conditional edges. Similar to DAGBuilder but with an additional condition parameter on add_edge(). The builder produces both a DAG and a conditions dictionary that the ConditionalExecutor uses at runtime.

import dagron

builder = dagron.ConditionalDAGBuilder()

add_node

ConditionalDAGBuilder.add_node
def add_node(
name: str,
payload: Any = None,
metadata: dict | None = None,
) -> ConditionalDAGBuilder

Add a node to the graph. Returns self for chaining.

ParameterTypeDefaultDescription
namestrrequiredUnique name for the node.
payloadAnyNoneArbitrary Python object attached to the node.
metadatadict | NoneNoneOptional metadata dictionary.

add_edge

ConditionalDAGBuilder.add_edge
def add_edge(
from_node: str,
to_node: str,
weight: float | None = None,
label: str | None = None,
condition: Callable[[], bool] | None = None,
) -> ConditionalDAGBuilder

Add a directed edge with an optional condition predicate. If condition is provided, the edge is only traversed at runtime when condition() returns True. If condition is None, the edge is unconditional (always traversed).

ParameterTypeDefaultDescription
from_nodestrrequiredName of the source node.
to_nodestrrequiredName of the target node.
weightfloat | NoneNoneOptional numeric weight.
labelstr | NoneNoneOptional label for visualization.
conditionCallable[[], bool] | NoneNonePredicate function evaluated at runtime. Edge is traversed only if it returns True.

build

ConditionalDAGBuilder.build
def build() -> tuple[DAG, dict[tuple[str, str], Callable[[], bool]]]

Finalize and validate the graph. Returns a tuple of the DAG and a dictionary mapping (from_node, to_node) pairs to their condition predicates. Pass both to the ConditionalExecutor.

Returns: tuple[DAG, dict] — the validated DAG and the conditions map.

Raises:

  • CycleError — if the graph contains a cycle.
  • NodeNotFoundError — if an edge references a non-existent node.
import dagron

use_gpu = True # runtime flag

dag, conditions = (
dagron.ConditionalDAGBuilder()
.add_node("preprocess")
.add_node("cpu_train")
.add_node("gpu_train")
.add_node("evaluate")
.add_edge("preprocess", "cpu_train", condition=lambda: not use_gpu)
.add_edge("preprocess", "gpu_train", condition=lambda: use_gpu)
.add_edge("cpu_train", "evaluate")
.add_edge("gpu_train", "evaluate")
.build()
)

print(dag.node_count()) # 4
print(len(conditions)) # 2 (only conditional edges)

ConditionalEdge

ConditionalEdge
class ConditionalEdge(
from_node: str,
to_node: str,
condition: Callable[[], bool],
label: str | None = None,
)

A data class representing a conditional edge. Returned by ConditionalDAGBuilder internals and useful for introspection.

PropertyTypeDescription
from_nodestrThe source node name.
to_nodestrThe target node name.
conditionCallable[[], bool]The predicate function.
label`strNone`

ConditionalExecutor

ConditionalExecutor
class ConditionalExecutor(
dag: DAG,
conditions: dict[tuple[str, str], Callable[[], bool]],
callbacks: ExecutionCallbacks | None = None,
fail_fast: bool = True,
enable_tracing: bool = False,
)

An executor that evaluates edge conditions at runtime. Before dispatching a node, the executor checks all incoming conditional edges. If any required condition returns False, the node is skipped. A node is executed only when at least one incoming conditional edge evaluates to True (or the node has at least one unconditional incoming edge whose source completed).

ParameterTypeDefaultDescription
dagDAGrequiredThe DAG whose topology drives execution order.
conditionsdict[tuple[str, str], Callable[[], bool]]requiredMap of (from, to) edge pairs to condition predicates. Produced by ConditionalDAGBuilder.build().
callbacksExecutionCallbacks | NoneNoneOptional lifecycle callbacks.
fail_fastboolTrueIf True, skip downstream nodes when any node fails.
enable_tracingboolFalseIf True, record a Chrome-compatible execution trace.

execute

ConditionalExecutor.execute
def execute(
tasks: dict[str, Callable],
) -> ExecutionResult

Execute tasks, evaluating conditions on each edge before dispatching.

ParameterTypeDefaultDescription
tasksdict[str, Callable]requiredMap of node names to callable tasks.

Returns: ExecutionResult

import dagron

use_gpu = True

dag, conditions = (
dagron.ConditionalDAGBuilder()
.add_node("preprocess")
.add_node("cpu_train")
.add_node("gpu_train")
.add_node("evaluate")
.add_edge("preprocess", "cpu_train", condition=lambda: not use_gpu)
.add_edge("preprocess", "gpu_train", condition=lambda: use_gpu)
.add_edge("cpu_train", "evaluate")
.add_edge("gpu_train", "evaluate")
.build()
)

executor = dagron.ConditionalExecutor(dag, conditions)
result = executor.execute({
"preprocess": lambda: "data ready",
"cpu_train": lambda: "trained on CPU",
"gpu_train": lambda: "trained on GPU",
"evaluate": lambda: "accuracy: 0.95",
})

print(result.succeeded) # 3
print(result.skipped) # 1 (cpu_train skipped)
print(result.node_results["gpu_train"].status) # COMPLETED
print(result.node_results["cpu_train"].status) # SKIPPED

Patterns

If/Else Branch

flag = True

dag, conditions = (
dagron.ConditionalDAGBuilder()
.add_node("check")
.add_node("branch_true")
.add_node("branch_false")
.add_node("merge")
.add_edge("check", "branch_true", condition=lambda: flag)
.add_edge("check", "branch_false", condition=lambda: not flag)
.add_edge("branch_true", "merge")
.add_edge("branch_false", "merge")
.build()
)

Feature Flags

import os

dag, conditions = (
dagron.ConditionalDAGBuilder()
.add_node("fetch")
.add_node("cache_result")
.add_node("process")
.add_edge("fetch", "cache_result",
condition=lambda: os.getenv("ENABLE_CACHE") == "1")
.add_edge("fetch", "process")
.add_edge("cache_result", "process")
.build()
)

Data-Dependent Routing

Conditions can inspect shared state that is updated by upstream tasks:

shared = {}

def classify():
shared["category"] = "premium"
return shared["category"]

dag, conditions = (
dagron.ConditionalDAGBuilder()
.add_node("classify")
.add_node("premium_flow")
.add_node("standard_flow")
.add_node("finalize")
.add_edge("classify", "premium_flow",
condition=lambda: shared.get("category") == "premium")
.add_edge("classify", "standard_flow",
condition=lambda: shared.get("category") != "premium")
.add_edge("premium_flow", "finalize")
.add_edge("standard_flow", "finalize")
.build()
)