
Versioning

As pipelines evolve, you often need to answer questions like "What did this DAG look like last week?" or "What changed between version 12 and version 15?" dagron's VersionedDAG wraps a standard DAG with an append-only mutation log that records every change: node and edge additions and removals as well as payload and metadata updates. You can time-travel to any historical version, diff any two versions, and fork independent branches from any point.

A VersionedDAG accumulates mutations. You can fork at any version to create an independent branch.


Core Concepts

| Class | Role |
|---|---|
| VersionedDAG | Wraps a DAG with an append-only mutation log. Every mutation increments the version counter. |
| Mutation | A single recorded change: version number, mutation type, arguments, and timestamp. |
| MutationType | Enum of mutation types: ADD_NODE, REMOVE_NODE, ADD_EDGE, REMOVE_EDGE, SET_PAYLOAD, SET_METADATA. |

Creating a VersionedDAG

Starting Empty

from dagron.versioning import VersionedDAG

vdag = VersionedDAG()
print(vdag.version) # 0 -- no mutations yet

Wrapping an Existing DAG

import dagron
from dagron.versioning import VersionedDAG

dag = (
dagron.DAG.builder()
.add_node("extract")
.add_node("transform")
.add_edge("extract", "transform")
.build()
)

vdag = VersionedDAG(dag)
print(vdag.version) # 0 -- initial state, no tracked mutations yet

Note that mutations made to the DAG before wrapping are not tracked. The version log starts from the moment you create the VersionedDAG.


Making Mutations

Every mutation is recorded and increments the version by one:

vdag = VersionedDAG()

vdag.add_node("extract") # version 1
vdag.add_node("transform") # version 2
vdag.add_node("load") # version 3
vdag.add_edge("extract", "transform") # version 4
vdag.add_edge("transform", "load") # version 5

print(vdag.version) # 5

All mutation methods mirror the standard DAG API:

vdag.add_node("name", payload=..., metadata=...)
vdag.remove_node("name")
vdag.add_edge("from", "to", weight=..., label=...)
vdag.remove_edge("from", "to")
vdag.set_payload("name", payload)
vdag.set_metadata("name", metadata)

Each call is recorded with its full arguments and a timestamp.
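To make the recording model concrete, here is a minimal, self-contained sketch of an append-only log in plain Python. It is not dagron's implementation: the classes `LogEntry` and `ToyVersionedDAG` are hypothetical, and they only mirror the behavior described above (one entry per call, a 1-based version counter, the full call arguments, and a timestamp). The toy records calls but builds no graph.

```python
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class LogEntry:
    version: int          # 1-based version this entry created
    mutation_type: str    # e.g. "add_node", "add_edge"
    args: dict[str, Any]  # full arguments of the call
    timestamp: float      # Unix time when the entry was recorded


@dataclass
class ToyVersionedDAG:
    """Hypothetical stand-in that only records calls; it builds no graph."""
    log: list[LogEntry] = field(default_factory=list)

    @property
    def version(self) -> int:
        # The version is simply the number of recorded mutations.
        return len(self.log)

    def _record(self, mutation_type: str, **args: Any) -> None:
        # Append-only: existing entries are never edited or removed.
        self.log.append(LogEntry(self.version + 1, mutation_type, args, time.time()))

    def add_node(self, name: str, payload: Any = None, metadata: Any = None) -> None:
        self._record("add_node", name=name, payload=payload, metadata=metadata)

    def add_edge(self, from_node: str, to_node: str, weight: Any = None, label: Any = None) -> None:
        self._record("add_edge", from_node=from_node, to_node=to_node, weight=weight, label=label)


vdag = ToyVersionedDAG()
vdag.add_node("extract")
vdag.add_node("transform")
vdag.add_edge("extract", "transform")
print(vdag.version)       # 3
print(vdag.log[-1].args)  # {'from_node': 'extract', 'to_node': 'transform', 'weight': None, 'label': None}
```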


Accessing the Current DAG

The .dag property returns the underlying DAG for read-only access:

dag = vdag.dag
print(dag.node_count()) # 3
print(dag.edge_count()) # 2

for node in dag.topological_sort():
    print(node.name)

You can pass vdag.dag to any executor or analysis function that expects a DAG.


Time-Travel with at_version()

Reconstruct the DAG as it was at any historical version:

vdag = VersionedDAG()
vdag.add_node("a") # v1
vdag.add_node("b") # v2
vdag.add_edge("a", "b") # v3
vdag.add_node("c") # v4
vdag.add_edge("b", "c") # v5

# Go back to version 2 (only nodes "a" and "b", no edges)
dag_v2 = vdag.at_version(2)
print(dag_v2.node_count()) # 2
print(dag_v2.edge_count()) # 0

# Version 0 is the empty DAG
dag_v0 = vdag.at_version(0)
print(dag_v0.node_count()) # 0

at_version() replays the mutation log up to the specified version, constructing a fresh DAG. The original VersionedDAG is not modified.
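The replay idea can be illustrated with a stdlib-only sketch. The log format (a list of `(mutation_type, args)` tuples) and the `replay` function are hypothetical, and plain sets stand in for the real DAG that dagron constructs. The sketch also applies the same range check as the error shown in the next section, and it assumes that removing a node drops its incident edges.

```python
from typing import Any

Log = list[tuple[str, dict[str, Any]]]


def replay(log: Log, version: int) -> tuple[set[str], set[tuple[str, str]]]:
    """Rebuild (nodes, edges) by applying the first `version` log entries."""
    if not 0 <= version <= len(log):
        raise ValueError(f"Version {version} out of range [0, {len(log)}].")
    nodes: set[str] = set()
    edges: set[tuple[str, str]] = set()
    for mutation_type, args in log[:version]:  # entry i created version i + 1
        if mutation_type == "add_node":
            nodes.add(args["name"])
        elif mutation_type == "remove_node":
            nodes.discard(args["name"])
            # Assumption for this sketch: removing a node drops its incident edges.
            edges = {e for e in edges if args["name"] not in e}
        elif mutation_type == "add_edge":
            edges.add((args["from_node"], args["to_node"]))
        elif mutation_type == "remove_edge":
            edges.discard((args["from_node"], args["to_node"]))
        # set_payload / set_metadata do not change structure, so they are skipped here.
    return nodes, edges


log: Log = [
    ("add_node", {"name": "a"}),                       # v1
    ("add_node", {"name": "b"}),                       # v2
    ("add_edge", {"from_node": "a", "to_node": "b"}),  # v3
    ("add_node", {"name": "c"}),                       # v4
    ("add_edge", {"from_node": "b", "to_node": "c"}),  # v5
]

nodes_v2, edges_v2 = replay(log, 2)
print(len(nodes_v2), len(edges_v2))  # 2 0
```

Because each call rebuilds from the start of the log, the original log is never touched, which matches the note that `at_version()` leaves the VersionedDAG unmodified.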

Version Bounds

try:
    vdag.at_version(999)
except ValueError as e:
    print(e)
    # "Version 999 out of range [0, 5]."

Diffing Versions

Compare any two versions to see what changed:

diff = vdag.diff_versions(2, 5)

print(f"Added nodes: {diff.added_nodes}")
print(f"Removed nodes: {diff.removed_nodes}")
print(f"Added edges: {diff.added_edges}")
print(f"Removed edges: {diff.removed_edges}")

This uses the Rust-side DAG.diff() method, which produces a GraphDiff object with sets of added/removed nodes and edges.

Diffing v2 and v5 shows that node "c" and two edges (a -> b and b -> c) were added.
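Conceptually, a diff between two snapshots reduces to set differences on their node and edge sets. The sketch below is a plain-Python illustration, not the Rust `GraphDiff` implementation; `SnapshotDiff` and `diff_snapshots` are hypothetical names whose fields mirror the attributes printed above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SnapshotDiff:
    added_nodes: frozenset
    removed_nodes: frozenset
    added_edges: frozenset
    removed_edges: frozenset


def diff_snapshots(old_nodes, old_edges, new_nodes, new_edges) -> SnapshotDiff:
    """Compare two (nodes, edges) snapshots with set arithmetic."""
    return SnapshotDiff(
        added_nodes=frozenset(new_nodes - old_nodes),
        removed_nodes=frozenset(old_nodes - new_nodes),
        added_edges=frozenset(new_edges - old_edges),
        removed_edges=frozenset(old_edges - new_edges),
    )


# Snapshots of the time-travel example at v2 and v5
v2_nodes, v2_edges = {"a", "b"}, set()
v5_nodes, v5_edges = {"a", "b", "c"}, {("a", "b"), ("b", "c")}

diff = diff_snapshots(v2_nodes, v2_edges, v5_nodes, v5_edges)
print(sorted(diff.added_nodes))  # ['c']
print(sorted(diff.added_edges))  # [('a', 'b'), ('b', 'c')]
```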


Mutation History

Full History

for mutation in vdag.history():
    print(
        f" v{mutation.version}: {mutation.mutation_type.value} "
        f"args={mutation.args} "
        f"at={mutation.timestamp:.0f}"
    )

Output:

 v1: add_node args={'name': 'a', 'payload': None, 'metadata': None} at=1709400000
 v2: add_node args={'name': 'b', 'payload': None, 'metadata': None} at=1709400001
 v3: add_edge args={'from_node': 'a', 'to_node': 'b', 'weight': None, 'label': None} at=1709400001
 v4: add_node args={'name': 'c', 'payload': None, 'metadata': None} at=1709400002
 v5: add_edge args={'from_node': 'b', 'to_node': 'c', 'weight': None, 'label': None} at=1709400002

History Since a Version

Get only the mutations after a specific version:

recent = vdag.history_since(3)
for mutation in recent:
    print(f" v{mutation.version}: {mutation.mutation_type.value}")
# v4: add_node
# v5: add_edge

This is useful for incremental synchronization -- fetch only the mutations that happened since the last sync.
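The sync pattern can be sketched without dagron: a consumer remembers the last version it applied and asks only for newer entries. Everything here (the `history_since` function over a plain list of dicts and the `Replica` class) is a hypothetical illustration, assuming each entry carries the 1-based version it created, as described in this section.

```python
from dataclasses import dataclass, field


def history_since(log: list[dict], version: int) -> list[dict]:
    """Return only the entries created after `version`."""
    return [entry for entry in log if entry["version"] > version]


@dataclass
class Replica:
    last_synced: int = 0
    applied: list[dict] = field(default_factory=list)

    def sync(self, source_log: list[dict]) -> int:
        batch = history_since(source_log, self.last_synced)
        for entry in batch:
            self.applied.append(entry)  # a real replica would replay the mutation here
            self.last_synced = entry["version"]
        return len(batch)


source = [{"version": v, "type": t} for v, t in
          [(1, "add_node"), (2, "add_node"), (3, "add_edge")]]
replica = Replica()
print(replica.sync(source))  # 3 (first sync pulls everything)

source += [{"version": 4, "type": "add_node"}, {"version": 5, "type": "add_edge"}]
print(replica.sync(source))  # 2 (only v4 and v5)
```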


Mutation Dataclass

Each Mutation is a frozen dataclass:

| Field | Type | Description |
|---|---|---|
| version | int | The version number this mutation created (1-based). |
| mutation_type | MutationType | One of ADD_NODE, REMOVE_NODE, ADD_EDGE, REMOVE_EDGE, SET_PAYLOAD, SET_METADATA. |
| args | dict[str, Any] | The arguments passed to the mutation method. |
| timestamp | float | Unix timestamp when the mutation was recorded. |

MutationType Enum

from dagron.versioning import MutationType

MutationType.ADD_NODE # "add_node"
MutationType.REMOVE_NODE # "remove_node"
MutationType.ADD_EDGE # "add_edge"
MutationType.REMOVE_EDGE # "remove_edge"
MutationType.SET_PAYLOAD # "set_payload"
MutationType.SET_METADATA # "set_metadata"
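A frozen dataclass paired with a string-valued Enum is enough to model these records. The sketch below recreates the shape described in the table and the enum listing using only the standard library; it is an illustration, not dagron's source. It also shows two properties the serialization pattern later on this page relies on: `MutationType(value)` looks a member up by its string, and a frozen instance rejects field assignment.

```python
import dataclasses
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any


class MutationType(Enum):
    ADD_NODE = "add_node"
    REMOVE_NODE = "remove_node"
    ADD_EDGE = "add_edge"
    REMOVE_EDGE = "remove_edge"
    SET_PAYLOAD = "set_payload"
    SET_METADATA = "set_metadata"


@dataclass(frozen=True)
class Mutation:
    version: int                 # 1-based version this mutation created
    mutation_type: MutationType
    args: dict[str, Any]         # note: frozen blocks rebinding, but this dict itself stays mutable
    timestamp: float             # Unix timestamp


m = Mutation(1, MutationType.ADD_NODE, {"name": "a"}, time.time())

# Look a member up by its string value (as done when deserializing)
print(MutationType("add_edge") is MutationType.ADD_EDGE)  # True

# Frozen dataclasses reject field reassignment
try:
    m.version = 2
except dataclasses.FrozenInstanceError:
    print("Mutation is immutable")
```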

Forking

Create an independent copy of the VersionedDAG at any version. The fork has its own mutation log and does not affect the original:

vdag = VersionedDAG()
vdag.add_node("a") # v1
vdag.add_node("b") # v2
vdag.add_edge("a", "b") # v3
vdag.add_node("c") # v4

# Fork from version 3 (before "c" was added)
fork = vdag.fork(at_version=3)
print(fork.version) # 3
print(fork.dag.node_count()) # 2 (a, b)

# Mutate the fork independently
fork.add_node("d")
fork.add_edge("a", "d")
print(fork.version) # 5

# Original is unaffected
print(vdag.version) # 4
print(vdag.dag.node_count()) # 3 (a, b, c)

Fork at Current Version

Call fork() without arguments to fork at the current version:

fork = vdag.fork()
print(fork.version) # same as vdag.version
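Conceptually, forking copies the log prefix up to the fork point into a new, independent list. The sketch below models that with plain lists; the `ToyLog` class and its `fork` method are hypothetical, and this page does not say whether dagron copies entries or shares them internally. As with `fork()` above, omitting `at_version` forks at the current version.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyLog:
    entries: list[tuple[str, tuple[str, ...]]] = field(default_factory=list)

    @property
    def version(self) -> int:
        return len(self.entries)

    def record(self, mutation_type: str, *args: str) -> None:
        self.entries.append((mutation_type, args))

    def fork(self, at_version: Optional[int] = None) -> "ToyLog":
        # Default to the current version, mirroring fork() without arguments.
        if at_version is None:
            at_version = self.version
        # Slicing builds a new list, so later appends on either side stay independent.
        return ToyLog(self.entries[:at_version])


base = ToyLog()
base.record("add_node", "a")       # v1
base.record("add_node", "b")       # v2
base.record("add_edge", "a", "b")  # v3
base.record("add_node", "c")       # v4

fork = base.fork(at_version=3)
fork.record("add_node", "d")       # fork v4
fork.record("add_edge", "a", "d")  # fork v5
print(fork.version, base.version)  # 5 4
```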

Use Cases

Pipeline Auditing

Record every change to a production pipeline and audit the history later:

vdag = VersionedDAG()

# Day 1: initial pipeline
vdag.add_node("ingest")                   # v1
vdag.add_node("transform")                # v2
vdag.add_edge("ingest", "transform")      # v3

# Day 2: add a new output
vdag.add_node("export_csv")               # v4
vdag.add_edge("transform", "export_csv")  # v5

# Day 3: add monitoring
vdag.add_node("monitor")                  # v6
vdag.add_edge("transform", "monitor")     # v7

# Audit: what was the pipeline at the end of day 1?
dag_day1 = vdag.at_version(3)
print(f"Day 1 nodes: {[n.name for n in dag_day1.topological_sort()]}")

# What changed between the end of day 1 and the end of day 3?
diff = vdag.diff_versions(3, 7)
print(f"Added: {diff.added_nodes}")
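Hardcoding version numbers makes audits easy to get wrong, because every node and every edge is its own mutation. One way to avoid that is to record the current version at meaningful points. The sketch below simulates the counter so it runs without dagron; the `Counter` class and the `checkpoints` dict are hypothetical, and with dagron you would read `vdag.version` instead.

```python
class Counter:
    """Stands in for a VersionedDAG: every mutation bumps the version by one."""

    def __init__(self) -> None:
        self.version = 0

    def mutate(self, n: int = 1) -> None:
        self.version += n


vdag_sim = Counter()
checkpoints: dict[str, int] = {}

vdag_sim.mutate(3)                     # ingest, transform, ingest -> transform
checkpoints["day1"] = vdag_sim.version
vdag_sim.mutate(2)                     # export_csv, transform -> export_csv
checkpoints["day2"] = vdag_sim.version
vdag_sim.mutate(2)                     # monitor, transform -> monitor
checkpoints["day3"] = vdag_sim.version

print(checkpoints)  # {'day1': 3, 'day2': 5, 'day3': 7}
# With dagron: vdag.at_version(checkpoints["day1"]) and
#              vdag.diff_versions(checkpoints["day1"], checkpoints["day3"])
```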

A/B Testing Pipeline Variants

Fork a pipeline and try different approaches:

import dagron
from dagron.versioning import VersionedDAG

# Base pipeline
base = VersionedDAG()
base.add_node("data")
base.add_node("features")
base.add_edge("data", "features")

# Variant A: XGBoost
variant_a = base.fork()
variant_a.add_node("xgboost")
variant_a.add_edge("features", "xgboost")

# Variant B: Neural Network
variant_b = base.fork()
variant_b.add_node("neural_net")
variant_b.add_edge("features", "neural_net")

# Create an executor for each variant; neither fork affects `base`
executor_a = dagron.DAGExecutor(variant_a.dag)
executor_b = dagron.DAGExecutor(variant_b.dag)

Rollback

If a mutation causes problems, reconstruct the last known-good version and wrap it in a new VersionedDAG:

# Something went wrong after version 10
good_dag = vdag.at_version(10)

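# Start fresh from the known-good state.
# new_vdag.version is 0: its log starts empty, so the history
# up to version 10 remains only in the original vdag.
new_vdag = VersionedDAG(good_dag)
# Continue making mutations on the recovered DAG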

Serialization Pattern

While VersionedDAG does not have built-in serialization, the mutation log is easy to serialize:

import json

# Serialize the mutation log
log_data = []
for m in vdag.history():
    log_data.append({
        "version": m.version,
        "type": m.mutation_type.value,
        "args": m.args,
        "timestamp": m.timestamp,
    })

# default=str writes values JSON cannot encode (e.g. custom payloads) as strings
with open("pipeline_history.json", "w") as f:
    json.dump(log_data, f, indent=2, default=str)

# Deserialize and replay
from dagron.versioning import VersionedDAG, MutationType

vdag_restored = VersionedDAG()
with open("pipeline_history.json") as f:
    log_data = json.load(f)

# Entries are stored in version order, so replaying them in sequence rebuilds the DAG
for entry in log_data:
    mt = MutationType(entry["type"])
    args = entry["args"]
    if mt == MutationType.ADD_NODE:
        vdag_restored.add_node(args["name"], payload=args.get("payload"), metadata=args.get("metadata"))
    elif mt == MutationType.ADD_EDGE:
        vdag_restored.add_edge(args["from_node"], args["to_node"], weight=args.get("weight"), label=args.get("label"))
    elif mt == MutationType.REMOVE_NODE:
        vdag_restored.remove_node(args["name"])
    elif mt == MutationType.REMOVE_EDGE:
        vdag_restored.remove_edge(args["from_node"], args["to_node"])
    elif mt == MutationType.SET_PAYLOAD:
        vdag_restored.set_payload(args["name"], args.get("payload"))
    elif mt == MutationType.SET_METADATA:
        vdag_restored.set_metadata(args["name"], args.get("metadata"))
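One caveat with `default=str`: any payload or metadata value that JSON cannot encode is written as its string form, so after loading it comes back as a `str`, not the original object. This stdlib-only snippet demonstrates the effect on a single log entry; the entry shape copies the pattern above, and the `datetime` payload is just an example of a non-JSON type.

```python
import json
from datetime import datetime

entry = {
    "version": 1,
    "type": "add_node",
    "args": {"name": "a", "payload": datetime(2024, 3, 2, 12, 0), "metadata": {"owner": "etl"}},
    "timestamp": 1709400000.0,
}

restored = json.loads(json.dumps(entry, default=str))
payload = restored["args"]["payload"]
print(type(payload).__name__, payload)  # str 2024-03-02 12:00:00
print(restored["args"]["metadata"])     # {'owner': 'etl'} (JSON-native values survive)
```

If payloads must round-trip exactly, store them in a JSON-native form or serialize them with your own encoder and decoder.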

Best Practices

  1. Use VersionedDAG during development. Wrap your DAG early so you have a full audit trail from the start.

  2. Commit the serialized mutation log to version control. Store pipeline_history.json alongside your code to track pipeline structure changes in git.

  3. Use diff_versions() in code review. Compare the pipeline before and after a change to verify that only the intended modifications were made.

  4. Fork for experiments. Instead of modifying the main pipeline, fork it and try your changes there. If they work, replay the fork's new mutations (available via history_since()) onto the main pipeline.

  5. Use history_since() for incremental sync. If you are syncing pipeline state across services, send only the mutations since the last known version.
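This page documents no merge operation, so one way to bring a successful fork back is to replay the entries it recorded after the fork point, for example those returned by `fork.history_since(fork_point)`. The sketch below shows that bookkeeping on plain lists; `merge_back` is a hypothetical helper, and it assumes the main pipeline has not changed since the fork (otherwise replayed edits may conflict, e.g. an edge pointing at a node that was removed).

```python
def merge_back(main: list[tuple], fork: list[tuple], fork_point: int) -> list[tuple]:
    """Append the fork's post-fork entries to main, assuming main hasn't moved."""
    if len(main) != fork_point:
        raise ValueError("main has changed since the fork; resolve conflicts first")
    if fork[:fork_point] != main:
        raise ValueError("fork does not share main's history up to fork_point")
    return main + fork[fork_point:]


main = [("add_node", "data"), ("add_node", "features"), ("add_edge", "data", "features")]
fork = main + [("add_node", "xgboost"), ("add_edge", "features", "xgboost")]

merged = merge_back(main, fork, fork_point=3)
print(len(merged))  # 5
```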