Reactive DAG

The reactive module extends dagron's execution model into a push-based reactive system. When you set an input value, the ReactiveDAG automatically cascades recomputation through the graph, only recomputing nodes whose inputs have actually changed (early cutoff). Subscriber callbacks are fired whenever a node's value changes, enabling live dashboards, incremental pipelines, and interactive data exploration.

from dagron.execution.reactive import ReactiveDAG

ReactiveDAG

class ReactiveDAG:
    def __init__(
        self,
        dag: DAG,
        tasks: dict[str, Callable[..., Any]],
    ) -> None: ...

Push-based reactive DAG execution system. Setting an input value automatically cascades recomputation through the graph and fires subscriber callbacks when outputs change.

Each task function receives keyword arguments named after its predecessor nodes, with the current values of those predecessors. For example, a node "transform" with predecessor "extract" receives transform(extract=<extract_value>).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| dag | DAG | required | The DAG defining the dependency structure. |
| tasks | dict[str, Callable[..., Any]] | required | Mapping of node names to callables. Each callable receives keyword arguments named after its predecessor nodes. |

import dagron
from dagron.execution.reactive import ReactiveDAG

dag = (
    dagron.DAG.builder()
    .add_edge("raw", "cleaned")
    .add_edge("cleaned", "features")
    .add_edge("features", "prediction")
    .build()
)

tasks = {
    "raw": lambda: None,  # Input node -- value set externally
    "cleaned": lambda raw=None: [x.strip() for x in raw] if raw else [],
    "features": lambda cleaned=None: len(cleaned) if cleaned else 0,
    "prediction": lambda features=None: features > 5 if features else False,
}

reactive = ReactiveDAG(dag, tasks)
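To illustrate the calling convention described above, the following minimal sketch shows how a task could be invoked with keyword arguments named after its predecessors. It does not use dagron: `predecessors`, `values`, and `call_task` are hypothetical names chosen for illustration, and dagron's internals may differ.

```python
from typing import Any, Callable

# Hypothetical stand-ins for the graph structure and the current node values.
predecessors: dict[str, list[str]] = {
    "raw": [],
    "cleaned": ["raw"],
    "features": ["cleaned"],
}
values: dict[str, Any] = {"raw": [" Alice ", " Bob "]}

tasks: dict[str, Callable[..., Any]] = {
    "cleaned": lambda raw=None: [x.strip() for x in raw] if raw else [],
    "features": lambda cleaned=None: len(cleaned) if cleaned else 0,
}


def call_task(node: str) -> Any:
    """Call a node's task, passing each predecessor's value by keyword."""
    kwargs = {name: values[name] for name in predecessors[node]}
    return tasks[node](**kwargs)


values["cleaned"] = call_task("cleaned")    # cleaned(raw=[" Alice ", " Bob "])
values["features"] = call_task("features")  # features(cleaned=["Alice", "Bob"])
print(values["cleaned"], values["features"])  # ['Alice', 'Bob'] 2
```

One consequence of this convention is that a task's parameter names have to match the names of its predecessor nodes.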

Properties


ReactiveDAG.dag

@property
def dag(self) -> DAG

The underlying DAG.

Returns: DAG -- The DAG defining the dependency structure.


ReactiveDAG.values

@property
def values(self) -> dict[str, Any]

Current values of all computed nodes. Returns a read-only copy.

Returns: dict[str, Any] -- Mapping of node names to their current values.

all_values = reactive.values
for name, value in all_values.items():
    print(f"{name}: {value}")
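The sketch below illustrates what returning a copy means in practice: mutating the returned dictionary leaves the internal state untouched. `ValueStore` is a hypothetical stand-in, not dagron's implementation, and it assumes a shallow copy.

```python
from typing import Any


class ValueStore:
    """Hypothetical stand-in that exposes its state as a copy."""

    def __init__(self) -> None:
        self._values: dict[str, Any] = {"features": 3}

    @property
    def values(self) -> dict[str, Any]:
        # A shallow copy: callers get their own dict object.
        return dict(self._values)


store = ValueStore()
snapshot = store.values
snapshot["features"] = 99        # modifies only the caller's copy
print(store.values["features"])  # 3
```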

Methods


ReactiveDAG.initialize

def initialize(self) -> dict[str, Any]

Compute all nodes in topological order. This performs the initial full computation. After this, use set_input() for incremental updates.

Any values pre-set via set_input() before initialization are preserved and used during the computation.

Returns: dict[str, Any] -- Dictionary of all computed values.

reactive = ReactiveDAG(dag, tasks)
values = reactive.initialize()
print(f"Initial prediction: {values.get('prediction')}")
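As a rough illustration of a full computation in topological order that preserves pre-set inputs, the sketch below uses the standard library's `graphlib.TopologicalSorter`. It is not dagron's implementation: `full_compute`, `predecessors`, and `preset` are hypothetical names.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable

# Hypothetical graph: node -> list of predecessor nodes.
predecessors = {"raw": [], "cleaned": ["raw"], "features": ["cleaned"]}
tasks: dict[str, Callable[..., Any]] = {
    "raw": lambda: None,
    "cleaned": lambda raw=None: [x.strip() for x in raw] if raw else [],
    "features": lambda cleaned=None: len(cleaned) if cleaned else 0,
}


def full_compute(preset: dict[str, Any]) -> dict[str, Any]:
    """Compute every node once, visiting predecessors before dependents."""
    values = dict(preset)
    # static_order() yields each node only after all of its predecessors.
    for node in TopologicalSorter(predecessors).static_order():
        if node in values:
            continue  # keep values that were set before initialization
        kwargs = {p: values[p] for p in predecessors[node]}
        values[node] = tasks[node](**kwargs)
    return values


print(full_compute({}))
# {'raw': None, 'cleaned': [], 'features': 0}
print(full_compute({"raw": [" a ", "b"]}))
# {'raw': [' a ', 'b'], 'cleaned': ['a', 'b'], 'features': 2}
```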

ReactiveDAG.set_input

def set_input(
    self,
    node_name: str,
    value: Any,
) -> dict[str, Any]

Set an input value and cascade recomputation through the graph. Only nodes that are transitively downstream of the changed input are considered for recomputation, and the early cutoff optimization skips nodes whose computed value has not actually changed.

If initialize() has not been called yet, this method stores the value and then calls initialize() automatically.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| node_name | str | required | Name of the input node to set. |
| value | Any | required | New value for the node. |

Returns: dict[str, Any] -- Dictionary of all nodes that were recomputed, mapping node name to new value. Includes the input node itself.

# Set a new input and see what changed
changed = reactive.set_input("raw", [" Alice ", " Bob ", " Charlie "])
print(f"Changed nodes: {list(changed.keys())}")
# Changed nodes: ['raw', 'cleaned', 'features', 'prediction']

# Set the same value again -- early cutoff prevents recomputation
changed = reactive.set_input("raw", [" Alice ", " Bob ", " Charlie "])
print(f"Changed nodes: {list(changed.keys())}")
# Changed nodes: [] (nothing changed)
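The set of nodes transitively downstream of an input can be found with a breadth-first traversal over successor edges. The sketch below is one way to compute it in plain Python; `successors` and `downstream_of` are hypothetical names, not part of dagron's API, and the extra `config` node exists only to show that unrelated branches are left out.

```python
from collections import deque

# Hypothetical successor map: node -> nodes that depend on it directly.
successors = {
    "raw": ["cleaned"],
    "config": ["features"],
    "cleaned": ["features"],
    "features": ["prediction"],
    "prediction": [],
}


def downstream_of(start: str) -> set[str]:
    """Return every node reachable from `start`, excluding `start` itself."""
    seen: set[str] = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in successors[node]:
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


print(sorted(downstream_of("raw")))     # ['cleaned', 'features', 'prediction']
print(sorted(downstream_of("config")))  # ['features', 'prediction']
```

In this model, changing `config` never touches `cleaned`, because `cleaned` is not reachable from `config`.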

ReactiveDAG.set_inputs

def set_inputs(
    self,
    values: dict[str, Any],
) -> dict[str, Any]

Set multiple input values and cascade recomputation. More efficient than calling set_input() multiple times because it computes the combined dirty set and processes all changes in a single topological pass.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| values | dict[str, Any] | required | Mapping of node names to new values. |

Returns: dict[str, Any] -- Dictionary of all nodes that were recomputed.

changed = reactive.set_inputs({
    "raw": [" Alice ", " Bob "],
})
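To see why a batched update can do less work, the sketch below counts task invocations for a node with two inputs when the inputs are applied one at a time versus together. It is a simplified model without early cutoff; `recompute`, `order`, and the graph itself are hypothetical, and dagron's internals may differ.

```python
from typing import Any

# Hypothetical graph: "total" depends on both inputs "a" and "b".
predecessors = {"a": [], "b": [], "total": ["a", "b"], "label": ["total"]}
order = ["a", "b", "total", "label"]  # a valid topological order
inputs = {"a", "b"}
tasks = {
    "total": lambda a=0, b=0: a + b,
    "label": lambda total=0: f"total={total}",
}
calls: list[str] = []  # records every task invocation


def recompute(values: dict[str, Any], updates: dict[str, Any]) -> None:
    """Apply `updates`, then recompute affected nodes in one pass."""
    values.update(updates)
    dirty = set(updates)
    for node in order:
        if node in inputs:
            continue
        # A node is dirty when any of its predecessors is dirty.
        if any(p in dirty for p in predecessors[node]):
            dirty.add(node)
            calls.append(node)
            kwargs = {p: values[p] for p in predecessors[node]}
            values[node] = tasks[node](**kwargs)


# One input at a time: "total" and "label" each run twice.
state = {"a": 0, "b": 0, "total": 0, "label": "total=0"}
recompute(state, {"a": 1})
recompute(state, {"b": 2})
sequential_calls = len(calls)

# Both inputs together: "total" and "label" each run once.
calls.clear()
state = {"a": 0, "b": 0, "total": 0, "label": "total=0"}
recompute(state, {"a": 1, "b": 2})
batched_calls = len(calls)

print(sequential_calls, batched_calls, state["label"])  # 4 2 total=3
```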

ReactiveDAG.subscribe

def subscribe(
    self,
    node_name: str,
    callback: Callable[[str, Any], None],
) -> Callable[[], None]

Subscribe to changes on a specific node. The callback is called with (node_name, new_value) whenever the node's value changes during set_input() or initialize().

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| node_name | str | required | Name of the node to watch. |
| callback | Callable[[str, Any], None] | required | Function called with (node_name, new_value) on each change. |

Returns: Callable[[], None] -- An unsubscribe function. Call it to remove the subscription.

def on_prediction_change(name: str, value):
    print(f"Prediction updated: {value}")

unsubscribe = reactive.subscribe("prediction", on_prediction_change)

reactive.set_input("raw", ["a", "b", "c", "d", "e", "f"])
# Prints: Prediction updated: True

# Stop listening
unsubscribe()
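Returning an unsubscribe function is a common observer-pattern idiom. The sketch below shows one way such a function can be built with a closure; `Notifier` and its `notify` method are hypothetical and not part of dagron.

```python
from collections import defaultdict
from typing import Any, Callable

Callback = Callable[[str, Any], None]


class Notifier:
    """Hypothetical per-node subscription registry."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callback]] = defaultdict(list)

    def subscribe(self, node_name: str, callback: Callback) -> Callable[[], None]:
        self._subscribers[node_name].append(callback)

        def unsubscribe() -> None:
            # Safe to call more than once: removal is skipped if already gone.
            if callback in self._subscribers[node_name]:
                self._subscribers[node_name].remove(callback)

        return unsubscribe

    def notify(self, node_name: str, value: Any) -> None:
        # Iterate over a copy so callbacks may unsubscribe while being notified.
        for callback in list(self._subscribers[node_name]):
            callback(node_name, value)


seen: list[tuple[str, Any]] = []
notifier = Notifier()
unsubscribe = notifier.subscribe(
    "prediction", lambda name, value: seen.append((name, value))
)

notifier.notify("prediction", True)   # delivered
unsubscribe()
notifier.notify("prediction", False)  # not delivered: callback was removed
print(seen)                           # [('prediction', True)]
```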

ReactiveDAG.subscribe_all

def subscribe_all(
    self,
    callback: Callable[[str, Any], None],
) -> Callable[[], None]

Subscribe to changes on any node in the DAG. The callback is called with (node_name, new_value) whenever any node's value changes.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| callback | Callable[[str, Any], None] | required | Function called with (node_name, new_value) on each change to any node. |

Returns: Callable[[], None] -- An unsubscribe function.

changes_log = []

def log_all(name: str, value):
    changes_log.append((name, value))

unsub = reactive.subscribe_all(log_all)
reactive.set_input("raw", ["x", "y"])

print(f"Total changes: {len(changes_log)}")
for name, val in changes_log:
    print(f" {name} = {val}")
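Because subscribe_all() returns an unsubscribe function, a subscription can be scoped to a block of code with a context manager. The helper below, `record_changes`, is a hypothetical convenience that relies only on the subscribe_all() signature documented above; `FakeReactive` and its `emit` method are a stub so that the sketch runs without dagron.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator

Callback = Callable[[str, Any], None]


@contextmanager
def record_changes(reactive: Any) -> Iterator[list[tuple[str, Any]]]:
    """Collect (node_name, new_value) pairs while the block is active."""
    log: list[tuple[str, Any]] = []
    unsubscribe = reactive.subscribe_all(
        lambda name, value: log.append((name, value))
    )
    try:
        yield log
    finally:
        unsubscribe()  # always detach, even if the block raises


class FakeReactive:
    """Stub exposing only subscribe_all(), used to exercise the helper."""

    def __init__(self) -> None:
        self.callbacks: list[Callback] = []

    def subscribe_all(self, callback: Callback) -> Callable[[], None]:
        self.callbacks.append(callback)
        return lambda: self.callbacks.remove(callback)

    def emit(self, name: str, value: Any) -> None:
        for callback in list(self.callbacks):
            callback(name, value)


fake = FakeReactive()
with record_changes(fake) as changes:
    fake.emit("raw", ["x", "y"])
    fake.emit("features", 2)
fake.emit("features", 3)  # emitted after the block, so it is not recorded

print(changes)  # [('raw', ['x', 'y']), ('features', 2)]
```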

ReactiveDAG.get

def get(self, node_name: str) -> Any

Get the current value of a node.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| node_name | str | required | Name of the node. |

Returns: Any -- The node's current value, or None if not yet computed.

prediction = reactive.get("prediction")
features = reactive.get("features")
print(f"Features: {features}, Prediction: {prediction}")
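Since get() returns None both for a node that has not been computed and for a node whose computed value is None, callers that need to tell the two apart can consult values instead. The helper `get_or_default` below is hypothetical; it assumes that values contains only computed nodes, as its description suggests, and it uses a stub in place of a real ReactiveDAG.

```python
from typing import Any

_MISSING = object()  # sentinel distinct from any real value, including None


def get_or_default(reactive: Any, node_name: str, default: Any = _MISSING) -> Any:
    """Return a node's value, or `default` if it has not been computed."""
    values = reactive.values  # documented as a mapping of computed nodes
    if node_name in values:
        return values[node_name]
    if default is _MISSING:
        raise KeyError(f"node {node_name!r} has not been computed")
    return default


class StubReactive:
    """Stand-in with a `values` property; not dagron's implementation."""

    @property
    def values(self) -> dict[str, Any]:
        return {"raw": None, "features": 3}


stub = StubReactive()
print(get_or_default(stub, "raw"))             # None (computed, value is None)
print(get_or_default(stub, "prediction", -1))  # -1 (not computed yet)
```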

Early cutoff

The reactive system implements early cutoff optimization. When a node is recomputed and its new value equals the old value (via ==), downstream nodes are not recomputed. This prevents unnecessary cascading through the graph.
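Because the comparison uses ==, early cutoff is only as effective as the value's equality semantics. In Python, a class that does not define `__eq__` compares by identity, so two separately constructed objects carrying the same data are considered different. The sketch below contrasts a plain class with a dataclass; `PlainStats` and `Stats` are hypothetical types used only for illustration.

```python
from dataclasses import dataclass


class PlainStats:
    """No __eq__ defined: instances compare by identity."""

    def __init__(self, count: int) -> None:
        self.count = count


@dataclass(frozen=True)
class Stats:
    """Dataclasses generate a field-by-field __eq__."""

    count: int


# Two results that carry the same data but are distinct objects.
print(PlainStats(3) == PlainStats(3))  # False
print(Stats(3) == Stats(3))            # True
```

Under an == comparison, a task returning `PlainStats` would look changed after every recomputation, while one returning `Stats` could be cut off when the data is the same.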

import dagron
from dagron.execution.reactive import ReactiveDAG

dag = (
    dagron.DAG.builder()
    .add_edge("input", "round")
    .add_edge("round", "format")
    .build()
)

tasks = {
    "input": lambda: None,
    "round": lambda input=None: round(input, 2) if input else 0,
    "format": lambda round=None: f"Value: {round}",
}

reactive = ReactiveDAG(dag, tasks)
reactive.initialize()

# Setting 3.14159 -- round(3.14159, 2) = 3.14
changed = reactive.set_input("input", 3.14159)
print(f"Changed: {list(changed.keys())}")
# Changed: ['input', 'round', 'format']

# Setting 3.14001 -- round(3.14001, 2) = 3.14 (same!)
changed = reactive.set_input("input", 3.14001)
print(f"Changed: {list(changed.keys())}")
# Changed: ['input', 'round']
# 'format' was NOT recomputed because 'round' value didn't change
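The behaviour in the example above can be reproduced with a small stand-alone model. The sketch below walks the nodes in topological order, recomputes a node when a predecessor's value changed, and stops propagating when the new value equals the old one. `propagate`, `order`, and `predecessors` are hypothetical names, and dagron's actual algorithm may differ in its details.

```python
from typing import Any

predecessors = {"input": [], "round": ["input"], "format": ["round"]}
order = ["input", "round", "format"]  # a valid topological order
tasks = {
    "round": lambda input=None: round(input, 2) if input else 0,
    "format": lambda round=None: f"Value: {round}",
}
# State as it would be after an initial computation with no input set.
values: dict[str, Any] = {"input": None, "round": 0, "format": "Value: 0"}


def propagate(node_name: str, value: Any) -> dict[str, Any]:
    """Set an input and return the nodes that were recomputed."""
    if values[node_name] == value:
        return {}                      # same input value: nothing to do
    values[node_name] = value
    recomputed = {node_name: value}
    changed = {node_name}              # nodes whose value actually differs
    for node in order:
        if not any(p in changed for p in predecessors[node]):
            continue                   # no predecessor changed: skip
        kwargs = {p: values[p] for p in predecessors[node]}
        new_value = tasks[node](**kwargs)
        recomputed[node] = new_value
        if new_value != values[node]:  # early cutoff when values are equal
            values[node] = new_value
            changed.add(node)
    return recomputed


print(list(propagate("input", 3.14159)))  # ['input', 'round', 'format']
print(list(propagate("input", 3.14001)))  # ['input', 'round']
```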

Complete example

import dagron
from dagron.execution.reactive import ReactiveDAG

# Build a reactive data processing pipeline
dag = (
    dagron.DAG.builder()
    .add_edge("raw_data", "clean")
    .add_edge("clean", "stats")
    .add_edge("clean", "top_n")
    .add_edge("stats", "report")
    .add_edge("top_n", "report")
    .build()
)

tasks = {
    "raw_data": lambda: None,
    "clean": lambda raw_data=None: (
        [x for x in raw_data if x is not None] if raw_data else []
    ),
    "stats": lambda clean=None: (
        {"count": len(clean), "sum": sum(clean)} if clean else {}
    ),
    "top_n": lambda clean=None: (
        sorted(clean, reverse=True)[:3] if clean else []
    ),
    "report": lambda stats=None, top_n=None: (
        f"Count: {stats.get('count', 0)}, Top 3: {top_n}"
    ),
}

# Create the reactive DAG
reactive = ReactiveDAG(dag, tasks)

# Subscribe to the report node
def on_report(name, value):
    print(f"Report updated: {value}")

reactive.subscribe("report", on_report)

# Initialize with initial data
reactive.set_input("raw_data", [10, 20, None, 30, 5, 15])
# Report updated: Count: 5, Top 3: [30, 20, 15]

# Update with new data -- only affected nodes recompute
changed = reactive.set_input("raw_data", [10, 20, 30, 5, 15, 25])
print(f"\nRecomputed: {list(changed.keys())}")
# Recomputed: ['raw_data', 'clean', 'stats', 'top_n', 'report']

# Check current values
print(f"\nCurrent stats: {reactive.get('stats')}")
print(f"Current top 3: {reactive.get('top_n')}")
print(f"Current report: {reactive.get('report')}")

# Set multiple inputs at once
changes = reactive.set_inputs({"raw_data": [100, 200, 300]})
print(f"\nBatch update changed {len(changes)} nodes")

See also

  • Execution -- standard batch execution with DAGExecutor.
  • Plugins -- hook-based lifecycle extensions.
  • Tracing -- recording events during reactive updates.