Versioning
The versioning module provides structural versioning and time-travel for DAGs. Every mutation (add/remove node, add/remove edge, set payload/metadata) is recorded in an append-only log. You can navigate to any historical version, diff between arbitrary versions, inspect the full mutation history, and fork independent copies from any point.
from dagron.versioning import VersionedDAG, Mutation, MutationType
VersionedDAG
class VersionedDAG:
def __init__(
self,
dag: DAG | None = None,
) -> None: ...
DAG with full structural versioning and time-travel. Every mutation is recorded in an append-only log with timestamps. The version counter starts at 0 (empty state) and increments by 1 for each mutation.
| Parameter | Type | Default | Description |
|---|---|---|---|
dag | DAG \| None | None | Optional existing DAG to wrap. If None, starts with an empty DAG. The DAG is used as the current state -- its prior history is not tracked. |
from dagron.versioning import VersionedDAG
vdag = VersionedDAG()
vdag.add_node("extract")
vdag.add_node("transform")
vdag.add_edge("extract", "transform")
print(vdag.version) # 3
print(vdag.dag.node_count()) # 2
Properties
VersionedDAG.dag
@property
def dag(self) -> DAG
The current DAG object. Treat it as read-only -- make changes through the
VersionedDAG mutation methods so that they are tracked.
Returns: DAG -- The underlying DAG in its current state.
VersionedDAG.version
@property
def version(self) -> int
Current version number. Starts at 0 and increments by 1 for each mutation.
Returns: int -- The current version.
Mutation methods
These methods mirror the DAG API but record each operation in the
version log.
VersionedDAG.add_node
def add_node(
self,
name: str,
payload: Any = None,
metadata: Any = None,
) -> None
Add a node and record the mutation.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | The node name. |
payload | Any | None | Optional payload to attach to the node. |
metadata | Any | None | Optional metadata to attach to the node. |
VersionedDAG.remove_node
def remove_node(self, name: str) -> None
Remove a node and all its edges, and record the mutation.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | The node name to remove. |
VersionedDAG.add_edge
def add_edge(
self,
from_node: str,
to_node: str,
weight: float | None = None,
label: str | None = None,
) -> None
Add an edge and record the mutation.
| Parameter | Type | Default | Description |
|---|---|---|---|
from_node | str | required | Source node name. |
to_node | str | required | Target node name. |
weight | float \| None | None | Optional edge weight. |
label | str \| None | None | Optional edge label. |
Raises: CycleError -- If adding the edge would create a cycle.
VersionedDAG.remove_edge
def remove_edge(self, from_node: str, to_node: str) -> None
Remove an edge and record the mutation.
| Parameter | Type | Default | Description |
|---|---|---|---|
from_node | str | required | Source node name. |
to_node | str | required | Target node name. |
VersionedDAG.set_payload
def set_payload(self, name: str, payload: Any) -> None
Set a node's payload and record the mutation.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | The node name. |
payload | Any | required | The new payload value. |
VersionedDAG.set_metadata
def set_metadata(self, name: str, metadata: Any) -> None
Set a node's metadata and record the mutation.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | The node name. |
metadata | Any | required | The new metadata value. |
Time-travel methods
VersionedDAG.at_version
def at_version(self, version: int) -> DAG
Reconstruct the DAG at a specific version by replaying mutations from version 0 up to the specified version.
| Parameter | Type | Default | Description |
|---|---|---|---|
version | int | required | The version number to reconstruct (0-based). Version 0 is the empty state. |
Returns: DAG -- A new DAG representing the state at that version.
Raises: ValueError -- If version is negative or exceeds the current version.
# Build up history
vdag = VersionedDAG()
vdag.add_node("a") # version 1
vdag.add_node("b") # version 2
vdag.add_edge("a", "b") # version 3
vdag.add_node("c") # version 4
# Time-travel to version 2
old_dag = vdag.at_version(2)
print(old_dag.node_count()) # 2 (only "a" and "b")
print(old_dag.edge_count()) # 0 (edge was added in version 3)
VersionedDAG.diff_versions
def diff_versions(
self,
version_a: int,
version_b: int,
) -> GraphDiff
Diff two versions of the DAG. Reconstructs both versions and computes the structural difference.
| Parameter | Type | Default | Description |
|---|---|---|---|
version_a | int | required | First version number. |
version_b | int | required | Second version number. |
Returns: GraphDiff -- Structural differences between the two versions, including added/removed nodes and edges.
diff = vdag.diff_versions(1, 4)
print(diff) # Shows nodes/edges added between version 1 and version 4
VersionedDAG.history
def history(self) -> list[Mutation]
Get the full mutation history as an ordered list.
Returns: list[Mutation] -- All recorded mutations in chronological order.
for mutation in vdag.history():
print(f"v{mutation.version}: {mutation.mutation_type.value} {mutation.args}")
# v1: add_node {'name': 'a'}
# v2: add_node {'name': 'b'}
# v3: add_edge {'from_node': 'a', 'to_node': 'b'}
# v4: add_node {'name': 'c'}
VersionedDAG.history_since
def history_since(self, version: int) -> list[Mutation]
Get mutations since a specific version (exclusive).
| Parameter | Type | Default | Description |
|---|---|---|---|
version | int | required | Starting version (exclusive). Returns mutations with version > this value. |
Returns: list[Mutation] -- Mutations after the given version.
recent = vdag.history_since(2)
# Returns the mutations at versions 3 and 4
VersionedDAG.fork
def fork(
self,
at_version: int | None = None,
) -> VersionedDAG
Create an independent fork of this versioned DAG. The fork gets its own copy of the DAG and history up to the fork point. Subsequent mutations in either the original or the fork do not affect the other.
| Parameter | Type | Default | Description |
|---|---|---|---|
at_version | int \| None | None | Version to fork from. None means fork from the current version. |
Returns: VersionedDAG -- A new independent VersionedDAG forked from the specified version.
# Fork from the current state
fork = vdag.fork()
fork.add_node("d")
print(vdag.dag.node_count()) # 3 (original unchanged)
print(fork.dag.node_count()) # 4
# Fork from a specific historical version
old_fork = vdag.fork(at_version=2)
print(old_fork.version) # 2
print(old_fork.dag.node_count()) # 2
Mutation
@dataclass(frozen=True)
class Mutation:
version: int
mutation_type: MutationType
args: dict[str, Any]
timestamp: float
A single recorded mutation in the version log. Frozen dataclass.
| Parameter | Type | Default | Description |
|---|---|---|---|
version | int | required | The version number this mutation created. |
mutation_type | MutationType | required | The type of mutation. |
args | dict[str, Any] | required | Arguments passed to the mutation (e.g., node name, payload). |
timestamp | float | required | Unix timestamp (from time.time()) when the mutation was recorded. |
MutationType
class MutationType(Enum):
ADD_NODE = "add_node"
REMOVE_NODE = "remove_node"
ADD_EDGE = "add_edge"
REMOVE_EDGE = "remove_edge"
SET_PAYLOAD = "set_payload"
SET_METADATA = "set_metadata"
Enumeration of all mutation types that can be recorded.
| Value | Description |
|---|---|
ADD_NODE | A node was added to the DAG. |
REMOVE_NODE | A node (and its edges) was removed. |
ADD_EDGE | An edge was added between two nodes. |
REMOVE_EDGE | An edge was removed. |
SET_PAYLOAD | A node's payload was updated. |
SET_METADATA | A node's metadata was updated. |
Complete example
from dagron.versioning import VersionedDAG, MutationType
# Create a versioned DAG and build it incrementally
vdag = VersionedDAG()
# Phase 1: Basic pipeline
vdag.add_node("extract", payload={"source": "api"}) # v1
vdag.add_node("transform") # v2
vdag.add_edge("extract", "transform") # v3
vdag.add_node("load") # v4
vdag.add_edge("transform", "load") # v5
print(f"Phase 1 complete: v{vdag.version}") # v5
# Phase 2: Add validation step
vdag.add_node("validate") # v6
vdag.add_edge("transform", "validate") # v7
vdag.add_edge("validate", "load") # v8
vdag.remove_edge("transform", "load") # v9
print(f"Phase 2 complete: v{vdag.version}") # v9
# Time-travel: see the DAG before validation was added
phase1_dag = vdag.at_version(5)
print(f"Phase 1: {phase1_dag.node_count()} nodes, {phase1_dag.edge_count()} edges")
# Phase 1: 3 nodes, 2 edges
# Diff between phases
diff = vdag.diff_versions(5, 9)
print(diff)
# Inspect mutation log
print("\nMutation history:")
for m in vdag.history():
print(f" v{m.version}: {m.mutation_type.value} {m.args}")
# Fork for experimentation
experiment = vdag.fork()
experiment.add_node("cache")
experiment.add_edge("extract", "cache")
experiment.add_edge("cache", "transform")
print(f"\nOriginal: {vdag.dag.node_count()} nodes") # 4
print(f"Experiment: {experiment.dag.node_count()} nodes") # 5
# Fork from an earlier version
legacy = vdag.fork(at_version=5)
print(f"Legacy fork: v{legacy.version}, {legacy.dag.node_count()} nodes")
# Legacy fork: v5, 3 nodes
# Get recent changes
recent = vdag.history_since(5)
print(f"\nChanges since v5: {len(recent)} mutations")
for m in recent:
print(f" {m.mutation_type.value}: {m.args}")
See also
- DAG -- the core graph class that VersionedDAG wraps.
- Composition -- combining versioned DAGs.
- Building DAGs guide -- construction patterns and best practices.