Skip to main content

Versioning

The versioning module provides structural versioning and time-travel for DAGs. Every mutation (add/remove node, add/remove edge, set payload/metadata) is recorded in an append-only log. You can navigate to any historical version, diff between arbitrary versions, inspect the full mutation history, and fork independent copies from any point.

from dagron.versioning import VersionedDAG, Mutation, MutationType

VersionedDAG

VersionedDAG
class VersionedDAG:
    def __init__(
        self,
        dag: DAG | None = None,
    ) -> None: ...

A DAG wrapper with full structural versioning and time-travel. Every mutation is recorded in an append-only log with timestamps. The version counter starts at 0 (the initial state) and increments by 1 for each mutation.

ParameterTypeDefaultDescription
dagDAG | NoneNoneOptional existing DAG to wrap. If None, starts with an empty DAG. The DAG is used as the current state -- its prior history is not tracked.
from dagron.versioning import VersionedDAG

vdag = VersionedDAG()
vdag.add_node("extract")
vdag.add_node("transform")
vdag.add_edge("extract", "transform")

print(vdag.version) # 3
print(vdag.dag.node_count()) # 2

Properties


VersionedDAG.dag

VersionedDAG.dag
@property
def dag(self) -> DAG

The current DAG object. Treat it as read-only: mutating it directly bypasses the version log, so those changes are not recorded. Use the VersionedDAG mutation methods instead to ensure every change is tracked.

Returns: DAG -- The underlying DAG in its current state.


VersionedDAG.version

VersionedDAG.version
@property
def version(self) -> int

Current version number. Starts at 0 and increments by 1 for each mutation.

Returns: int -- The current version.

Mutation methods

These methods mirror the DAG API but record each operation in the version log.


VersionedDAG.add_node

VersionedDAG.add_node
def add_node(
self,
name: str,
payload: Any = None,
metadata: Any = None,
) -> None

Add a node and record the mutation.

ParameterTypeDefaultDescription
namestrrequiredThe node name.
payloadAnyNoneOptional payload to attach to the node.
metadataAnyNoneOptional metadata to attach to the node.

VersionedDAG.remove_node

VersionedDAG.remove_node
def remove_node(self, name: str) -> None

Remove a node and all its edges, and record the mutation.

ParameterTypeDefaultDescription
namestrrequiredThe node name to remove.

VersionedDAG.add_edge

VersionedDAG.add_edge
def add_edge(
self,
from_node: str,
to_node: str,
weight: float | None = None,
label: str | None = None,
) -> None

Add an edge and record the mutation.

ParameterTypeDefaultDescription
from_nodestrrequiredSource node name.
to_nodestrrequiredTarget node name.
weightfloat | NoneNoneOptional edge weight.
labelstr | NoneNoneOptional edge label.

Raises: CycleError -- If adding the edge would create a cycle.


VersionedDAG.remove_edge

VersionedDAG.remove_edge
def remove_edge(self, from_node: str, to_node: str) -> None

Remove an edge and record the mutation.

ParameterTypeDefaultDescription
from_nodestrrequiredSource node name.
to_nodestrrequiredTarget node name.

VersionedDAG.set_payload

VersionedDAG.set_payload
def set_payload(self, name: str, payload: Any) -> None

Set a node's payload and record the mutation.

ParameterTypeDefaultDescription
namestrrequiredThe node name.
payloadAnyrequiredThe new payload value.

VersionedDAG.set_metadata

VersionedDAG.set_metadata
def set_metadata(self, name: str, metadata: Any) -> None

Set a node's metadata and record the mutation.

ParameterTypeDefaultDescription
namestrrequiredThe node name.
metadataAnyrequiredThe new metadata value.

Time-travel methods


VersionedDAG.at_version

VersionedDAG.at_version
def at_version(self, version: int) -> DAG

Reconstruct the DAG as it was at a specific version by replaying the recorded mutations from version 0 up to and including the specified version.

ParameterTypeDefaultDescription
versionintrequiredThe version number to reconstruct (0-based). Version 0 is the empty state.

Returns: DAG -- A new DAG representing the state at that version.

Raises: ValueError -- If version is negative or exceeds the current version.

# Build up history
vdag = VersionedDAG()
vdag.add_node("a") # version 1
vdag.add_node("b") # version 2
vdag.add_edge("a", "b") # version 3
vdag.add_node("c") # version 4

# Time-travel to version 2
old_dag = vdag.at_version(2)
print(old_dag.node_count()) # 2 (only "a" and "b")
print(old_dag.edge_count()) # 0 (edge was added in version 3)

VersionedDAG.diff_versions

VersionedDAG.diff_versions
def diff_versions(
self,
version_a: int,
version_b: int,
) -> GraphDiff

Compute the structural difference between two versions of the DAG. Both versions are reconstructed, and the changes needed to go from version_a to version_b are reported.

ParameterTypeDefaultDescription
version_aintrequiredFirst version number.
version_bintrequiredSecond version number.

Returns: GraphDiff -- Structural differences between the two versions, including added/removed nodes and edges.

diff = vdag.diff_versions(1, 4)
print(diff) # Shows nodes/edges added between version 1 and version 4

VersionedDAG.history

VersionedDAG.history
def history(self) -> list[Mutation]

Get the full mutation history as an ordered list.

Returns: list[Mutation] -- All recorded mutations in chronological order.

for mutation in vdag.history():
    print(f"v{mutation.version}: {mutation.mutation_type.value} {mutation.args}")
# v1: add_node {'name': 'a'}
# v2: add_node {'name': 'b'}
# v3: add_edge {'from_node': 'a', 'to_node': 'b'}
# v4: add_node {'name': 'c'}

VersionedDAG.history_since

VersionedDAG.history_since
def history_since(self, version: int) -> list[Mutation]

Get mutations since a specific version (exclusive).

ParameterTypeDefaultDescription
versionintrequiredStarting version (exclusive). Returns mutations with version > this value.

Returns: list[Mutation] -- Mutations after the given version.

recent = vdag.history_since(2)
# Returns mutations at version 3 and 4

VersionedDAG.fork

VersionedDAG.fork
def fork(
self,
at_version: int | None = None,
) -> VersionedDAG

Create an independent fork of this versioned DAG. The fork gets its own copy of the DAG and history up to the fork point. Subsequent mutations in either the original or the fork do not affect the other.

ParameterTypeDefaultDescription
at_versionint | NoneNoneVersion to fork from. None means fork from the current version.

Returns: VersionedDAG -- A new independent VersionedDAG forked from the specified version.

# Fork from the current state
fork = vdag.fork()
fork.add_node("d")

print(vdag.dag.node_count()) # 3 (original unchanged)
print(fork.dag.node_count()) # 4

# Fork from a specific historical version
old_fork = vdag.fork(at_version=2)
print(old_fork.version) # 2
print(old_fork.dag.node_count()) # 2

Mutation

Mutation
@dataclass(frozen=True)
class Mutation:
    version: int
    mutation_type: MutationType
    args: dict[str, Any]
    timestamp: float

A single recorded mutation in the version log. It is a frozen dataclass, so its fields cannot be reassigned after creation.

ParameterTypeDefaultDescription
versionintrequiredThe version number this mutation created.
mutation_typeMutationTyperequiredThe type of mutation.
argsdict[str, Any]requiredArguments passed to the mutation (e.g., node name, payload).
timestampfloatrequiredUnix timestamp (from time.time()) when the mutation was recorded.

MutationType

MutationType
class MutationType(Enum):
    ADD_NODE = "add_node"
    REMOVE_NODE = "remove_node"
    ADD_EDGE = "add_edge"
    REMOVE_EDGE = "remove_edge"
    SET_PAYLOAD = "set_payload"
    SET_METADATA = "set_metadata"

Enumeration of all mutation types that can be recorded.

ValueDescription
ADD_NODEA node was added to the DAG.
REMOVE_NODEA node (and its edges) was removed.
ADD_EDGEAn edge was added between two nodes.
REMOVE_EDGEAn edge was removed.
SET_PAYLOADA node's payload was updated.
SET_METADATAA node's metadata was updated.

Complete example

from dagron.versioning import VersionedDAG, MutationType

# Create a versioned DAG and build it incrementally
vdag = VersionedDAG()

# Phase 1: Basic pipeline
vdag.add_node("extract", payload={"source": "api"}) # v1
vdag.add_node("transform") # v2
vdag.add_edge("extract", "transform") # v3
vdag.add_node("load") # v4
vdag.add_edge("transform", "load") # v5
print(f"Phase 1 complete: v{vdag.version}") # v5

# Phase 2: Add validation step
vdag.add_node("validate") # v6
vdag.add_edge("transform", "validate") # v7
vdag.add_edge("validate", "load") # v8
vdag.remove_edge("transform", "load") # v9
print(f"Phase 2 complete: v{vdag.version}") # v9

# Time-travel: see the DAG before validation was added
phase1_dag = vdag.at_version(5)
print(f"Phase 1: {phase1_dag.node_count()} nodes, {phase1_dag.edge_count()} edges")
# Phase 1: 3 nodes, 2 edges

# Diff between phases
diff = vdag.diff_versions(5, 9)
print(diff)

# Inspect mutation log
print("\nMutation history:")
for m in vdag.history():
    print(f" v{m.version}: {m.mutation_type.value} {m.args}")

# Fork for experimentation
experiment = vdag.fork()
experiment.add_node("cache")
experiment.add_edge("extract", "cache")
experiment.add_edge("cache", "transform")

print(f"\nOriginal: {vdag.dag.node_count()} nodes") # 4
print(f"Experiment: {experiment.dag.node_count()} nodes") # 5

# Fork from an earlier version
legacy = vdag.fork(at_version=5)
print(f"Legacy fork: v{legacy.version}, {legacy.dag.node_count()} nodes")
# Legacy fork: v5, 3 nodes

# Get recent changes
recent = vdag.history_since(5)
print(f"\nChanges since v5: {len(recent)} mutations")
for m in recent:
    print(f" {m.mutation_type.value}: {m.args}")

See also