Reactive DAG
The reactive module extends dagron's execution model into a push-based
reactive system. When you set an input value, the ReactiveDAG
automatically cascades recomputation through the graph, only recomputing
nodes whose inputs have actually changed (early cutoff). Subscriber
callbacks are fired whenever a node's value changes, enabling live
dashboards, incremental pipelines, and interactive data exploration.
from dagron.execution.reactive import ReactiveDAG
ReactiveDAG
class ReactiveDAG:
def __init__(
self,
dag: DAG,
tasks: dict[str, Callable[..., Any]],
) -> None: ...
Push-based reactive DAG execution system. Setting an input value automatically cascades recomputation through the graph and fires subscriber callbacks when outputs change.
Each task function receives keyword arguments named after its predecessor
nodes, with the current values of those predecessors. For example, a node
"transform" with predecessor "extract" receives
transform(extract=<extract_value>).
| Parameter | Type | Default | Description |
|---|---|---|---|
| dag | DAG | required | The DAG defining the dependency structure. |
| tasks | dict[str, Callable[..., Any]] | required | Mapping of node names to callables. Each callable receives keyword arguments named after its predecessor nodes. |
import dagron
from dagron.execution.reactive import ReactiveDAG
dag = (
dagron.DAG.builder()
.add_edge("raw", "cleaned")
.add_edge("cleaned", "features")
.add_edge("features", "prediction")
.build()
)
tasks = {
"raw": lambda: None, # Input node -- value set externally
"cleaned": lambda raw=None: [x.strip() for x in raw] if raw else [],
"features": lambda cleaned=None: len(cleaned) if cleaned else 0,
"prediction": lambda features=None: features > 5 if features else False,
}
reactive = ReactiveDAG(dag, tasks)
Properties
ReactiveDAG.dag
@property
def dag(self) -> DAG
The underlying DAG.
Returns: DAG -- The DAG defining the dependency structure.
ReactiveDAG.values
@property
def values(self) -> dict[str, Any]
Current values of all computed nodes. Returns a copy (a snapshot), not a live view of the internal state.
Returns: dict[str, Any] -- Mapping of node names to their current values.
all_values = reactive.values
for name, value in all_values.items():
print(f"{name}: {value}")
Methods
ReactiveDAG.initialize
def initialize(self) -> dict[str, Any]
Compute all nodes in topological order. This performs the initial full
computation. After this, use set_input() for incremental updates.
Any values pre-set via set_input() before initialization are preserved
and used during the computation.
Returns: dict[str, Any] -- Dictionary of all computed values.
reactive = ReactiveDAG(dag, tasks)
values = reactive.initialize()
print(f"Initial prediction: {values.get('prediction')}")
ReactiveDAG.set_input
def set_input(
self,
node_name: str,
value: Any,
) -> dict[str, Any]
Set an input value and cascade recomputation through the graph. Only nodes that are transitively downstream of the changed input are considered for recomputation, and the early cutoff optimization skips nodes whose computed value has not actually changed.
If initialize() has not been called yet, this method stores the value
and then calls initialize() automatically.
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_name | str | required | Name of the input node to set. |
| value | Any | required | New value for the node. |
Returns: dict[str, Any] -- Dictionary of all nodes that were recomputed, mapping node name to new value. Includes the input node itself.
# Set a new input and see what changed
changed = reactive.set_input("raw", [" Alice ", " Bob ", " Charlie "])
print(f"Changed nodes: {list(changed.keys())}")
# Changed nodes: ['raw', 'cleaned', 'features', 'prediction']
# Set the same value again -- early cutoff prevents recomputation
changed = reactive.set_input("raw", [" Alice ", " Bob ", " Charlie "])
print(f"Changed nodes: {list(changed.keys())}")
# Changed nodes: [] (nothing changed)
ReactiveDAG.set_inputs
def set_inputs(
self,
values: dict[str, Any],
) -> dict[str, Any]
Set multiple input values and cascade recomputation. More efficient than
calling set_input() multiple times because it computes the combined dirty
set and processes all changes in a single topological pass.
| Parameter | Type | Default | Description |
|---|---|---|---|
| values | dict[str, Any] | required | Mapping of node names to new values. |
Returns: dict[str, Any] -- Dictionary of all nodes that were recomputed.
changed = reactive.set_inputs({
"raw": [" Alice ", " Bob "],
})
ReactiveDAG.subscribe
def subscribe(
self,
node_name: str,
callback: Callable[[str, Any], None],
) -> Callable[[], None]
Subscribe to changes on a specific node. The callback is called with
(node_name, new_value) whenever the node's value changes during
set_input() or initialize().
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_name | str | required | Name of the node to watch. |
| callback | Callable[[str, Any], None] | required | Function called with (node_name, new_value) on each change. |
Returns: Callable[[], None] -- An unsubscribe function. Call it to remove the subscription.
def on_prediction_change(name: str, value):
print(f"Prediction updated: {value}")
unsubscribe = reactive.subscribe("prediction", on_prediction_change)
reactive.set_input("raw", ["a", "b", "c", "d", "e", "f"])
# Prints: Prediction updated: True
# Stop listening
unsubscribe()
ReactiveDAG.subscribe_all
def subscribe_all(
self,
callback: Callable[[str, Any], None],
) -> Callable[[], None]
Subscribe to changes on any node in the DAG. The callback is called with
(node_name, new_value) whenever any node's value changes.
| Parameter | Type | Default | Description |
|---|---|---|---|
| callback | Callable[[str, Any], None] | required | Function called with (node_name, new_value) on each change to any node. |
Returns: Callable[[], None] -- An unsubscribe function.
changes_log = []
def log_all(name: str, value):
changes_log.append((name, value))
unsub = reactive.subscribe_all(log_all)
reactive.set_input("raw", ["x", "y"])
print(f"Total changes: {len(changes_log)}")
for name, val in changes_log:
print(f" {name} = {val}")
ReactiveDAG.get
def get(self, node_name: str) -> Any
Get the current value of a node.
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_name | str | required | Name of the node. |
Returns: Any -- The node's current value, or None if not yet computed.
prediction = reactive.get("prediction")
features = reactive.get("features")
print(f"Features: {features}, Prediction: {prediction}")
Early cutoff
The reactive system implements an early cutoff optimization. When a node is
recomputed and its new value equals the old value (compared via ==), that
node does not trigger recomputation of its downstream nodes. This prevents
unnecessary cascading through the graph.
import dagron
from dagron.execution.reactive import ReactiveDAG
dag = (
dagron.DAG.builder()
.add_edge("input", "round")
.add_edge("round", "format")
.build()
)
tasks = {
"input": lambda: None,
"round": lambda input=None: round(input, 2) if input else 0,
"format": lambda round=None: f"Value: {round}",
}
reactive = ReactiveDAG(dag, tasks)
reactive.initialize()
# Setting 3.14159 -- round(3.14159, 2) = 3.14
changed = reactive.set_input("input", 3.14159)
print(f"Changed: {list(changed.keys())}")
# Changed: ['input', 'round', 'format']
# Setting 3.14001 -- round(3.14001, 2) = 3.14 (same!)
changed = reactive.set_input("input", 3.14001)
print(f"Changed: {list(changed.keys())}")
# Changed: ['input', 'round']
# 'round' is listed because it was recomputed, but its value stayed 3.14,
# so 'format' was NOT recomputed (early cutoff)
Complete example
import dagron
from dagron.execution.reactive import ReactiveDAG
# Build a reactive data processing pipeline
dag = (
dagron.DAG.builder()
.add_edge("raw_data", "clean")
.add_edge("clean", "stats")
.add_edge("clean", "top_n")
.add_edge("stats", "report")
.add_edge("top_n", "report")
.build()
)
tasks = {
"raw_data": lambda: None,
"clean": lambda raw_data=None: (
[x for x in raw_data if x is not None] if raw_data else []
),
"stats": lambda clean=None: (
{"count": len(clean), "sum": sum(clean)} if clean else {}
),
"top_n": lambda clean=None: (
sorted(clean, reverse=True)[:3] if clean else []
),
"report": lambda stats=None, top_n=None: (
f"Count: {stats.get('count', 0)}, Top 3: {top_n}"
),
}
# Create the reactive DAG
reactive = ReactiveDAG(dag, tasks)
# Subscribe to the report node
def on_report(name, value):
print(f"Report updated: {value}")
reactive.subscribe("report", on_report)
# Provide the initial data -- the first set_input() call also runs initialize()
reactive.set_input("raw_data", [10, 20, None, 30, 5, 15])
# Report updated: Count: 5, Top 3: [30, 20, 15]
# Update with new data -- only affected nodes recompute
changed = reactive.set_input("raw_data", [10, 20, 30, 5, 15, 25])
print(f"\nRecomputed: {list(changed.keys())}")
# Recomputed: ['raw_data', 'clean', 'stats', 'top_n', 'report']
# Check current values
print(f"\nCurrent stats: {reactive.get('stats')}")
print(f"Current top 3: {reactive.get('top_n')}")
print(f"Current report: {reactive.get('report')}")
# Batch update via set_inputs() -- a single input here; add more keys to update several at once
changes = reactive.set_inputs({"raw_data": [100, 200, 300]})
print(f"\nBatch update changed {len(changes)} nodes")