# Serialization
dagron supports multiple serialization formats for persisting DAGs, sharing them across processes, embedding them in documentation, and visualizing them with external tools. This guide covers every format and shows how to handle custom payloads.
## Format comparison
| Format | Round-trip? | Human-readable? | Best for |
|---|---|---|---|
| JSON | Yes | Yes | Config files, APIs, debugging |
| Binary | Yes | No | Performance-critical storage, IPC |
| DOT | No (export only) | Yes | Graphviz visualization |
| Mermaid | No (export only) | Yes | Documentation, Markdown |
| File (save/load) | Yes | No | Disk persistence with compression |
## JSON serialization

### Export to JSON

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("extract", "transform")
    .add_edge("transform", "load")
    .build()
)

json_str = dag.to_json()
print(json_str)
```
Output:

```json
{
  "nodes": ["extract", "transform", "load"],
  "edges": [
    ["extract", "transform"],
    ["transform", "load"]
  ]
}
```
### Import from JSON

```python
restored = dagron.DAG.from_json(json_str)
print(restored.node_count())   # 3
print(restored.edge_count())   # 2
print(list(restored.nodes()))  # ['extract', 'transform', 'load']
```
The round-trip preserves all structural information: nodes, edges, and their ordering.
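The JSON layout is plain lists, so you can sanity-check it with the standard library alone. This stdlib-only sketch (no dagron required) parses the document shown above and confirms that a `dumps`/`loads` round-trip at the dict level is lossless:

```python
import json

json_str = (
    '{"nodes": ["extract", "transform", "load"],'
    ' "edges": [["extract", "transform"], ["transform", "load"]]}'
)

doc = json.loads(json_str)
assert len(doc["nodes"]) == 3
assert len(doc["edges"]) == 2

# dumps/loads through the stdlib json module is lossless for this layout,
# so every node, every edge, and their ordering survives the round-trip
assert json.loads(json.dumps(doc)) == doc
```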
### JSON with payloads
When nodes carry payloads, they are included in the JSON output:
```python
dag = dagron.DAG()
dag.add_node("train", payload={"epochs": 10, "lr": 0.001})
dag.add_node("evaluate", payload={"metrics": ["accuracy", "f1"]})
dag.add_edge("train", "evaluate")

json_str = dag.to_json()
print(json_str)
```
Output:

```json
{
  "nodes": [
    {"name": "train", "payload": {"epochs": 10, "lr": 0.001}},
    {"name": "evaluate", "payload": {"metrics": ["accuracy", "f1"]}}
  ],
  "edges": [
    ["train", "evaluate"]
  ]
}
```
### Storing JSON to a file

```python
# Write
with open("pipeline.json", "w") as f:
    f.write(dag.to_json())

# Read
with open("pipeline.json", "r") as f:
    dag = dagron.DAG.from_json(f.read())
```
### Use case: sharing DAG definitions via APIs

```python
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route("/pipeline", methods=["GET"])
def get_pipeline():
    return dag.to_json(), 200, {"Content-Type": "application/json"}

@app.route("/pipeline", methods=["POST"])
def set_pipeline():
    new_dag = dagron.DAG.from_json(request.data.decode())
    # ... use new_dag ...
    return jsonify({"nodes": new_dag.node_count()})
```
## Binary serialization

The binary format uses an efficient Rust-native encoding that is significantly faster and more compact than JSON. Use it when performance matters.
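To see why a binary encoding wins on size, compare the JSON document from the previous section against a hand-rolled packed layout for the same tiny graph. This is only an illustration built with the stdlib `struct` module, not dagron's actual wire format:

```python
import json
import struct

nodes = ["extract", "transform", "load"]
edges = [(0, 1), (1, 2)]  # edges as integer indices into `nodes`

# Text encoding: the JSON document from the previous section
as_json = json.dumps(
    {"nodes": nodes, "edges": [[nodes[a], nodes[b]] for a, b in edges]}
)

# Binary encoding: NUL-separated name table plus fixed-width index pairs
name_table = b"\x00".join(n.encode() for n in nodes)
edge_block = b"".join(struct.pack("<II", a, b) for a, b in edges)
as_binary = struct.pack("<I", len(name_table)) + name_table + edge_block

print(len(as_json), len(as_binary))  # the packed form is less than half the size
```

Node names appear once in the name table instead of once per edge, and each edge costs eight fixed bytes instead of quoted strings with punctuation; that is where most of the savings come from.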
### Export to bytes

```python
data = dag.to_bytes()
print(type(data))  # <class 'bytes'>
print(len(data))   # compact binary representation
```
### Import from bytes

```python
restored = dagron.DAG.from_bytes(data)
print(restored.node_count())  # same as original
```
### Use case: Redis caching

```python
import redis

r = redis.Redis()

# Store
r.set("pipeline:etl", dag.to_bytes())

# Retrieve
data = r.get("pipeline:etl")
if data:
    dag = dagron.DAG.from_bytes(data)
```
### Use case: inter-process communication

```python
import multiprocessing as mp

def worker(dag_bytes):
    dag = dagron.DAG.from_bytes(dag_bytes)
    print(f"Worker received DAG with {dag.node_count()} nodes")

# Send the DAG to a subprocess
p = mp.Process(target=worker, args=(dag.to_bytes(),))
p.start()
p.join()
```
### Performance comparison
Binary serialization is typically 5-10x faster than JSON and produces 3-5x smaller output, because it avoids string parsing and uses Rust's native serialization:
```python
import time

# JSON
start = time.perf_counter()
for _ in range(10000):
    dagron.DAG.from_json(dag.to_json())
json_time = time.perf_counter() - start

# Binary
start = time.perf_counter()
for _ in range(10000):
    dagron.DAG.from_bytes(dag.to_bytes())
binary_time = time.perf_counter() - start

print(f"JSON: {json_time:.3f}s")
print(f"Binary: {binary_time:.3f}s")
print(f"Speedup: {json_time / binary_time:.1f}x")
```
## File persistence (save / load)

The `save()` and `load()` methods write and read DAGs to and from disk. They use the binary format internally, with optional compression.
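Conceptually, that is just the byte form from the previous section plus compression on the way to disk. Here is a stdlib-only sketch of the same pattern using `zlib` and a stand-in byte string; dagron's actual on-disk layout and compression choice may differ:

```python
import zlib

payload = b"extract\x00transform\x00load" * 50  # stand-in for dag.to_bytes()

# Write: compress the serialized bytes before they hit disk
with open("pipeline.sketch", "wb") as f:
    f.write(zlib.compress(payload))

# Read: decompress back to the exact original byte string
with open("pipeline.sketch", "rb") as f:
    restored = zlib.decompress(f.read())

assert restored == payload
assert len(zlib.compress(payload)) < len(payload)  # repetitive data compresses well
```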
### Saving to disk

```python
dag.save("pipeline.dagron")
```
### Loading from disk

```python
dag = dagron.DAG.load("pipeline.dagron")
print(dag.node_count())
```
### Use case: checkpoint-style persistence

```python
import os

PIPELINE_PATH = "/var/data/pipeline.dagron"

def get_or_create_pipeline():
    if os.path.exists(PIPELINE_PATH):
        return dagron.DAG.load(PIPELINE_PATH)
    dag = (
        dagron.DAG.builder()
        .add_edge("extract", "transform")
        .add_edge("transform", "load")
        .build()
    )
    dag.save(PIPELINE_PATH)
    return dag
```
## DOT export (Graphviz)

The DOT language is the standard input format for Graphviz.

```python
dot = dag.to_dot()
print(dot)
```
Output:

```dot
digraph {
    "extract" -> "transform"
    "transform" -> "load"
}
```
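The DOT text above is simple enough to generate by hand, which is handy when you want to post-process the edge list first. This stdlib-only helper mirrors the output shape shown; dagron's own `to_dot()` may differ in quoting or indentation:

```python
def edges_to_dot(edges):
    """Emit a minimal DOT digraph for a list of (src, dst) edge pairs."""
    body = "\n".join(f'    "{src}" -> "{dst}"' for src, dst in edges)
    return "digraph {\n" + body + "\n}"

print(edges_to_dot([("extract", "transform"), ("transform", "load")]))
```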
### Rendering with Graphviz

```python
import subprocess

dot = dag.to_dot()
with open("pipeline.dot", "w") as f:
    f.write(dot)

subprocess.run(["dot", "-Tpng", "pipeline.dot", "-o", "pipeline.png"])
```
### Rendering in a Jupyter notebook

```python
import subprocess

from IPython.display import SVG, display

dot = dag.to_dot()
result = subprocess.run(
    ["dot", "-Tsvg"],
    input=dot.encode(),
    capture_output=True,
)
display(SVG(result.stdout))
```
## Mermaid export

Mermaid is a Markdown-friendly diagramming language supported by GitHub, GitLab, Docusaurus, and many other platforms.

```python
mermaid = dag.to_mermaid()
print(mermaid)
```
Output:

```
graph TD
    extract --> transform
    transform --> load
```
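As with DOT, the Mermaid text is easy to produce directly from an edge list. A stdlib-only sketch of the output shape above; dagron's own `to_mermaid()` may additionally escape node names that Mermaid treats as syntax:

```python
def edges_to_mermaid(edges):
    # Top-down flowchart: one "src --> dst" line per edge
    return "graph TD\n" + "\n".join(f"    {src} --> {dst}" for src, dst in edges)

print(edges_to_mermaid([("extract", "transform"), ("transform", "load")]))
```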
### Embedding in Markdown

````markdown
```mermaid
graph TD
    extract --> transform
    transform --> load
```
````
### Use case: auto-generated documentation

````python
def generate_pipeline_docs(dag, output_path):
    """Generate a Markdown file with an embedded DAG diagram."""
    mermaid = dag.to_mermaid()
    content = f"""# Pipeline Overview

## DAG Structure

```mermaid
{mermaid}
```

## Statistics

- Nodes: {dag.node_count()}
- Edges: {dag.edge_count()}
- Roots: {', '.join(dag.roots())}
- Leaves: {', '.join(dag.leaves())}
"""
    with open(output_path, "w") as f:
        f.write(content)

generate_pipeline_docs(dag, "pipeline.md")
````
## Custom payload serializers

By default, dagron serializes payloads with Python's standard JSON encoder, which handles `dict`, `list`, `str`, `int`, `float`, `bool`, and `None`. For custom objects, you need to provide serializer functions.
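The `cls=` and `object_hook=` parameters used below are the standard `json` module extension hooks, so you can watch them work in isolation first. This dagron-free sketch tags a `complex` value the same way the dataclass example tags `TaskConfig`:

```python
import json

class ComplexEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, complex):  # stand-in for any custom payload type
            return {"__type__": "complex", "re": obj.real, "im": obj.imag}
        return super().default(obj)

def complex_decoder(dct):
    # Called for every decoded JSON object; revive only the tagged ones
    if dct.get("__type__") == "complex":
        return complex(dct["re"], dct["im"])
    return dct

text = json.dumps({"value": 1 + 2j}, cls=ComplexEncoder)
back = json.loads(text, object_hook=complex_decoder)
assert back == {"value": 1 + 2j}
```

The `"__type__"` tag is just a convention: the decoder needs some marker to tell revived objects apart from ordinary dicts.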
### Example: serializing dataclass payloads

```python
import json
from dataclasses import asdict, dataclass

import dagron

@dataclass
class TaskConfig:
    retries: int
    timeout_seconds: float
    tags: list

# Build a DAG with dataclass payloads
dag = dagron.DAG()
dag.add_node("fetch", payload=TaskConfig(retries=3, timeout_seconds=30.0, tags=["io"]))
dag.add_node("process", payload=TaskConfig(retries=1, timeout_seconds=120.0, tags=["cpu"]))
dag.add_edge("fetch", "process")

# Custom encoder
class ConfigEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, TaskConfig):
            return {"__type__": "TaskConfig", **asdict(obj)}
        return super().default(obj)

# Custom decoder
def config_decoder(dct):
    if dct.get("__type__") == "TaskConfig":
        return TaskConfig(
            retries=dct["retries"],
            timeout_seconds=dct["timeout_seconds"],
            tags=dct["tags"],
        )
    return dct

# Serialize with the custom encoder
json_str = dag.to_json(cls=ConfigEncoder)
print(json_str)

# Deserialize with the custom decoder
restored = dagron.DAG.from_json(json_str, object_hook=config_decoder)
```
### Example: binary serialization with pickle payloads

For the binary format, payloads are serialized with pickle by default, so custom objects work out of the box as long as they are picklable:

```python
import dagron
import numpy as np

dag = dagron.DAG()
dag.add_node("matrix", payload=np.array([[1, 2], [3, 4]]))
dag.add_node("result")
dag.add_edge("matrix", "result")

# Binary round-trip preserves numpy arrays
data = dag.to_bytes()
restored = dagron.DAG.from_bytes(data)
```
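Picklability is the only requirement, and it is easy to check for a payload type in isolation. A stdlib-only sketch with a hypothetical `Checkpoint` payload class (as always with pickle, only unpickle data from sources you trust):

```python
import pickle
from dataclasses import dataclass

@dataclass
class Checkpoint:  # hypothetical payload type
    step: int
    loss: float

# Round-trip through pickle: what the binary format does per payload
blob = pickle.dumps(Checkpoint(step=100, loss=0.25))
restored = pickle.loads(blob)
assert restored == Checkpoint(step=100, loss=0.25)
```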
## Combining serialization with snapshots

Snapshots and serialization work well together for versioning:

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_edge("a", "b")
    .add_edge("b", "c")
    .build()
)

# Save version 1
dag.save("pipeline_v1.dagron")

# Make changes
dag.add_node("d")
dag.add_edge("c", "d")

# Save version 2
dag.save("pipeline_v2.dagron")

# Compare versions
v1 = dagron.DAG.load("pipeline_v1.dagron")
v2 = dagron.DAG.load("pipeline_v2.dagron")
print(f"v1: {v1.node_count()} nodes, {v1.edge_count()} edges")
print(f"v2: {v2.node_count()} nodes, {v2.edge_count()} edges")
```
## Format selection guide

Use this decision tree to pick the right format:

- **Need to read/write from Python?** Use `save()`/`load()` for files, or `to_bytes()`/`from_bytes()` for in-memory transfer.
- **Need a human-readable config?** Use `to_json()`/`from_json()`.
- **Need to visualize with Graphviz?** Use `to_dot()`.
- **Need to embed in Markdown/docs?** Use `to_mermaid()`.
- **Need maximum performance?** Use `to_bytes()`/`from_bytes()`.
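One way to act on that decision tree in code is a suffix-based dispatch table. Both `exporter_for` and the suffix mapping below are hypothetical helpers, not part of dagron:

```python
from pathlib import Path

# Hypothetical mapping from a target file suffix to a dagron method name,
# mirroring the decision tree above
EXPORTERS = {
    ".json": "to_json",
    ".dagron": "save",
    ".dot": "to_dot",
    ".md": "to_mermaid",
    ".bin": "to_bytes",
}

def exporter_for(path: str) -> str:
    """Return the name of the export method suited to `path`."""
    suffix = Path(path).suffix
    if suffix not in EXPORTERS:
        raise ValueError(f"no exporter registered for {suffix!r}")
    return EXPORTERS[suffix]

print(exporter_for("pipeline.dot"))  # to_dot
```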
## API reference

| Method | Docs |
|---|---|
| `dag.to_json()` | DAG |
| `DAG.from_json()` | DAG |
| `dag.to_bytes()` | DAG |
| `DAG.from_bytes()` | DAG |
| `dag.save()` | DAG |
| `DAG.load()` | DAG |
| `dag.to_dot()` | DAG |
| `dag.to_mermaid()` | DAG |
## Next steps
- Incremental Execution — use save/load for caching intermediate state.
- Checkpointing — persist execution progress to disk.
- Graph Transforms — create snapshots before applying transforms.