# Resource Scheduling
The resource scheduling module extends the standard executor with capacity-aware scheduling. Nodes declare their resource requirements (GPU, CPU, memory, or custom resources), and the executor only dispatches a node when the resource pool has sufficient capacity. This prevents oversubscription and enables scheduling of heterogeneous workloads.
See the Resource Scheduling guide for usage patterns and capacity planning.
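The dispatch policy described above can be modeled in a few lines of plain Python. The sketch below is a simplified model of the idea, not dagron's actual implementation: a node becomes ready when all of its dependencies have finished, and it is dispatched only when its requirements fit the remaining capacity of the current scheduling window.

```python
# Simplified model of capacity-aware dispatch (hypothetical, not dagron's
# real scheduler). Nodes run in "waves": each wave admits ready nodes
# whose requirements fit the capacity left in that wave.

def schedule(deps, requirements, capacity):
    """Return the waves in which nodes run. deps maps node -> set of
    prerequisite nodes; requirements maps node -> {resource: amount}."""
    done, waves = set(), []
    while len(done) < len(deps):
        available = dict(capacity)
        wave = []
        for n in deps:
            if n in done or not deps[n] <= done:
                continue  # not ready yet
            need = requirements.get(n, {})
            if all(available.get(r, 0) >= amt for r, amt in need.items()):
                for r, amt in need.items():
                    available[r] -= amt  # reserve within this wave
                wave.append(n)
        if not wave:
            raise RuntimeError("no runnable node fits the pool")
        waves.append(wave)
        done.update(wave)
    return waves

deps = {
    "preprocess": set(),
    "train_a": {"preprocess"},
    "train_b": {"preprocess"},
    "evaluate": {"train_a", "train_b"},
}
reqs = {"train_a": {"gpu": 1}, "train_b": {"gpu": 1}}
print(schedule(deps, reqs, {"gpu": 2}))
# [['preprocess'], ['train_a', 'train_b'], ['evaluate']]
print(schedule(deps, reqs, {"gpu": 1}))
# [['preprocess'], ['train_a'], ['train_b'], ['evaluate']]
```

With two GPUs the training nodes run concurrently; with one GPU they serialize, which is exactly the oversubscription protection the executor provides.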
## ResourceAwareExecutor

```python
class ResourceAwareExecutor(
    dag: DAG,
    resource_pool: ResourcePool,
    requirements: dict[str, ResourceRequirements],
    costs: dict[str, float] | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```

A synchronous executor that checks resource availability before dispatching each node. When a node cannot be scheduled due to insufficient resources, it blocks until resources are released by completed nodes.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dag` | `DAG` | required | The DAG whose topology drives execution order. |
| `resource_pool` | `ResourcePool` | required | The pool of available resources. |
| `requirements` | `dict[str, ResourceRequirements]` | required | Map of node names to their resource requirements. |
| `costs` | `dict[str, float] \| None` | `None` | Per-node cost estimates for scheduling priority. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks, including `on_resource_acquired` and `on_resource_released`. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream nodes when any node fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |
### execute

```python
def execute(
    tasks: dict[str, Callable],
) -> ExecutionResult
```

Execute tasks with resource-aware scheduling.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `tasks` | `dict[str, Callable]` | required | Map of node names to callable tasks. |

Returns: `ExecutionResult`
```python
import dagron

# Diamond pipeline: preprocessing fans out to two training nodes,
# which both feed a final evaluation step.
dag = (
    dagron.DAG.builder()
    .add_node("preprocess")
    .add_node("train_model_a")
    .add_node("train_model_b")
    .add_node("evaluate")
    .add_edge("preprocess", "train_model_a")
    .add_edge("preprocess", "train_model_b")
    .add_edge("train_model_a", "evaluate")
    .add_edge("train_model_b", "evaluate")
    .build()
)

# With two GPUs available, both training nodes can hold one GPU each
# and run concurrently.
pool = dagron.ResourcePool(capacities={"gpu": 2, "cpu": 8, "memory_mb": 16000})

requirements = {
    "preprocess": dagron.ResourceRequirements(resources={"cpu": 2, "memory_mb": 2000}),
    "train_model_a": dagron.ResourceRequirements(resources={"gpu": 1, "cpu": 4, "memory_mb": 8000}),
    "train_model_b": dagron.ResourceRequirements(resources={"gpu": 1, "cpu": 4, "memory_mb": 8000}),
    "evaluate": dagron.ResourceRequirements(resources={"cpu": 2, "memory_mb": 4000}),
}

executor = dagron.ResourceAwareExecutor(dag, pool, requirements)
result = executor.execute({
    "preprocess": lambda: "data ready",
    "train_model_a": lambda: "model A trained",
    "train_model_b": lambda: "model B trained",
    "evaluate": lambda: "evaluation complete",
})
print(result.succeeded)  # 4
```
## AsyncResourceAwareExecutor

```python
class AsyncResourceAwareExecutor(
    dag: DAG,
    resource_pool: ResourcePool,
    requirements: dict[str, ResourceRequirements],
    costs: dict[str, float] | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```

An async variant of ResourceAwareExecutor with the same interface, except that each task must be an async callable (a callable returning an awaitable).

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dag` | `DAG` | required | The DAG whose topology drives execution order. |
| `resource_pool` | `ResourcePool` | required | The pool of available resources. |
| `requirements` | `dict[str, ResourceRequirements]` | required | Map of node names to their resource requirements. |
| `costs` | `dict[str, float] \| None` | `None` | Per-node cost estimates for scheduling priority. |
| `callbacks` | `ExecutionCallbacks \| None` | `None` | Optional lifecycle callbacks. |
| `fail_fast` | `bool` | `True` | If `True`, skip downstream nodes when any node fails. |
| `enable_tracing` | `bool` | `False` | If `True`, record a Chrome-compatible execution trace. |
### execute (async)

```python
async def execute(
    tasks: dict[str, Callable[..., Awaitable]],
) -> ExecutionResult
```

Execute async tasks with resource-aware scheduling.
```python
import asyncio

import dagron

async def main():
    # Reuses dag, pool, and requirements from the synchronous example.
    # preprocess_async, train_async, and evaluate_async are user-defined
    # coroutine functions.
    executor = dagron.AsyncResourceAwareExecutor(dag, pool, requirements)
    result = await executor.execute({
        "preprocess": preprocess_async,
        "train_model_a": lambda: train_async("model_a"),
        "train_model_b": lambda: train_async("model_b"),
        "evaluate": evaluate_async,
    })
    print(result.succeeded)

asyncio.run(main())
```
## ResourcePool

```python
class ResourcePool(
    capacities: dict[str, float],
)
```

A pool of named resources with finite capacities. Resources are acquired before a node runs and released after it completes (or fails).

| Parameter | Type | Default | Description |
|---|---|---|---|
| `capacities` | `dict[str, float]` | required | Map of resource names to their total capacities. |
### can_satisfy

```python
def can_satisfy(
    requirements: ResourceRequirements,
) -> bool
```

Return `True` if the pool's total capacity can satisfy the given requirements, ignoring current allocation. Useful for validating requirements before execution.
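The distinction from `try_acquire` is total capacity versus current availability: `can_satisfy` answers "could this ever run on this pool?", while `try_acquire` answers "can it run right now?". A plain-Python sketch of the two checks (hypothetical internals, not the library's code):

```python
# Hypothetical sketch: can_satisfy compares against total capacity and
# ignores current allocation; a "fits now" check compares against what
# is left after current allocations.

capacities = {"gpu": 2, "cpu": 8}
available = {"gpu": 0, "cpu": 4}  # both GPUs currently allocated

def can_satisfy(need):
    return all(capacities.get(r, 0) >= amt for r, amt in need.items())

def fits_now(need):
    return all(available.get(r, 0) >= amt for r, amt in need.items())

need = {"gpu": 1, "cpu": 2}
print(can_satisfy(need))  # True: the pool can run this once a GPU frees up
print(fits_now(need))     # False: no GPU is free right now
```

Validating all requirements with `can_satisfy` before execution catches nodes that could never be scheduled, which would otherwise block forever.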
### try_acquire

```python
def try_acquire(
    requirements: ResourceRequirements,
) -> bool
```

Attempt to acquire resources without blocking. Returns `True` if successful, or `False` if insufficient resources are currently available.
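The important property of non-blocking acquisition is that it is all-or-nothing: either every requested amount is reserved atomically, or the pool is left untouched. A minimal sketch of that behavior over a shared availability map (hypothetical internals; dagron's real pool may differ):

```python
import threading

# Minimal model of all-or-nothing, non-blocking acquisition.
# (Hypothetical sketch, not dagron's actual implementation.)

class MiniPool:
    def __init__(self, capacities):
        self._available = dict(capacities)
        self._lock = threading.Lock()

    def try_acquire(self, need):
        with self._lock:
            # Check everything first so a partial fit never mutates state.
            if any(self._available.get(r, 0) < amt for r, amt in need.items()):
                return False
            for r, amt in need.items():
                self._available[r] -= amt
            return True

    def release(self, need):
        with self._lock:
            for r, amt in need.items():
                self._available[r] += amt

pool = MiniPool({"gpu": 1})
print(pool.try_acquire({"gpu": 1}))  # True
print(pool.try_acquire({"gpu": 1}))  # False: already held
pool.release({"gpu": 1})
print(pool.try_acquire({"gpu": 1}))  # True again
```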
### acquire

```python
def acquire(
    requirements: ResourceRequirements,
    timeout: float | None = None,
) -> bool
```

Acquire resources, blocking until they become available or the timeout expires. Returns `True` on success, `False` on timeout.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `requirements` | `ResourceRequirements` | required | The resources to acquire. |
| `timeout` | `float \| None` | `None` | Maximum time to wait, in seconds. `None` means wait indefinitely. |
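Blocking acquisition with a timeout is the classic condition-variable pattern: wait until the requirement fits, re-checking whenever something is released. A sketch under those assumptions (not dagron's actual implementation):

```python
import threading

# Sketch of blocking acquisition with a timeout, built on a condition
# variable. (Hypothetical internals, not dagron's real code.)

class BlockingPool:
    def __init__(self, capacities):
        self._available = dict(capacities)
        self._cond = threading.Condition()

    def _fits(self, need):
        return all(self._available.get(r, 0) >= amt for r, amt in need.items())

    def acquire(self, need, timeout=None):
        with self._cond:
            # wait_for re-evaluates the predicate each time a release
            # notifies waiters; it returns False if the timeout expires.
            if not self._cond.wait_for(lambda: self._fits(need), timeout):
                return False
            for r, amt in need.items():
                self._available[r] -= amt
            return True

    def release(self, need):
        with self._cond:
            for r, amt in need.items():
                self._available[r] += amt
            self._cond.notify_all()  # wake waiters to re-check

pool = BlockingPool({"gpu": 1})
pool.acquire({"gpu": 1})
print(pool.acquire({"gpu": 1}, timeout=0.05))  # False: timed out while held
pool.release({"gpu": 1})
print(pool.acquire({"gpu": 1}))  # True
```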
### release

```python
def release(
    requirements: ResourceRequirements,
) -> None
```

Release previously acquired resources back to the pool.
### Properties

```python
@property
def capacities() -> dict[str, float]
```

The total capacity of each resource.

```python
@property
def available() -> dict[str, float]
```

The currently available (unallocated) amount of each resource.

```python
@property
def allocated() -> dict[str, float]
```

The currently allocated amount of each resource.

```python
@property
def timeline() -> ResourceTimeline
```

A timeline of resource allocation events for visualization and debugging.
```python
pool = dagron.ResourcePool(capacities={"gpu": 4, "cpu": 16, "memory_mb": 32000})
print(pool.capacities)  # {"gpu": 4, "cpu": 16, "memory_mb": 32000}
print(pool.available)   # {"gpu": 4, "cpu": 16, "memory_mb": 32000}

req = dagron.ResourceRequirements(resources={"gpu": 2})
pool.acquire(req)
print(pool.available)   # {"gpu": 2, "cpu": 16, "memory_mb": 32000}
print(pool.allocated)   # {"gpu": 2, "cpu": 0, "memory_mb": 0}

pool.release(req)
print(pool.available)   # {"gpu": 4, "cpu": 16, "memory_mb": 32000}
```
## ResourceRequirements

```python
class ResourceRequirements(
    resources: dict[str, float],
)
```

A set of resource requirements for a single node.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `resources` | `dict[str, float]` | required | Map of resource names to required amounts. |
### Convenience Constructors

```python
@classmethod
def gpu(n: float) -> ResourceRequirements
```

Create a requirement for `n` GPUs.

```python
@classmethod
def cpu(n: float) -> ResourceRequirements
```

Create a requirement for `n` CPU cores.

```python
@classmethod
def memory(mb: float) -> ResourceRequirements
```

Create a requirement for `mb` megabytes of memory.
### fits

```python
def fits(
    available: dict[str, float],
) -> bool
```

Return `True` if the given available resources can satisfy this requirement.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `available` | `dict[str, float]` | required | Map of resource names to available amounts. |
```python
# Explicit construction
req = dagron.ResourceRequirements(resources={"gpu": 2, "cpu": 4, "memory_mb": 8000})

# Convenience constructors
gpu_req = dagron.ResourceRequirements.gpu(1)
cpu_req = dagron.ResourceRequirements.cpu(4)
mem_req = dagron.ResourceRequirements.memory(4096)

# Check fit
print(req.fits({"gpu": 4, "cpu": 16, "memory_mb": 32000}))  # True
print(req.fits({"gpu": 1, "cpu": 16, "memory_mb": 32000}))  # False (needs 2 GPUs)
```
## ResourceSnapshot

```python
class ResourceSnapshot(
    timestamp: float,
    allocated: dict[str, float],
    available: dict[str, float],
    node_name: str | None,
    event: str,
)
```

A point-in-time snapshot of resource allocation. Recorded by the ResourceTimeline.

| Property | Type | Description |
|---|---|---|
| `timestamp` | `float` | Unix timestamp of the snapshot. |
| `allocated` | `dict[str, float]` | Allocated resources at this point. |
| `available` | `dict[str, float]` | Available resources at this point. |
| `node_name` | `str \| None` | Name of the node associated with the event, or `None`. |
| `event` | `str` | Event type: `"acquire"` or `"release"`. |
## ResourceTimeline

```python
class ResourceTimeline
```

A recorded timeline of resource allocation events. Access via `ResourcePool.timeline`.

### record

```python
def record(
    snapshot: ResourceSnapshot,
) -> None
```

Manually record a snapshot (typically done automatically by the pool).

### snapshots

```python
@property
def snapshots() -> list[ResourceSnapshot]
```

All recorded snapshots in chronological order.

### peak_utilization

```python
def peak_utilization() -> dict[str, float]
```

Return the peak utilization (as a fraction from 0.0 to 1.0) for each resource across the entire timeline.
```python
result = executor.execute(tasks)

timeline = pool.timeline
print(f"Snapshots: {len(timeline.snapshots)}")
print(f"Peak utilization: {timeline.peak_utilization()}")
# {"gpu": 1.0, "cpu": 0.75, "memory_mb": 0.5}

for snap in timeline.snapshots:
    print(f"  t={snap.timestamp:.3f} {snap.event} {snap.node_name}: gpu={snap.allocated.get('gpu', 0)}")
```
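Peak utilization can be derived from the snapshot history: for each resource, take the maximum allocated amount across all snapshots and divide by total capacity. A sketch of that computation over plain dicts (assumed semantics; the timeline computes this for you):

```python
# Compute peak utilization from a snapshot history: for each resource,
# max allocated across snapshots divided by total capacity.
# (Sketch of the assumed semantics, not dagron's code.)

def peak_utilization(allocations, capacities):
    peaks = {}
    for alloc in allocations:
        for r, amt in alloc.items():
            peaks[r] = max(peaks.get(r, 0.0), amt)
    return {r: peaks.get(r, 0.0) / cap for r, cap in capacities.items()}

history = [
    {"gpu": 1, "cpu": 4},   # one training node running
    {"gpu": 2, "cpu": 8},   # both training nodes running
    {"gpu": 0, "cpu": 2},   # evaluation only
]
print(peak_utilization(history, {"gpu": 2, "cpu": 16}))
# {'gpu': 1.0, 'cpu': 0.5}
```

A GPU peak of 1.0 with a low CPU peak suggests the pipeline is GPU-bound, which is the kind of signal this method is intended to surface for capacity planning.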
## Related
- DAGExecutor — the base executor without resource awareness.
- Distributed Execution — multi-backend execution for cluster workloads.
- Execution Plans — cost-aware scheduling at the graph level.
- Resource Scheduling guide — usage patterns and capacity planning.