# Resource Scheduling
Many real-world pipelines require access to scarce physical resources -- GPUs for model training, memory for large datasets, or CPU slots for compute-heavy transforms. dagron's resource scheduling system lets you declare per-node requirements and execute the DAG with a scheduler that respects capacity constraints at all times.
The scheduler dispatches nodes in bottom-level priority order (longest-path-to-sink first), so the critical path gets resources before less important branches.
## Core Concepts
*A pipeline where training nodes compete for 4 GPUs. The scheduler ensures only two training nodes of this size run simultaneously.*
There are five main building blocks:
| Class | Role |
|---|---|
| `ResourcePool` | Holds the total capacity of each resource and manages blocking acquire/release. |
| `ResourceRequirements` | Declares how much of each resource a single node needs. |
| `ResourceAwareExecutor` | Synchronous executor that dispatches nodes when their resources are available. |
| `AsyncResourceAwareExecutor` | Async (`asyncio`) variant of the same scheduler. |
| `ResourceTimeline` | Records timestamped snapshots of resource utilization during execution. |
## Declaring Resources

### ResourcePool
A `ResourcePool` represents the total resources available on the machine or cluster. Resources are named strings with integer capacities:

```python
from dagron.execution.resources import ResourcePool

# A machine with 4 GPUs, 16 CPU slots, and 32 GB of memory
pool = ResourcePool(capacities={
    "gpu": 4,
    "cpu_slots": 16,
    "memory_mb": 32768,
})

print(pool.capacities)  # {'gpu': 4, 'cpu_slots': 16, 'memory_mb': 32768}
print(pool.available)   # same as capacities initially
print(pool.allocated)   # {'gpu': 0, 'cpu_slots': 0, 'memory_mb': 0}
```
The pool is thread-safe. Internally it uses a `threading.Condition` so that the executor can block in `acquire()` until resources are freed by another thread.
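To illustrate the blocking mechanism, here is a minimal condition-variable sketch. This is a toy stand-in for exposition, not dagron's actual implementation:

```python
import threading

class MiniPool:
    """Toy pool: blocking acquire with a threading.Condition."""

    def __init__(self, capacities):
        self.capacities = dict(capacities)
        self.available = dict(capacities)
        self._cond = threading.Condition()

    def acquire(self, resources):
        with self._cond:
            # Block until every requested resource fits at once.
            while any(self.available.get(r, 0) < n for r, n in resources.items()):
                self._cond.wait()
            for r, n in resources.items():
                self.available[r] -= n

    def release(self, resources):
        with self._cond:
            for r, n in resources.items():
                self.available[r] += n
            # Wake any threads blocked in acquire().
            self._cond.notify_all()

pool = MiniPool({"gpu": 4})
pool.acquire({"gpu": 3})
print(pool.available)  # {'gpu': 1}
pool.release({"gpu": 3})
```

Waiting threads re-check availability after every release, so a waiter wakes up only when its full request fits.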
### ResourceRequirements
Each node declares its needs via a `ResourceRequirements` object. You can use the constructor directly or one of the shorthand factory methods:

```python
from dagron.execution.resources import ResourceRequirements

# Explicit constructor
req = ResourceRequirements(resources={"gpu": 2, "memory_mb": 4096})

# Shorthand factories
gpu_req = ResourceRequirements.gpu(2)        # {"gpu": 2}
cpu_req = ResourceRequirements.cpu(4)        # {"cpu_slots": 4}
mem_req = ResourceRequirements.memory(8192)  # {"memory_mb": 8192}
```
You can combine multiple resource types in a single `ResourceRequirements`:

```python
heavy_req = ResourceRequirements(resources={
    "gpu": 2,
    "cpu_slots": 4,
    "memory_mb": 16384,
})
```
The `fits()` method checks whether a requirement can be satisfied by a given availability dict:

```python
available = {"gpu": 3, "cpu_slots": 8, "memory_mb": 16384}
print(heavy_req.fits(available))  # True

available["gpu"] = 1
print(heavy_req.fits(available))  # False -- only 1 GPU available
```
## Building a Resource-Scheduled Pipeline
Here is a complete example that trains two ML models concurrently, limited by GPU availability:

```python
import dagron
from dagron.execution.resources import (
    ResourceAwareExecutor,
    ResourcePool,
    ResourceRequirements,
)

# 1. Build the DAG
dag = (
    dagron.DAG.builder()
    .add_node("fetch_data")
    .add_node("preprocess")
    .add_node("train_resnet")
    .add_node("train_bert")
    .add_node("ensemble")
    .add_node("deploy")
    .add_edge("fetch_data", "preprocess")
    .add_edge("preprocess", "train_resnet")
    .add_edge("preprocess", "train_bert")
    .add_edge("train_resnet", "ensemble")
    .add_edge("train_bert", "ensemble")
    .add_edge("ensemble", "deploy")
    .build()
)

# 2. Declare resource requirements per node
requirements = {
    "train_resnet": ResourceRequirements.gpu(2),
    "train_bert": ResourceRequirements.gpu(3),
    "ensemble": ResourceRequirements.gpu(1),
    # fetch_data, preprocess, deploy need no special resources
}

# 3. Create the resource pool (4 GPUs available)
pool = ResourcePool(capacities={"gpu": 4})

# 4. Create the executor
executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
)

# 5. Define tasks
def fetch_data():
    print("Fetching dataset...")
    return {"rows": 10000}

def preprocess():
    print("Cleaning data...")
    return {"rows": 9500}

def train_resnet():
    print("Training ResNet on 2 GPUs...")
    return {"accuracy": 0.91}

def train_bert():
    print("Training BERT on 3 GPUs...")
    return {"accuracy": 0.94}

def ensemble():
    print("Ensembling models on 1 GPU...")
    return {"accuracy": 0.96}

def deploy():
    print("Deploying model...")
    return "deployed"

# 6. Execute
result = executor.execute({
    "fetch_data": fetch_data,
    "preprocess": preprocess,
    "train_resnet": train_resnet,
    "train_bert": train_bert,
    "ensemble": ensemble,
    "deploy": deploy,
})

print(f"Succeeded: {result.succeeded}, Failed: {result.failed}")
```
Because `train_resnet` needs 2 GPUs and `train_bert` needs 3, and the pool has only 4, they cannot run simultaneously. The scheduler dispatches whichever has the higher bottom-level priority first, then dispatches the other once the first releases its GPUs.
*Serialized timeline: `train_bert` runs first (higher bottom-level priority), then `train_resnet` once GPUs are freed.*
If you change the pool to 5 GPUs, both training nodes can run concurrently since 2 + 3 = 5 fits within capacity.
## Pre-Validation
Before execution begins, `ResourceAwareExecutor` validates that every node's requirements can ever be satisfied by the pool:

```python
# This will raise immediately -- a single node needs 8 GPUs but pool has 4
requirements["train_huge"] = ResourceRequirements.gpu(8)

try:
    executor.execute(tasks)
except ValueError as e:
    print(e)
    # "Node 'train_huge' requires {'gpu': 8} but pool capacity is {'gpu': 4}"
```
This check prevents deadlocks where a node would block forever because the pool is too small.
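The check itself is straightforward to reason about. Here is a sketch of equivalent validation logic over plain dicts (a hypothetical standalone helper, not dagron's internal code):

```python
def validate_requirements(capacities, requirements):
    """Raise early if any single node's demand exceeds total pool capacity.

    capacities:   {resource_name: total_amount}
    requirements: {node_name: {resource_name: amount}}
    """
    for node, resources in requirements.items():
        for res, amount in resources.items():
            if amount > capacities.get(res, 0):
                raise ValueError(
                    f"Node {node!r} requires {amount} x {res!r} "
                    f"but pool capacity is {capacities.get(res, 0)}"
                )

validate_requirements({"gpu": 4}, {"train_resnet": {"gpu": 2}})  # passes silently
try:
    validate_requirements({"gpu": 4}, {"train_huge": {"gpu": 8}})
except ValueError as e:
    print(e)
```

Only single-node feasibility is checked here; nodes that merely compete with each other are handled at runtime by blocking, not by raising.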
## Priority Scheduling
The executor computes bottom-level priorities for each node. The bottom level is the longest weighted path from a node to any sink. Nodes with higher bottom levels are dispatched first because they sit on the critical path.
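Bottom levels can be computed with a single reverse pass over the graph. The sketch below works on a plain edge list rather than dagron's DAG API, and assigns a unit cost to nodes without an estimate (both are assumptions for illustration):

```python
def bottom_levels(edges, costs=None):
    """Longest cost-weighted path from each node to any sink."""
    costs = costs or {}
    children = {}
    nodes = set()
    for u, v in edges:
        children.setdefault(u, []).append(v)
        nodes.update((u, v))

    memo = {}

    def level(node):
        # Bottom level = own cost + max bottom level of any child (0 at a sink).
        if node not in memo:
            kids = children.get(node, [])
            memo[node] = costs.get(node, 1.0) + max(
                (level(k) for k in kids), default=0.0
            )
        return memo[node]

    return {n: level(n) for n in nodes}

edges = [
    ("fetch_data", "preprocess"),
    ("preprocess", "train_resnet"), ("preprocess", "train_bert"),
    ("train_resnet", "ensemble"), ("train_bert", "ensemble"),
    ("ensemble", "deploy"),
]
levels = bottom_levels(
    edges, {"train_resnet": 120.0, "train_bert": 300.0, "ensemble": 60.0}
)
# train_bert: 300 + 60 + 1 (deploy) = 361.0; train_resnet: 120 + 60 + 1 = 181.0
```

The memoized recursion visits each node once, so the pass is linear in nodes plus edges.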
You can provide optional cost estimates to influence priority:

```python
costs = {
    "train_resnet": 120.0,  # seconds
    "train_bert": 300.0,
    "ensemble": 60.0,
}

executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
    costs=costs,
)
```
With these costs, `train_bert` has the higher bottom-level value (its own 300 s plus the shared downstream path through `ensemble` and `deploy`, versus 120 s plus the same path for `train_resnet`), so the scheduler gives it resources first.
## Async Resource Scheduling
For asyncio-based pipelines, use `AsyncResourceAwareExecutor`:

```python
import asyncio

from dagron.execution.resources import AsyncResourceAwareExecutor

async def train_resnet_async():
    await asyncio.sleep(2)  # simulate training
    return {"accuracy": 0.91}

async def train_bert_async():
    await asyncio.sleep(5)
    return {"accuracy": 0.94}

async def main():
    executor = AsyncResourceAwareExecutor(
        dag,
        resource_pool=pool,
        requirements=requirements,
    )
    result = await executor.execute({
        "fetch_data": lambda: "data",
        "preprocess": lambda: "cleaned",
        "train_resnet": train_resnet_async,
        "train_bert": train_bert_async,
        "ensemble": lambda: "ensembled",
        "deploy": lambda: "deployed",
    })
    print(f"Done in {result.total_duration_seconds:.1f}s")

asyncio.run(main())
```
The async executor uses `asyncio.create_task` to run nodes concurrently, while the underlying `ResourcePool` still uses threading primitives for acquire/release.
## ResourceTimeline and Utilization Tracking
Every `ResourcePool` automatically records a `ResourceTimeline` that captures timestamped snapshots of resource allocation and availability. After execution you can inspect utilization:

```python
timeline = pool.timeline

# Iterate over snapshots
for snap in timeline.snapshots:
    print(
        f"  t={snap.timestamp:.3f}s "
        f"node={snap.node_name} "
        f"event={snap.event} "
        f"allocated={snap.allocated} "
        f"available={snap.available}"
    )

# Peak utilization across the entire execution
peaks = timeline.peak_utilization()
print(f"Peak GPU utilization: {peaks.get('gpu', 0)} / {pool.capacities['gpu']}")
```
A typical timeline output might look like:

```text
t=0.001s node=train_bert event=acquired allocated={'gpu': 3} available={'gpu': 1}
t=5.012s node=train_bert event=released allocated={'gpu': 0} available={'gpu': 4}
t=5.013s node=train_resnet event=acquired allocated={'gpu': 2} available={'gpu': 2}
t=7.045s node=train_resnet event=released allocated={'gpu': 0} available={'gpu': 4}
t=7.046s node=ensemble event=acquired allocated={'gpu': 1} available={'gpu': 3}
t=7.102s node=ensemble event=released allocated={'gpu': 0} available={'gpu': 4}
Peak GPU utilization: 3 / 4
```
### ResourceSnapshot
Each snapshot is a `ResourceSnapshot` dataclass:

| Field | Type | Description |
|---|---|---|
| `timestamp` | `float` | Seconds since the first snapshot was recorded. |
| `allocated` | `dict[str, int]` | Resources currently allocated at this point in time. |
| `available` | `dict[str, int]` | Resources still available at this point in time. |
| `node_name` | `str \| None` | The node that triggered this snapshot. |
| `event` | `str` | Either `"acquired"` or `"released"`. |
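Beyond the built-in peak figure, the snapshot fields are enough to derive other metrics yourself. For example, a time-weighted average allocation can be computed by integrating between consecutive snapshots. The sketch below uses plain `(timestamp, allocated_gpus)` tuples standing in for real snapshots:

```python
# Toy snapshots mirroring the timeline above: (timestamp, allocated GPUs).
snapshots = [
    (0.0, 3),  # train_bert acquires 3 GPUs
    (5.0, 0),  # train_bert releases
    (5.0, 2),  # train_resnet acquires 2 GPUs
    (7.0, 0),  # train_resnet releases
]

def average_allocation(snapshots, end_time):
    """Time-weighted average: each allocation level holds until the next snapshot."""
    total = 0.0
    for (t0, alloc), (t1, _) in zip(snapshots, snapshots[1:]):
        total += alloc * (t1 - t0)
    # The final snapshot's level holds until end_time.
    t_last, alloc_last = snapshots[-1]
    total += alloc_last * (end_time - t_last)
    return total / end_time

avg = average_allocation(snapshots, end_time=7.0)
print(f"{avg:.2f} GPUs allocated on average")  # (3*5 + 2*2) / 7 = 2.71
```

Comparing this average against peak utilization tells you whether the pool is saturated briefly or sustained at capacity.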
## Manual Acquire and Release
You can also use the pool directly outside of an executor, for example in custom scheduling logic:

```python
pool = ResourcePool(capacities={"gpu": 4})
req = ResourceRequirements.gpu(2)

# Non-blocking attempt
if pool.try_acquire(req, node_name="my_node"):
    try:
        run_gpu_work()  # your GPU-bound work
    finally:
        pool.release(req, node_name="my_node")
else:
    print("GPUs not available right now")

# Blocking with timeout
acquired = pool.acquire(req, node_name="my_node", timeout=30.0)
if acquired:
    try:
        run_gpu_work()
    finally:
        pool.release(req, node_name="my_node")
else:
    print("Timed out waiting for GPUs")
```
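If you manage the pool by hand in several places, a small context manager keeps the `try`/`finally` pattern in one spot. This is a sketch written against the acquire/release calls shown above; any pool object with matching methods would work:

```python
from contextlib import contextmanager

@contextmanager
def acquired(pool, req, node_name, timeout=None):
    """Acquire resources on entry; always release on exit, even on error."""
    if not pool.acquire(req, node_name=node_name, timeout=timeout):
        raise TimeoutError(f"timed out waiting for resources for {node_name!r}")
    try:
        yield
    finally:
        pool.release(req, node_name=node_name)

# Usage (with pool and req from the example above):
# with acquired(pool, req, node_name="my_node", timeout=30.0):
#     run_gpu_work()
```

Raising on timeout instead of returning `False` forces callers to handle the failure rather than silently running without resources.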
## Callbacks and Tracing
`ResourceAwareExecutor` accepts `ExecutionCallbacks` that fire during execution. Two resource-specific callbacks are available:

```python
from dagron.execution._types import ExecutionCallbacks

callbacks = ExecutionCallbacks(
    on_start=lambda name: print(f"  [{name}] started"),
    on_complete=lambda name, val: print(f"  [{name}] completed: {val}"),
    on_failure=lambda name, err: print(f"  [{name}] FAILED: {err}"),
    on_resource_acquired=lambda name, res: print(f"  [{name}] acquired {res}"),
    on_resource_released=lambda name, res: print(f"  [{name}] released {res}"),
)

executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
    callbacks=callbacks,
    enable_tracing=True,
)

result = executor.execute(tasks)

# Access the trace for Chrome-compatible profiling
if result.trace:
    result.trace.to_chrome_json("resource_trace.json")
```
When `enable_tracing=True`, the executor records `RESOURCE_ACQUIRED` and `RESOURCE_RELEASED` trace events alongside the standard node start/complete events.
## Fail-Fast Behavior
By default, `fail_fast=True`: if a node fails, all downstream nodes are skipped immediately without acquiring resources.

```python
executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
    fail_fast=True,  # default
)
```
Set `fail_fast=False` to let independent branches continue executing even when one branch fails.
## Multi-Resource Scheduling Example
Here is a more realistic example combining GPU, CPU, and memory constraints:

```python
pool = ResourcePool(capacities={
    "gpu": 4,
    "cpu_slots": 16,
    "memory_mb": 65536,  # 64 GB
})

requirements = {
    "ingest": ResourceRequirements(resources={"cpu_slots": 2, "memory_mb": 4096}),
    "feature_eng": ResourceRequirements(resources={"cpu_slots": 8, "memory_mb": 16384}),
    "train_xgboost": ResourceRequirements(resources={"cpu_slots": 4, "memory_mb": 8192}),
    "train_nn": ResourceRequirements(resources={"gpu": 2, "cpu_slots": 2, "memory_mb": 16384}),
    "explain_shap": ResourceRequirements(resources={"cpu_slots": 8, "memory_mb": 32768}),
    "deploy": ResourceRequirements(resources={"cpu_slots": 1}),
}

executor = ResourceAwareExecutor(dag, pool, requirements)
result = executor.execute(tasks)
```
A node is only dispatched when all of its required resources are simultaneously available. This prevents situations where a node acquires some GPUs but blocks on memory, starving other nodes.
## Best Practices

- **Right-size your pools.** Start with the actual hardware capacity. If you have 4 GPUs, set `"gpu": 4`.
- **Use `costs` for critical-path optimization.** Provide estimated runtimes so the scheduler prioritizes the bottleneck path.
- **Always release resources.** The executor handles this automatically, but if you use `ResourcePool` manually, use `try`/`finally`.
- **Check `peak_utilization()` after execution.** If peak usage is far below capacity, you may have too-conservative requirements. If it equals capacity, you are fully saturating your hardware.
- **Combine with tracing.** Enable `enable_tracing=True` to generate Chrome-compatible traces that show resource acquire/release events overlaid on the execution timeline.
## Related
- API Reference: Resources -- full API documentation for all resource classes.
- Executing Tasks -- standard execution without resource constraints.
- Distributed Execution -- running nodes across multiple machines.
- Tracing & Profiling -- Chrome-compatible execution traces.