
Caching

The caching module provides content-addressable, Merkle-tree-based caching for DAG execution. A node's cache key is derived from its name, its task code, and its predecessors' output hashes. If neither the task nor anything upstream of it has changed, the cached result is returned without re-execution.
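
The sketch below is a minimal, standalone illustration of why an unchanged upstream produces a cache hit. It is not dagron's actual hashing scheme. The helpers `sha`, `node_key`, and `chain_keys`, the SHA-256 choice, and the string encoding are all assumptions made for illustration. Each node's key folds in the hash of its predecessor's output, so the keys form a Merkle chain.

```python
import hashlib


def sha(*parts: str) -> str:
    """SHA-256 over the parts, NUL-terminated so adjacent parts cannot run together."""
    h = hashlib.sha256()
    for part in parts:
        h.update(part.encode("utf-8") + b"\0")
    return h.hexdigest()


def node_key(name: str, task_hash: str, pred_hashes: dict[str, str]) -> str:
    # Sort predecessors so the key is independent of dict insertion order.
    preds = [f"{p}={h}" for p, h in sorted(pred_hashes.items())]
    return sha(name, task_hash, *preds)


def chain_keys(task_hashes: dict[str, str], outputs: dict[str, str]) -> dict[str, str]:
    """Keys for the chain fetch -> process -> save (hypothetical helper)."""
    keys: dict[str, str] = {}
    prev = None
    for name in ("fetch", "process", "save"):
        preds = {prev: sha(outputs[prev])} if prev else {}
        keys[name] = node_key(name, task_hashes[name], preds)
        prev = name
    return keys


v1 = {"fetch": "t1", "process": "t1", "save": "t1"}
v2 = {"fetch": "t1", "process": "t2", "save": "t1"}  # process's code was edited
base = chain_keys(v1, {"fetch": "[1, 2, 3]", "process": "[2, 4, 6]", "save": "done"})
edited = chain_keys(v2, {"fetch": "[1, 2, 3]", "process": "[2, 4, 6, 8]", "save": "done"})
same_out = chain_keys(v2, {"fetch": "[1, 2, 3]", "process": "[2, 4, 6]", "save": "done"})

print(edited["fetch"] == base["fetch"])    # True: nothing upstream of fetch changed
print(edited["process"] == base["process"])  # False: its task code changed
print(edited["save"] == base["save"])      # False: its predecessor's output changed
print(same_out["save"] == base["save"])    # True: edited node produced the same output
```

The last line shows a property that follows from keying on output hashes rather than on upstream keys. If an edited node still produces the same output, the nodes below it keep their keys.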

See the Caching guide for configuration patterns, backend selection, and cache invalidation strategies.


CachedDAGExecutor

class CachedDAGExecutor(
    dag: DAG,
    cache: ContentAddressableCache,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)

An executor that wraps the standard DAGExecutor with a content-addressable cache layer. Before executing a node, the executor checks the cache; if a valid entry exists, the cached result is returned and the node is marked CACHE_HIT.

Parameters
dag (DAG, required): The DAG whose topology drives execution order.
cache (ContentAddressableCache, required): The cache instance used to store and retrieve results.
callbacks (ExecutionCallbacks | None, default None): Optional lifecycle callbacks.
fail_fast (bool, default True): If True, skip downstream nodes when any node fails.
enable_tracing (bool, default False): If True, record a Chrome-compatible execution trace.
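
When fail_fast is True, every node downstream of a failure is skipped. One way to compute that skip set is a breadth-first walk over successors. The sketch below is an illustrative assumption, not dagron's implementation. It represents the DAG as a plain successor adjacency dict, and `nodes_to_skip` is a hypothetical helper.

```python
from collections import deque


def nodes_to_skip(successors: dict[str, list[str]], failed: str) -> set[str]:
    """Return every node reachable from `failed`, i.e. everything downstream of it."""
    skip: set[str] = set()
    queue = deque(successors.get(failed, []))
    while queue:
        node = queue.popleft()
        if node not in skip:  # a node may be reachable along several paths
            skip.add(node)
            queue.extend(successors.get(node, []))
    return skip


succ = {"fetch": ["process"], "process": ["save"], "save": []}
print(sorted(nodes_to_skip(succ, "fetch")))  # ['process', 'save']
print(nodes_to_skip(succ, "save"))           # set()
```
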

execute

CachedDAGExecutor.execute
def execute(
    tasks: dict[str, Callable],
) -> CachedExecutionResult

Execute tasks with caching. Returns a CachedExecutionResult with cache statistics.

Parameters
tasks (dict[str, Callable], required): Map of node names to callable tasks.

import dagron

dag = (
    dagron.DAG.builder()
    .add_node("fetch").add_node("process").add_node("save")
    .add_edge("fetch", "process").add_edge("process", "save")
    .build()
)

cache = dagron.ContentAddressableCache(
    backend=dagron.FileSystemCacheBackend("./cache_dir")
)

executor = dagron.CachedDAGExecutor(dag, cache=cache)

# First run (empty ./cache_dir): every node is a cache miss and executes
result = executor.execute({
    "fetch": lambda: [1, 2, 3],
    "process": lambda: [2, 4, 6],
    "save": lambda: "done",
})
print(result.cache_hits, result.cache_misses)  # 0 3

# Second run: same task code and inputs, so every node is a cache hit
result = executor.execute({
    "fetch": lambda: [1, 2, 3],
    "process": lambda: [2, 4, 6],
    "save": lambda: "done",
})
print(result.cache_hits, result.cache_misses)  # 3 0
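
The check-then-execute behavior described for CachedDAGExecutor can be modeled in plain Python. The sketch below is a simplified, hypothetical model, not the executor's source. It uses a dict as the cache and takes a precomputed topological order. It hashes outputs with `repr` and uses the bytecode plus constants (`__code__.co_code` and `co_consts`) as a crude task hash. `run_cached` is an illustrative name.

```python
import hashlib
from typing import Any, Callable


def _h(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def run_cached(
    order: list[str],
    preds: dict[str, list[str]],
    tasks: dict[str, Callable[[], Any]],
    cache: dict[str, Any],
) -> tuple[dict[str, Any], int, int]:
    """Run zero-argument tasks in topological `order`, reusing cached results."""
    results: dict[str, Any] = {}
    out_hashes: dict[str, str] = {}
    hits = misses = 0
    for name in order:
        code = tasks[name].__code__
        # Crude task hash: bytecode plus constants (enough to tell these lambdas apart).
        task_hash = _h(repr((code.co_code, code.co_consts)))
        pred_part = ",".join(f"{p}={out_hashes[p]}" for p in sorted(preds.get(name, [])))
        key = _h(f"{name}|{task_hash}|{pred_part}")
        if key in cache:
            hits += 1  # cache hit: the task is not called at all
            value = cache[key]
        else:
            misses += 1  # cache miss: execute, then store under the key
            value = tasks[name]()
            cache[key] = value
        results[name] = value
        out_hashes[name] = _h(repr(value))  # feeds the keys of successor nodes
    return results, hits, misses


tasks = {"fetch": lambda: [1, 2, 3], "process": lambda: [2, 4, 6], "save": lambda: "done"}
order = ["fetch", "process", "save"]
preds = {"process": ["fetch"], "save": ["process"]}
store: dict[str, Any] = {}
print(run_cached(order, preds, tasks, store)[1:])  # (0, 3)
print(run_cached(order, preds, tasks, store)[1:])  # (3, 0)
```
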

CachedExecutionResult

class CachedExecutionResult(
    execution_result: ExecutionResult,
    cache_hits: int,
    cache_misses: int,
    nodes_executed: int,
    nodes_cached: int,
)

Pairs the standard ExecutionResult (available as execution_result) with cache statistics.

Properties
execution_result (ExecutionResult): The underlying execution result with per-node details.
cache_hits (int): Number of nodes whose results were loaded from cache.
cache_misses (int): Number of nodes that were executed (cache miss).
nodes_executed (int): Number of nodes that were actually executed.
nodes_cached (int): Number of nodes that returned cached results.

print(f"Hit rate: {result.cache_hits}/{result.cache_hits + result.cache_misses}")
print(f"Nodes executed: {result.nodes_executed}")
print(f"Nodes cached: {result.nodes_cached}")

ContentAddressableCache

class ContentAddressableCache(
    backend: CacheBackend,
)

A Merkle-tree cache that computes content-addressable keys from the node name, the task code, and the predecessors' output hashes. As a result, a cache entry is reused only when the same computation was previously executed on the same inputs.

Parameters
backend (CacheBackend, required): The storage backend (e.g., FileSystemCacheBackend).

Methods

ContentAddressableCache.compute_key
def compute_key(
    name: str,
    task: Callable,
    predecessor_hashes: dict[str, str],
) -> str

Compute the content-addressable cache key for a node.

Parameters
name (str, required): The node name.
task (Callable, required): The task callable.
predecessor_hashes (dict[str, str], required): Map of predecessor names to their output hashes.

ContentAddressableCache.get
def get(key: str) -> Any | None

Retrieve a cached value by key. Returns None on cache miss.

ContentAddressableCache.put
def put(key: str, value: Any) -> None

Store a value in the cache.

ContentAddressableCache.has
def has(key: str) -> bool

Check if a key exists in the cache.

ContentAddressableCache.clear
def clear() -> None

Clear all entries from the cache.

ContentAddressableCache.stats
def stats() -> CacheStats

Return current cache statistics.

import dagron

def process_fn():
    return [2, 4, 6]

cache = dagron.ContentAddressableCache(
    backend=dagron.FileSystemCacheBackend("./my_cache")
)

key = cache.compute_key("process", process_fn, {"fetch": "abc123"})
cache.put(key, [2, 4, 6])
print(cache.has(key))  # True
print(cache.get(key))  # [2, 4, 6]
print(cache.stats())  # CacheStats(...)

CachePolicy

class CachePolicy(
    max_entries: int | None = None,
    max_size_bytes: int | None = None,
    ttl_seconds: float | None = None,
)

A policy that controls cache eviction. Applied to a cache backend to limit storage consumption.

Parameters
max_entries (int | None, default None): Maximum number of cache entries. Oldest entries are evicted first.
max_size_bytes (int | None, default None): Maximum total cache size in bytes. Oldest entries are evicted first.
ttl_seconds (float | None, default None): Time-to-live for each entry in seconds. Expired entries are evicted on access.

policy = dagron.CachePolicy(
    max_entries=1000,
    max_size_bytes=500 * 1024 * 1024,  # 500 MB
    ttl_seconds=3600,  # 1 hour
)
backend = dagron.FileSystemCacheBackend("./cache_dir", policy=policy)
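
The eviction rules above can be modeled in a few lines: oldest-first eviction for both size limits, and lazy TTL expiry on access. `PolicyStore` is hypothetical and in-memory, and this is not how FileSystemCacheBackend applies a policy. The injectable `clock` parameter is an assumption that exists only to make expiry easy to test.

```python
from __future__ import annotations

import time
from collections import OrderedDict
from typing import Callable


class PolicyStore:
    """Hypothetical in-memory byte store that enforces CachePolicy-style limits."""

    def __init__(
        self,
        max_entries: int | None = None,
        max_size_bytes: int | None = None,
        ttl_seconds: float | None = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_entries = max_entries
        self.max_size_bytes = max_size_bytes
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self.evictions = 0
        # key -> (stored_at, value); insertion order doubles as age order.
        self._entries: OrderedDict[str, tuple[float, bytes]] = OrderedDict()
        self._size = 0

    def _over_limit(self) -> bool:
        too_many = self.max_entries is not None and len(self._entries) > self.max_entries
        too_big = self.max_size_bytes is not None and self._size > self.max_size_bytes
        return too_many or too_big

    def _remove(self, key: str) -> None:
        _, value = self._entries.pop(key)
        self._size -= len(value)

    def put(self, key: str, value: bytes) -> None:
        if key in self._entries:
            self._remove(key)  # re-putting a key makes it the newest entry
        self._entries[key] = (self.clock(), value)
        self._size += len(value)
        while self._entries and self._over_limit():
            self._remove(next(iter(self._entries)))  # evict the oldest entry first
            self.evictions += 1

    def get(self, key: str) -> bytes | None:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self.ttl_seconds is not None and self.clock() - stored_at > self.ttl_seconds:
            self._remove(key)  # expired: evicted lazily, on access
            self.evictions += 1
            return None
        return value


now = [0.0]
store = PolicyStore(max_entries=2, ttl_seconds=10, clock=lambda: now[0])
for k in ("a", "b", "c"):
    store.put(k, k.encode())
print(store.get("a"), store.get("b"))  # None b'b'
now[0] = 11.0
print(store.get("c"), store.evictions)  # None 2
```
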

CacheStats

class CacheStats(
    hits: int,
    misses: int,
    evictions: int,
    total_entries: int,
    total_size_bytes: int,
)

Cache performance statistics.

Properties
hits (int): Total cache hits since creation or last clear.
misses (int): Total cache misses.
evictions (int): Total entries evicted by policy.
total_entries (int): Current number of entries in the cache.
total_size_bytes (int): Current total size of cached data in bytes.

CacheStats.hit_rate
@property
def hit_rate() -> float

The cache hit rate as a float between 0.0 and 1.0. Returns 0.0 if no lookups have been performed.

stats = cache.stats()
print(f"Hit rate: {stats.hit_rate:.1%}")
print(f"Entries: {stats.total_entries}")
print(f"Size: {stats.total_size_bytes / 1024 / 1024:.1f} MB")
print(f"Evictions: {stats.evictions}")
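
A hit rate between 0.0 and 1.0 that falls back to 0.0 with no lookups is presumably hits / (hits + misses), guarded against division by zero. The minimal sketch below shows that computation as a frozen dataclass. `StatsSketch` is a hypothetical name that mirrors only the documented fields.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StatsSketch:
    hits: int
    misses: int
    evictions: int = 0
    total_entries: int = 0
    total_size_bytes: int = 0

    @property
    def hit_rate(self) -> float:
        lookups = self.hits + self.misses
        return self.hits / lookups if lookups else 0.0  # 0.0 before any lookup


print(f"{StatsSketch(hits=3, misses=1).hit_rate:.1%}")  # 75.0%
print(StatsSketch(hits=0, misses=0).hit_rate)           # 0.0
```
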

FileSystemCacheBackend

class FileSystemCacheBackend(
    cache_dir: str,
    policy: CachePolicy | None = None,
)

A cache backend that stores entries as files on the local filesystem. Each cache key maps to a file in cache_dir. Supports optional eviction via a CachePolicy.

Parameters
cache_dir (str, required): Directory for storing cache files. Created if it does not exist.
policy (CachePolicy | None, default None): Optional eviction policy.

Methods

FileSystemCacheBackend.get
def get(key: str) -> Any | None

Retrieve a cached value by key from disk.

FileSystemCacheBackend.put
def put(key: str, value: Any) -> None

Store a value on disk.

FileSystemCacheBackend.has
def has(key: str) -> bool

Check if a key exists on disk.

FileSystemCacheBackend.delete
def delete(key: str) -> None

Remove a single entry from disk.

FileSystemCacheBackend.clear
def clear() -> None

Remove all cache files.

FileSystemCacheBackend.stats
def stats() -> CacheStats

Return current backend statistics.

backend = dagron.FileSystemCacheBackend(
    "./my_pipeline_cache",
    policy=dagron.CachePolicy(max_entries=500, ttl_seconds=7200),
)

backend.put("key123", {"data": [1, 2, 3]})
print(backend.has("key123"))  # True
print(backend.get("key123"))  # {'data': [1, 2, 3]}
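
The sketch below illustrates one file per cache key in a directory, under stated assumptions. `TinyFileCache` is hypothetical and is not FileSystemCacheBackend. Pickle serialization, the `.pkl` suffix, and the write-to-temp-then-rename step are all choices made for this sketch. `os.replace` renames atomically on the same filesystem, so a crash mid-write never leaves a half-written file under the real key.

```python
from __future__ import annotations

import os
import pickle
import tempfile
from typing import Any


class TinyFileCache:
    """Hypothetical file-per-key cache; keys are assumed to be safe filenames (e.g. hex digests)."""

    def __init__(self, cache_dir: str) -> None:
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)  # create the directory if missing

    def _path(self, key: str) -> str:
        return os.path.join(self.cache_dir, f"{key}.pkl")

    def put(self, key: str, value: Any) -> None:
        # Write to a temp file in the same directory, then atomically rename it into place.
        fd, tmp = tempfile.mkstemp(dir=self.cache_dir)
        try:
            with os.fdopen(fd, "wb") as f:
                pickle.dump(value, f)
            os.replace(tmp, self._path(key))
        except BaseException:
            os.unlink(tmp)  # don't leave a stray temp file behind
            raise

    def has(self, key: str) -> bool:
        return os.path.exists(self._path(key))

    def get(self, key: str) -> Any | None:
        try:
            with open(self._path(key), "rb") as f:
                return pickle.load(f)
        except FileNotFoundError:
            return None  # cache miss

    def delete(self, key: str) -> None:
        try:
            os.remove(self._path(key))
        except FileNotFoundError:
            pass
```

Unpickling can execute arbitrary code, so a store like this should only ever read from a directory it fully controls.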

CacheKeyBuilder

class CacheKeyBuilder

A utility for manually constructing cache keys. Used internally by ContentAddressableCache but available for advanced use cases.

CacheKeyBuilder.hash_task
def hash_task(fn: Callable) -> str

Compute a hash of a callable's bytecode and closure.
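
A callable's bytecode and closure can be hashed with standard-library introspection. The sketch below shows the general approach and is not CacheKeyBuilder's actual algorithm. It reads `fn.__code__.co_code` and `co_consts`, plus the values held in `fn.__closure__` cells. Nested code objects are hashed by their bytecode rather than their repr, because a code object's repr embeds a memory address. `hash_task_sketch` is a hypothetical name. The sketch ignores globals the function reads, so changing such a global would not change the hash.

```python
import hashlib
import types
from typing import Any, Callable


def _feed_code(h: Any, code: types.CodeType) -> None:
    h.update(code.co_code)
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            _feed_code(h, const)  # nested function, lambda, or comprehension
        else:
            h.update(repr(const).encode("utf-8") + b"\0")


def hash_task_sketch(fn: Callable) -> str:
    """Hash a function's bytecode, constants, and captured closure values."""
    h = hashlib.sha256()
    _feed_code(h, fn.__code__)
    for cell in fn.__closure__ or ():
        h.update(repr(cell.cell_contents).encode("utf-8") + b"\0")
    return h.hexdigest()


def make_scaler(factor):
    return lambda xs: [x * factor for x in xs]


print(hash_task_sketch(make_scaler(2)) == hash_task_sketch(make_scaler(2)))  # True
print(hash_task_sketch(make_scaler(2)) == hash_task_sketch(make_scaler(3)))  # False
```

Bytecode differs between Python versions, so hashes computed this way are not stable across interpreter upgrades.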

CacheKeyBuilder.hash_value
def hash_value(value: Any) -> str

Compute a hash of an arbitrary Python value.

CacheKeyBuilder.build_key
def build_key(
    name: str,
    task_hash: str,
    pred_hashes: dict[str, str],
) -> str

Combine a node name, task hash, and predecessor hashes into a final cache key.

Parameters
name (str, required): The node name.
task_hash (str, required): Hash of the task callable.
pred_hashes (dict[str, str], required): Map of predecessor names to their output hashes.

# Any callable works; this one stands in for a real task.
def my_function():
    return [2, 4, 6]

builder = dagron.CacheKeyBuilder()
task_hash = builder.hash_task(my_function)
value_hash = builder.hash_value([1, 2, 3])
key = builder.build_key("process", task_hash, {"fetch": value_hash})
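
Any build_key-style function must handle one detail correctly: the predecessor map is unordered, so the same inputs must yield the same key regardless of dict insertion order. The hypothetical sketch below uses canonical JSON with sorted keys as the serialization. JSON and SHA-256 are assumptions here, not dagron's documented key format. JSON string quoting also keeps field boundaries unambiguous.

```python
import hashlib
import json


def build_key_sketch(name: str, task_hash: str, pred_hashes: dict[str, str]) -> str:
    # sort_keys makes the encoding independent of dict insertion order.
    payload = json.dumps(
        {"name": name, "task": task_hash, "preds": pred_hashes},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


k1 = build_key_sketch("join", "t1", {"a": "h1", "b": "h2"})
k2 = build_key_sketch("join", "t1", {"b": "h2", "a": "h1"})
print(k1 == k2)  # True
```
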

CacheKeyProtocol

class CacheKeyProtocol(Protocol):
    def __dagron_cache_key__(self) -> str: ...

A protocol for objects that provide their own cache key. If a task's return value implements this protocol, the cache uses its __dagron_cache_key__() method instead of the default hashing strategy.

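import hashlib

class MyModel:
    def __init__(self, version, data):
        self.version = version
        self.data = data

    def __dagron_cache_key__(self) -> str:
        # Use a stable digest: the built-in hash() of str/bytes is salted per
        # process, so it would change between runs and defeat an on-disk cache.
        digest = hashlib.sha256(repr(self.data).encode("utf-8")).hexdigest()[:16]
        return f"model-v{self.version}-{digest}"

# When MyModel is returned from a task, the cache uses __dagron_cache_key__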
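
A cache might honor this protocol by checking the value for `__dagron_cache_key__` and falling back to a default hash otherwise. The sketch below does this with `typing.Protocol` and `runtime_checkable`. `HasCacheKey`, `output_hash`, `Versioned`, and the repr-based fallback are hypothetical; only the method name comes from the protocol above.

```python
import hashlib
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class HasCacheKey(Protocol):
    def __dagron_cache_key__(self) -> str: ...


def output_hash(value: Any) -> str:
    if isinstance(value, HasCacheKey):
        return value.__dagron_cache_key__()  # the object supplies its own key
    return hashlib.sha256(repr(value).encode("utf-8")).hexdigest()  # assumed default


class Versioned:
    def __init__(self, version: int) -> None:
        self.version = version

    def __dagron_cache_key__(self) -> str:
        return f"versioned-v{self.version}"


print(output_hash(Versioned(3)))  # versioned-v3
```

The protocol matters for objects like `Versioned`. Without `__dagron_cache_key__`, its default repr would be something like `<__main__.Versioned object at 0x...>`. That address differs between runs, so a repr-based hash would never produce a cache hit.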