Resource Scheduling

Many real-world pipelines require access to scarce physical resources -- GPUs for model training, memory for large datasets, or CPU slots for compute-heavy transforms. dagron's resource scheduling system lets you declare per-node requirements and execute the DAG with a scheduler that respects capacity constraints at all times.

The scheduler dispatches nodes in bottom-level priority order (longest-path-to-sink first), so the critical path gets resources before less important branches.

Core Concepts

Figure: a pipeline in which training nodes compete for 4 GPUs; the scheduler ensures that only two training nodes of this size run simultaneously.

There are four main building blocks:

ResourcePool: Holds the total capacity of each resource and manages blocking acquire/release.
ResourceRequirements: Declares how much of each resource a single node needs.
ResourceAwareExecutor: Synchronous executor that dispatches nodes when their resources are available.
AsyncResourceAwareExecutor: Async (asyncio) variant of the same scheduler.
ResourceTimeline: Records timestamped snapshots of resource utilization during execution.

Declaring Resources

ResourcePool

A ResourcePool represents the total resources available on the machine or cluster. Resources are named strings with integer capacities:

from dagron.execution.resources import ResourcePool

# A machine with 4 GPUs, 16 CPU slots, and 32 GB of memory
pool = ResourcePool(capacities={
    "gpu": 4,
    "cpu_slots": 16,
    "memory_mb": 32768,
})

print(pool.capacities) # {'gpu': 4, 'cpu_slots': 16, 'memory_mb': 32768}
print(pool.available) # same as capacities initially
print(pool.allocated) # {'gpu': 0, 'cpu_slots': 0, 'memory_mb': 0}

The pool is thread-safe. Internally it uses a threading.Condition so that the executor can block on acquire() until resources are freed by another thread.
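
To make the blocking behavior concrete, here is a minimal sketch of the Condition-based pattern described above. This is an illustrative re-implementation, not dagron's actual source: `MiniPool` and its method signatures are invented for this example.

```python
import threading

# Minimal sketch of a Condition-guarded pool: acquire() blocks until
# every requested resource fits within what is currently available.
class MiniPool:
    def __init__(self, capacities):
        self.capacities = dict(capacities)
        self.available = dict(capacities)
        self._cond = threading.Condition()

    def acquire(self, needs):
        with self._cond:
            # Wait until all requested amounts fit simultaneously.
            self._cond.wait_for(
                lambda: all(self.available[r] >= n for r, n in needs.items())
            )
            for r, n in needs.items():
                self.available[r] -= n

    def release(self, needs):
        with self._cond:
            for r, n in needs.items():
                self.available[r] += n
            self._cond.notify_all()  # wake any blocked acquirers

pool = MiniPool({"gpu": 4})
pool.acquire({"gpu": 3})
print(pool.available["gpu"])  # 1
```

The key detail is `notify_all()` on release: every blocked acquirer re-checks its predicate, and only those whose full request now fits proceed.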

ResourceRequirements

Each node declares its needs via a ResourceRequirements object. You can use the constructor directly or one of the shorthand factory methods:

from dagron.execution.resources import ResourceRequirements

# Explicit constructor
req = ResourceRequirements(resources={"gpu": 2, "memory_mb": 4096})

# Shorthand factories
gpu_req = ResourceRequirements.gpu(2) # {"gpu": 2}
cpu_req = ResourceRequirements.cpu(4) # {"cpu_slots": 4}
mem_req = ResourceRequirements.memory(8192) # {"memory_mb": 8192}

You can combine multiple resource types in a single ResourceRequirements:

heavy_req = ResourceRequirements(resources={
    "gpu": 2,
    "cpu_slots": 4,
    "memory_mb": 16384,
})

The fits() method checks whether a requirement can be satisfied by a given availability dict:

available = {"gpu": 3, "cpu_slots": 8, "memory_mb": 16384}
print(heavy_req.fits(available)) # True

available["gpu"] = 1
print(heavy_req.fits(available)) # False -- only 1 GPU available
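
Conceptually, fits() reduces to an element-wise comparison. The standalone function below is an illustrative sketch of that check, not dagron's source:

```python
# A requirement "fits" when every requested amount is covered by the
# corresponding availability; a resource absent from `available`
# counts as zero.
def fits(resources, available):
    return all(available.get(name, 0) >= amount
               for name, amount in resources.items())

print(fits({"gpu": 2, "memory_mb": 16384}, {"gpu": 3, "memory_mb": 16384}))  # True
print(fits({"gpu": 2}, {"gpu": 1}))                                          # False
```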

Building a Resource-Scheduled Pipeline

Here is a complete example that trains two ML models concurrently, limited by GPU availability:

import dagron
from dagron.execution.resources import (
    ResourceAwareExecutor,
    ResourcePool,
    ResourceRequirements,
)

# 1. Build the DAG
dag = (
    dagron.DAG.builder()
    .add_node("fetch_data")
    .add_node("preprocess")
    .add_node("train_resnet")
    .add_node("train_bert")
    .add_node("ensemble")
    .add_node("deploy")
    .add_edge("fetch_data", "preprocess")
    .add_edge("preprocess", "train_resnet")
    .add_edge("preprocess", "train_bert")
    .add_edge("train_resnet", "ensemble")
    .add_edge("train_bert", "ensemble")
    .add_edge("ensemble", "deploy")
    .build()
)

# 2. Declare resource requirements per node
requirements = {
    "train_resnet": ResourceRequirements.gpu(2),
    "train_bert": ResourceRequirements.gpu(3),
    "ensemble": ResourceRequirements.gpu(1),
    # fetch_data, preprocess, deploy need no special resources
}

# 3. Create the resource pool (4 GPUs available)
pool = ResourcePool(capacities={"gpu": 4})

# 4. Create the executor
executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
)

# 5. Define tasks
def fetch_data():
    print("Fetching dataset...")
    return {"rows": 10000}

def preprocess():
    print("Cleaning data...")
    return {"rows": 9500}

def train_resnet():
    print("Training ResNet on 2 GPUs...")
    return {"accuracy": 0.91}

def train_bert():
    print("Training BERT on 3 GPUs...")
    return {"accuracy": 0.94}

def ensemble():
    print("Ensembling models on 1 GPU...")
    return {"accuracy": 0.96}

def deploy():
    print("Deploying model...")
    return "deployed"

# 6. Execute
result = executor.execute({
    "fetch_data": fetch_data,
    "preprocess": preprocess,
    "train_resnet": train_resnet,
    "train_bert": train_bert,
    "ensemble": ensemble,
    "deploy": deploy,
})

print(f"Succeeded: {result.succeeded}, Failed: {result.failed}")

Because train_resnet needs 2 GPUs and train_bert needs 3, and the pool has only 4, they cannot run simultaneously. The scheduler dispatches whichever has a higher bottom-level priority first, then dispatches the other once the first releases its GPUs.

Serialized timeline: train_bert runs first (higher bottom-level priority), then train_resnet once GPUs are freed.

If you change the pool to 5 GPUs, both training nodes can run concurrently since 2 + 3 = 5 fits within capacity.


Pre-Validation

Before execution begins, ResourceAwareExecutor validates that every node's requirements can ever be satisfied by the pool:

# This will raise immediately -- a single node needs 8 GPUs but pool has 4
requirements["train_huge"] = ResourceRequirements.gpu(8)

try:
    executor.execute(tasks)
except ValueError as e:
    print(e)
    # "Node 'train_huge' requires {'gpu': 8} but pool capacity is {'gpu': 4}"

This check prevents deadlocks where a node would block forever because the pool is too small.
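
The check itself is simple: compare each node's requirement against total pool capacity, not current availability. The function below is a sketch of that assumed behavior, not dagron's source:

```python
# Fail fast if any single node asks for more of a resource than the
# pool will ever hold. Comparing against total capacity (not current
# availability) is what rules out the permanent-blocking deadlock.
def validate(requirements, capacities):
    for node, needs in requirements.items():
        for res, amount in needs.items():
            if amount > capacities.get(res, 0):
                raise ValueError(
                    f"Node {node!r} requires {needs} "
                    f"but pool capacity is {capacities}"
                )

validate({"train_resnet": {"gpu": 2}}, {"gpu": 4})  # passes silently
try:
    validate({"train_huge": {"gpu": 8}}, {"gpu": 4})
except ValueError as e:
    print(e)
```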


Priority Scheduling

The executor computes bottom-level priorities for each node. The bottom level is the longest weighted path from a node to any sink. Nodes with higher bottom levels are dispatched first because they sit on the critical path.
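
The bottom level has a simple recursive structure: a node's bottom level is its own cost plus the maximum bottom level among its successors, and a sink's bottom level is just its own cost. A minimal sketch, using a hand-built successor map rather than dagron's DAG type:

```python
# Successor map and per-node costs (seconds) for a small diamond DAG.
edges = {
    "preprocess": ["train_resnet", "train_bert"],
    "train_resnet": ["ensemble"],
    "train_bert": ["ensemble"],
    "ensemble": [],
}
costs = {"preprocess": 10.0, "train_resnet": 120.0,
         "train_bert": 300.0, "ensemble": 60.0}

def bottom_level(node):
    # Own cost plus the longest remaining path through any successor.
    tail = max((bottom_level(s) for s in edges[node]), default=0.0)
    return costs[node] + tail

print(bottom_level("train_bert"))    # 360.0
print(bottom_level("train_resnet"))  # 180.0
```

train_bert's larger bottom level (300 + 60 vs 120 + 60) is why the scheduler hands it GPUs first in the earlier example.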

You can provide optional cost estimates to influence priority:

costs = {
    "train_resnet": 120.0,  # seconds
    "train_bert": 300.0,
    "ensemble": 60.0,
}

executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
    costs=costs,
)

With these costs, train_bert has a higher bottom-level value and the scheduler gives it resources first.


Async Resource Scheduling

For asyncio-based pipelines, use AsyncResourceAwareExecutor:

import asyncio
from dagron.execution.resources import AsyncResourceAwareExecutor

async def train_resnet_async():
    await asyncio.sleep(2)  # simulate training
    return {"accuracy": 0.91}

async def train_bert_async():
    await asyncio.sleep(5)
    return {"accuracy": 0.94}

async def main():
    executor = AsyncResourceAwareExecutor(
        dag,
        resource_pool=pool,
        requirements=requirements,
    )
    result = await executor.execute({
        "fetch_data": lambda: "data",
        "preprocess": lambda: "cleaned",
        "train_resnet": train_resnet_async,
        "train_bert": train_bert_async,
        "ensemble": lambda: "ensembled",
        "deploy": lambda: "deployed",
    })
    print(f"Done in {result.total_duration_seconds:.1f}s")

asyncio.run(main())

The async executor uses asyncio.create_task for concurrency, while the underlying ResourcePool keeps its thread-safe acquire/release primitives.
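
To see the async scheduling pattern in miniature without dagron, the sketch below uses an asyncio.Semaphore as a stand-in for a single-resource pool of 4 "GPUs". It is a simplification: the real pool grants multi-unit requests all-or-nothing, whereas this sketch acquires permits one at a time.

```python
import asyncio

async def run(sem, name, units, duration, log):
    # Acquire `units` permits one at a time (simplified; the real pool
    # grants the whole request atomically).
    for _ in range(units):
        await sem.acquire()
    try:
        log.append(f"{name} started")
        await asyncio.sleep(duration)
    finally:
        for _ in range(units):
            sem.release()

async def main():
    sem = asyncio.Semaphore(4)  # 4 "GPUs"
    log = []
    # 3 + 2 > 4, so the second task must wait for the first to release.
    await asyncio.gather(
        run(sem, "train_bert", 3, 0.05, log),
        run(sem, "train_resnet", 2, 0.05, log),
    )
    return log

log = asyncio.run(main())
print(log)
```

Because the combined demand (3 + 2) exceeds the 4 permits, the two tasks serialize even though both are scheduled concurrently, which mirrors the GPU contention in the synchronous example.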


ResourceTimeline and Utilization Tracking

Every ResourcePool automatically records a ResourceTimeline that captures timestamped snapshots of resource allocation and availability. After execution you can inspect utilization:

timeline = pool.timeline

# Iterate over snapshots
for snap in timeline.snapshots:
    print(
        f"t={snap.timestamp:.3f}s "
        f"node={snap.node_name} "
        f"event={snap.event} "
        f"allocated={snap.allocated} "
        f"available={snap.available}"
    )

# Peak utilization across the entire execution
peaks = timeline.peak_utilization()
print(f"Peak GPU utilization: {peaks.get('gpu', 0)} / {pool.capacities['gpu']}")

A typical timeline output might look like:

t=0.001s  node=train_bert    event=acquired  allocated={'gpu': 3}  available={'gpu': 1}
t=5.012s  node=train_bert    event=released  allocated={'gpu': 0}  available={'gpu': 4}
t=5.013s  node=train_resnet  event=acquired  allocated={'gpu': 2}  available={'gpu': 2}
t=7.045s  node=train_resnet  event=released  allocated={'gpu': 0}  available={'gpu': 4}
t=7.046s  node=ensemble      event=acquired  allocated={'gpu': 1}  available={'gpu': 3}
t=7.102s  node=ensemble      event=released  allocated={'gpu': 0}  available={'gpu': 4}

Peak GPU utilization: 3 / 4
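
Peak utilization is just the element-wise maximum of the allocated amounts across all snapshots. An illustrative calculation over plain dicts (dagron records ResourceSnapshot objects, but the arithmetic is the same):

```python
# Three snapshots of a GPU pool over time.
snapshots = [
    {"allocated": {"gpu": 3}},
    {"allocated": {"gpu": 0}},
    {"allocated": {"gpu": 2}},
]

def peak_utilization(snapshots):
    # For each resource, keep the largest amount seen allocated at once.
    peaks = {}
    for snap in snapshots:
        for res, amount in snap["allocated"].items():
            peaks[res] = max(peaks.get(res, 0), amount)
    return peaks

print(peak_utilization(snapshots))  # {'gpu': 3}
```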

ResourceSnapshot

Each snapshot is a ResourceSnapshot dataclass:

timestamp (float): Seconds since the first snapshot was recorded.
allocated (dict[str, int]): Resources currently allocated at this point in time.
available (dict[str, int]): Resources still available at this point in time.
node_name (str | None): The node that triggered this snapshot.
event (str): Either "acquired" or "released".

Manual Acquire and Release

You can also use the pool directly outside of an executor, for example in custom scheduling logic:

pool = ResourcePool(capacities={"gpu": 4})
req = ResourceRequirements.gpu(2)

# Non-blocking attempt
if pool.try_acquire(req, node_name="my_node"):
    try:
        run_gpu_work()
    finally:
        pool.release(req, node_name="my_node")
else:
    print("GPUs not available right now")

# Blocking with timeout
acquired = pool.acquire(req, node_name="my_node", timeout=30.0)
if acquired:
    try:
        run_gpu_work()
    finally:
        pool.release(req, node_name="my_node")
else:
    print("Timed out waiting for GPUs")
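
If you use the pool manually in several places, the try/finally pattern above can be folded into a context manager. The `reserved` helper below is not part of dagron; it assumes a pool exposing acquire()/release() as shown, and the demo uses a stand-in pool so the sketch is self-contained:

```python
from contextlib import contextmanager

# Convenience wrapper (hypothetical, not part of dagron) that guarantees
# release even if the guarded work raises.
@contextmanager
def reserved(pool, req, node_name, timeout=30.0):
    if not pool.acquire(req, node_name=node_name, timeout=timeout):
        raise TimeoutError(f"timed out waiting for {req}")
    try:
        yield
    finally:
        pool.release(req, node_name=node_name)

# Stand-in pool that always grants the request, to demo the wrapper.
class FakePool:
    def __init__(self):
        self.held = False
    def acquire(self, req, node_name, timeout):
        self.held = True
        return True
    def release(self, req, node_name):
        self.held = False

pool = FakePool()
with reserved(pool, {"gpu": 2}, "my_node"):
    assert pool.held  # resources held inside the block
print(pool.held)  # False: released on exit
```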

Callbacks and Tracing

ResourceAwareExecutor accepts ExecutionCallbacks that fire during execution. Two resource-specific callbacks are available:

from dagron.execution._types import ExecutionCallbacks

callbacks = ExecutionCallbacks(
    on_start=lambda name: print(f"[{name}] started"),
    on_complete=lambda name, val: print(f"[{name}] completed: {val}"),
    on_failure=lambda name, err: print(f"[{name}] FAILED: {err}"),
    on_resource_acquired=lambda name, res: print(f"[{name}] acquired {res}"),
    on_resource_released=lambda name, res: print(f"[{name}] released {res}"),
)

executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
    callbacks=callbacks,
    enable_tracing=True,
)

result = executor.execute(tasks)

# Access the trace for Chrome-compatible profiling
if result.trace:
    result.trace.to_chrome_json("resource_trace.json")

When enable_tracing=True, the executor records RESOURCE_ACQUIRED and RESOURCE_RELEASED trace events alongside the standard node start/complete events.


Fail-Fast Behavior

By default, fail_fast=True. If a node fails, all downstream nodes are skipped immediately without acquiring resources:

executor = ResourceAwareExecutor(
    dag,
    resource_pool=pool,
    requirements=requirements,
    fail_fast=True,  # default
)

Set fail_fast=False to let independent branches continue executing even when one branch fails.


Multi-Resource Scheduling Example

Here is a more realistic example combining GPU, CPU, and memory constraints:

pool = ResourcePool(capacities={
    "gpu": 4,
    "cpu_slots": 16,
    "memory_mb": 65536,  # 64 GB
})

requirements = {
    "ingest": ResourceRequirements(resources={"cpu_slots": 2, "memory_mb": 4096}),
    "feature_eng": ResourceRequirements(resources={"cpu_slots": 8, "memory_mb": 16384}),
    "train_xgboost": ResourceRequirements(resources={"cpu_slots": 4, "memory_mb": 8192}),
    "train_nn": ResourceRequirements(resources={"gpu": 2, "cpu_slots": 2, "memory_mb": 16384}),
    "explain_shap": ResourceRequirements(resources={"cpu_slots": 8, "memory_mb": 32768}),
    "deploy": ResourceRequirements(resources={"cpu_slots": 1}),
}

executor = ResourceAwareExecutor(dag, pool, requirements)
result = executor.execute(tasks)

A node is only dispatched when all of its required resources are simultaneously available. This prevents situations where a node acquires some GPUs but blocks on memory, starving other nodes.


Best Practices

  1. Right-size your pools. Start with the actual hardware capacity. If you have 4 GPUs, set "gpu": 4.

  2. Use costs for critical-path optimization. Provide estimated runtimes so the scheduler prioritizes the bottleneck path.

  3. Always release resources. The executor handles this automatically, but if you use ResourcePool manually, use try/finally.

  4. Check peak_utilization() after execution. If peak usage is far below capacity, you may have too-conservative requirements. If it equals capacity, you are fully saturating your hardware.

  5. Combine with tracing. Enable enable_tracing=True to generate Chrome-compatible traces that show resource acquire/release events overlaid on the execution timeline.