# Caching
The caching module provides content-addressable, Merkle-tree-based caching for DAG execution. A node's cache key is derived from its task code, its predecessors' output hashes, and its name — so if nothing upstream has changed, the cached result is returned without re-execution.
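To make the key derivation concrete, here is a minimal sketch of a Merkle-style key function. The `node_key` helper and its plain-string task hashing are illustrative assumptions, not dagron's actual algorithm (which hashes the callable itself via CacheKeyBuilder); the point is that each key folds in the predecessors' keys, so any upstream change propagates downstream.

```python
import hashlib

def node_key(name: str, task_source: str, predecessor_keys: dict[str, str]) -> str:
    """Illustrative content-addressable key: hash the node name, the task's
    source text, and the predecessors' keys in a deterministic order."""
    h = hashlib.sha256()
    h.update(name.encode())
    h.update(task_source.encode())
    for pred_name in sorted(predecessor_keys):  # deterministic ordering
        h.update(pred_name.encode())
        h.update(predecessor_keys[pred_name].encode())
    return h.hexdigest()

# Same inputs -> same key.
fetch_key = node_key("fetch", "lambda: [1, 2, 3]", {})
assert fetch_key == node_key("fetch", "lambda: [1, 2, 3]", {})

# Changing the upstream task changes its key, which in turn changes
# every downstream key that folds it in.
changed_fetch_key = node_key("fetch", "lambda: [1, 2, 3, 4]", {})
assert node_key("process", "double", {"fetch": changed_fetch_key}) != \
       node_key("process", "double", {"fetch": fetch_key})
```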
See the Caching guide for configuration patterns, backend selection, and cache invalidation strategies.
## CachedDAGExecutor

```python
class CachedDAGExecutor(
    dag: DAG,
    cache: ContentAddressableCache,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```
An executor that wraps the standard DAGExecutor with a
content-addressable cache layer. Before executing a node, the executor checks
the cache; if a valid entry exists, the cached result is returned and the node
is marked CACHE_HIT.
| Parameter | Type | Default | Description |
|---|---|---|---|
| dag | DAG | required | The DAG whose topology drives execution order. |
| cache | ContentAddressableCache | required | The cache instance to use for storing and retrieving results. |
| callbacks | ExecutionCallbacks \| None | None | Optional lifecycle callbacks. |
| fail_fast | bool | True | If True, skip downstream nodes when any node fails. |
| enable_tracing | bool | False | If True, record a Chrome-compatible execution trace. |
### execute

```python
def execute(
    tasks: dict[str, Callable],
) -> CachedExecutionResult
```

Execute tasks with caching. Returns a CachedExecutionResult with cache statistics.

| Parameter | Type | Default | Description |
|---|---|---|---|
| tasks | dict[str, Callable] | required | Map of node names to callable tasks. |
```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("fetch").add_node("process").add_node("save")
    .add_edge("fetch", "process").add_edge("process", "save")
    .build()
)

cache = dagron.ContentAddressableCache(
    backend=dagron.FileSystemCacheBackend("./cache_dir")
)
executor = dagron.CachedDAGExecutor(dag, cache=cache)

# First run — all nodes executed
result = executor.execute({
    "fetch": lambda: [1, 2, 3],
    "process": lambda: [2, 4, 6],
    "save": lambda: "done",
})
print(result.cache_hits, result.cache_misses)  # 0, 3

# Second run — all nodes cached
result = executor.execute({
    "fetch": lambda: [1, 2, 3],
    "process": lambda: [2, 4, 6],
    "save": lambda: "done",
})
print(result.cache_hits, result.cache_misses)  # 3, 0
```
## CachedExecutionResult

```python
class CachedExecutionResult(
    execution_result: ExecutionResult,
    cache_hits: int,
    cache_misses: int,
    nodes_executed: int,
    nodes_cached: int,
)
```
Extends the standard ExecutionResult with cache statistics.
| Property | Type | Description |
|---|---|---|
| execution_result | ExecutionResult | The underlying execution result with per-node details. |
| cache_hits | int | Number of nodes whose results were loaded from cache. |
| cache_misses | int | Number of nodes that were executed (cache miss). |
| nodes_executed | int | Number of nodes that were actually executed. |
| nodes_cached | int | Number of nodes that returned cached results. |
```python
print(f"Hit rate: {result.cache_hits}/{result.cache_hits + result.cache_misses}")
print(f"Nodes executed: {result.nodes_executed}")
print(f"Nodes cached: {result.nodes_cached}")
```
## ContentAddressableCache

```python
class ContentAddressableCache(
    backend: CacheBackend,
)
```

A Merkle-tree cache that computes content-addressable keys from task code and predecessor output hashes. This ensures that a cache entry is only valid when the exact same computation with the exact same inputs was previously executed.

| Parameter | Type | Default | Description |
|---|---|---|---|
| backend | CacheBackend | required | The storage backend (e.g., FileSystemCacheBackend). |
### Methods

```python
def compute_key(
    name: str,
    task: Callable,
    predecessor_hashes: dict[str, str],
) -> str
```

Compute the content-addressable cache key for a node.

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | The node name. |
| task | Callable | required | The task callable. |
| predecessor_hashes | dict[str, str] | required | Map of predecessor names to their output hashes. |
```python
def get(key: str) -> Any | None
```

Retrieve a cached value by key. Returns None on cache miss.

```python
def put(key: str, value: Any) -> None
```

Store a value in the cache.

```python
def has(key: str) -> bool
```

Check if a key exists in the cache.

```python
def clear() -> None
```

Clear all entries from the cache.

```python
def stats() -> CacheStats
```

Return current cache statistics.
```python
cache = dagron.ContentAddressableCache(
    backend=dagron.FileSystemCacheBackend("./my_cache")
)

key = cache.compute_key("process", process_fn, {"fetch": "abc123"})
cache.put(key, [2, 4, 6])
print(cache.has(key))   # True
print(cache.get(key))   # [2, 4, 6]
print(cache.stats())    # CacheStats(...)
```
## CachePolicy

```python
class CachePolicy(
    max_entries: int | None = None,
    max_size_bytes: int | None = None,
    ttl_seconds: float | None = None,
)
```

A policy that controls cache eviction. Applied to a cache backend to limit storage consumption.

| Parameter | Type | Default | Description |
|---|---|---|---|
| max_entries | int \| None | None | Maximum number of cache entries. Oldest entries are evicted first. |
| max_size_bytes | int \| None | None | Maximum total cache size in bytes. Oldest entries are evicted first. |
| ttl_seconds | float \| None | None | Time-to-live for each entry in seconds. Expired entries are evicted on access. |
```python
policy = dagron.CachePolicy(
    max_entries=1000,
    max_size_bytes=500 * 1024 * 1024,  # 500 MB
    ttl_seconds=3600,                  # 1 hour
)
backend = dagron.FileSystemCacheBackend("./cache_dir", policy=policy)
```
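The interplay of max_entries and ttl_seconds can be sketched with a small in-memory cache. `TtlLruCache` below is a hypothetical illustration of the eviction rules described above (oldest-first eviction over the entry cap, expiry checked on access), not dagron's implementation:

```python
import time
from collections import OrderedDict

class TtlLruCache:
    """Illustrative sketch of max_entries + ttl_seconds eviction."""

    def __init__(self, max_entries: int, ttl_seconds: float):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self._entries: "OrderedDict[str, tuple[float, object]]" = OrderedDict()

    def put(self, key: str, value) -> None:
        self._entries[key] = (time.monotonic(), value)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict oldest entry first

    def get(self, key: str):
        entry = self._entries.get(key)
        if entry is None:
            return None
        inserted_at, value = entry
        if time.monotonic() - inserted_at > self.ttl_seconds:
            del self._entries[key]  # expired entries are evicted on access
            return None
        return value

cache = TtlLruCache(max_entries=2, ttl_seconds=60.0)
cache.put("a", 1)
cache.put("b", 2)
cache.put("c", 3)  # exceeds max_entries: "a" is evicted
assert cache.get("a") is None
assert cache.get("c") == 3
```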
## CacheStats

```python
class CacheStats(
    hits: int,
    misses: int,
    evictions: int,
    total_entries: int,
    total_size_bytes: int,
)
```

Cache performance statistics.

| Property | Type | Description |
|---|---|---|
| hits | int | Total cache hits since creation or last clear. |
| misses | int | Total cache misses. |
| evictions | int | Total entries evicted by policy. |
| total_entries | int | Current number of entries in the cache. |
| total_size_bytes | int | Current total size of cached data in bytes. |
```python
@property
def hit_rate() -> float
```

The cache hit rate as a float between 0.0 and 1.0. Returns 0.0 if no lookups have been performed.
```python
stats = cache.stats()
print(f"Hit rate: {stats.hit_rate:.1%}")
print(f"Entries: {stats.total_entries}")
print(f"Size: {stats.total_size_bytes / 1024 / 1024:.1f} MB")
print(f"Evictions: {stats.evictions}")
```
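The property reduces to hits / (hits + misses) with a guard for zero lookups. A minimal sketch, using a simple dataclass as an illustrative stand-in for CacheStats:

```python
from dataclasses import dataclass

@dataclass
class Stats:
    """Illustrative stand-in for CacheStats, showing the hit_rate formula."""
    hits: int
    misses: int

    @property
    def hit_rate(self) -> float:
        lookups = self.hits + self.misses
        # Guard against division by zero when no lookups have happened yet.
        return self.hits / lookups if lookups else 0.0

assert Stats(hits=3, misses=1).hit_rate == 0.75
assert Stats(hits=0, misses=0).hit_rate == 0.0
```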
## FileSystemCacheBackend

```python
class FileSystemCacheBackend(
    cache_dir: str,
    policy: CachePolicy | None = None,
)
```

A cache backend that stores entries as files on the local filesystem. Each cache key maps to a file in cache_dir. Supports optional eviction via a CachePolicy.

| Parameter | Type | Default | Description |
|---|---|---|---|
| cache_dir | str | required | Directory for storing cache files. Created if it does not exist. |
| policy | CachePolicy \| None | None | Optional eviction policy. |
### Methods

```python
def get(key: str) -> Any | None
```

Retrieve a cached value by key from disk.

```python
def put(key: str, value: Any) -> None
```

Store a value on disk.

```python
def has(key: str) -> bool
```

Check if a key exists on disk.

```python
def delete(key: str) -> None
```

Remove a single entry from disk.

```python
def clear() -> None
```

Remove all cache files.

```python
def stats() -> CacheStats
```

Return current backend statistics.
```python
backend = dagron.FileSystemCacheBackend(
    "./my_pipeline_cache",
    policy=dagron.CachePolicy(max_entries=500, ttl_seconds=7200),
)
backend.put("key123", {"data": [1, 2, 3]})
print(backend.has("key123"))  # True
print(backend.get("key123"))  # {"data": [1, 2, 3]}
```
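The file-per-key layout can be sketched as follows. `FileCache` is an illustrative stand-in; dagron's on-disk format, filename scheme, and serialization may differ:

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

class FileCache:
    """Illustrative file-per-key store, not dagron's implementation."""

    def __init__(self, cache_dir: str):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)  # created if missing

    def _path(self, key: str) -> Path:
        # Hash the key so arbitrary strings map to safe filenames.
        return self.cache_dir / hashlib.sha256(key.encode()).hexdigest()

    def put(self, key: str, value) -> None:
        self._path(key).write_bytes(pickle.dumps(value))

    def has(self, key: str) -> bool:
        return self._path(key).exists()

    def get(self, key: str):
        path = self._path(key)
        return pickle.loads(path.read_bytes()) if path.exists() else None

with tempfile.TemporaryDirectory() as d:
    backend = FileCache(d)
    backend.put("key123", {"data": [1, 2, 3]})
    assert backend.has("key123")
    assert backend.get("key123") == {"data": [1, 2, 3]}
    assert backend.get("missing") is None
```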
## CacheKeyBuilder

```python
class CacheKeyBuilder
```

A utility for manually constructing cache keys. Used internally by ContentAddressableCache but available for advanced use cases.
```python
def hash_task(fn: Callable) -> str
```

Compute a hash of a callable's bytecode and closure.

```python
def hash_value(value: Any) -> str
```

Compute a hash of an arbitrary Python value.

```python
def build_key(
    name: str,
    task_hash: str,
    pred_hashes: dict[str, str],
) -> str
```

Combine a node name, task hash, and predecessor hashes into a final cache key.
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | The node name. |
| task_hash | str | required | Hash of the task callable. |
| pred_hashes | dict[str, str] | required | Map of predecessor names to their output hashes. |
```python
builder = dagron.CacheKeyBuilder()
task_hash = builder.hash_task(my_function)
value_hash = builder.hash_value([1, 2, 3])
key = builder.build_key("process", task_hash, {"fetch": value_hash})
```
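A rough idea of how such hashes might be computed, assuming CPython and pickle-able values. These helpers are illustrative and deliberately simpler than what CacheKeyBuilder would need (a real implementation must also account for closure cells and referenced globals):

```python
import hashlib
import pickle

def hash_task(fn) -> str:
    """Illustrative: hash a function's bytecode plus its constants.
    (Simplified: ignores closures and globals.)"""
    code = fn.__code__
    payload = code.co_code + repr(code.co_consts).encode()
    return hashlib.sha256(payload).hexdigest()

def hash_value(value) -> str:
    """Illustrative: hash a pickled representation of the value."""
    return hashlib.sha256(pickle.dumps(value)).hexdigest()

# Two lambdas with identical bodies compile to identical bytecode,
# so they hash the same; a different body hashes differently.
double_a = lambda x: 2 * x
double_b = lambda x: 2 * x
assert hash_task(double_a) == hash_task(double_b)
assert hash_task(lambda x: 3 * x) != hash_task(double_a)

assert hash_value([1, 2, 3]) == hash_value([1, 2, 3])
assert hash_value([1, 2, 3]) != hash_value([1, 2, 4])
```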
## CacheKeyProtocol

```python
class CacheKeyProtocol(Protocol):
    def __dagron_cache_key__(self) -> str: ...
```

A protocol for objects that provide their own cache key. If a task's return value implements this protocol, the cache uses its __dagron_cache_key__() method instead of the default hashing strategy.
```python
class MyModel:
    def __init__(self, version, data):
        self.version = version
        self.data = data

    def __dagron_cache_key__(self) -> str:
        return f"model-v{self.version}-{hash(tuple(self.data))}"

# When MyModel is returned from a task, the cache uses __dagron_cache_key__
```
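The dispatch a cache might perform can be sketched as follows; `output_hash` is a hypothetical helper, not part of dagron's public API:

```python
import hashlib
import pickle

def output_hash(value) -> str:
    """Illustrative dispatch: prefer the object's own __dagron_cache_key__,
    fall back to hashing a pickled representation of the value."""
    key_method = getattr(value, "__dagron_cache_key__", None)
    if callable(key_method):
        return key_method()
    return hashlib.sha256(pickle.dumps(value)).hexdigest()

class Model:
    def __init__(self, version: int):
        self.version = version

    def __dagron_cache_key__(self) -> str:
        # A stable, version-based key, independent of in-memory state.
        return f"model-v{self.version}"

assert output_hash(Model(2)) == "model-v2"                # protocol path
assert output_hash([1, 2, 3]) == output_hash([1, 2, 3])   # fallback path
```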
## Related
- DAGExecutor — the base executor that caching wraps.
- Incremental Execution — dirty-set-based re-execution.
- Checkpointing — save progress to disk for resume.
- Caching guide — configuration and invalidation patterns.