Resource Scheduling

The resource scheduling module extends the standard executor with capacity-aware scheduling. Nodes declare their resource requirements (GPU, CPU, memory, or custom resources), and the executor only dispatches a node when the resource pool has sufficient capacity. This prevents oversubscription and enables scheduling of heterogeneous workloads.

See the Resource Scheduling guide for usage patterns and capacity planning.


ResourceAwareExecutor

ResourceAwareExecutor
```python
class ResourceAwareExecutor(
    dag: DAG,
    resource_pool: ResourcePool,
    requirements: dict[str, ResourceRequirements],
    costs: dict[str, float] | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```

A synchronous executor that checks resource availability before dispatching each node. When a node cannot be scheduled because resources are insufficient, the executor blocks until running nodes complete and release their resources.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| dag | DAG | required | The DAG whose topology drives execution order. |
| resource_pool | ResourcePool | required | The pool of available resources. |
| requirements | dict[str, ResourceRequirements] | required | Map of node names to their resource requirements. |
| costs | dict[str, float] \| None | None | Per-node cost estimates for scheduling priority. |
| callbacks | ExecutionCallbacks \| None | None | Optional lifecycle callbacks. Includes on_resource_acquired and on_resource_released. |
| fail_fast | bool | True | If True, skip downstream nodes when any node fails. |
| enable_tracing | bool | False | If True, record a Chrome-compatible execution trace. |

execute

ResourceAwareExecutor.execute
```python
def execute(
    tasks: dict[str, Callable],
) -> ExecutionResult
```

Execute tasks with resource-aware scheduling.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tasks | dict[str, Callable] | required | Map of node names to callable tasks. |

Returns: ExecutionResult

```python
import dagron

dag = (
    dagron.DAG.builder()
    .add_node("preprocess")
    .add_node("train_model_a")
    .add_node("train_model_b")
    .add_node("evaluate")
    .add_edge("preprocess", "train_model_a")
    .add_edge("preprocess", "train_model_b")
    .add_edge("train_model_a", "evaluate")
    .add_edge("train_model_b", "evaluate")
    .build()
)

pool = dagron.ResourcePool(capacities={"gpu": 2, "cpu": 8, "memory_mb": 16000})

requirements = {
    "preprocess": dagron.ResourceRequirements(resources={"cpu": 2, "memory_mb": 2000}),
    "train_model_a": dagron.ResourceRequirements(resources={"gpu": 1, "cpu": 4, "memory_mb": 8000}),
    "train_model_b": dagron.ResourceRequirements(resources={"gpu": 1, "cpu": 4, "memory_mb": 8000}),
    "evaluate": dagron.ResourceRequirements(resources={"cpu": 2, "memory_mb": 4000}),
}

executor = dagron.ResourceAwareExecutor(dag, pool, requirements)
result = executor.execute({
    "preprocess": lambda: "data ready",
    "train_model_a": lambda: "model A trained",
    "train_model_b": lambda: "model B trained",
    "evaluate": lambda: "evaluation complete",
})

print(result.succeeded)  # 4
```
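
In the example above, train_model_a and train_model_b each need one of the pool's two GPUs, so they can run concurrently, while a third 1-GPU node would have to wait for a release. That wave-like behavior can be modeled with a small sketch (hypothetical code, not dagron's actual scheduler):

```python
# Hypothetical sketch of capacity-aware dispatch -- not dagron's actual
# scheduler. Nodes are grouped into "waves": everything in a wave fits
# the pool simultaneously; anything that does not fit waits for the
# resources released when the previous wave completes.

def fits(required: dict, available: dict) -> bool:
    """True if every required amount is currently free."""
    return all(available.get(name, 0.0) >= amount
               for name, amount in required.items())

def dispatch_waves(nodes, requirements, capacities):
    pending, waves = list(nodes), []
    while pending:
        available = dict(capacities)  # all resources free at wave start
        wave = []
        for node in list(pending):
            if fits(requirements[node], available):
                for name, amount in requirements[node].items():
                    available[name] -= amount  # acquire
                wave.append(node)
                pending.remove(node)
        if not wave:
            raise RuntimeError("a node's requirements exceed total capacity")
        waves.append(wave)
    return waves

# Two 1-GPU training nodes fill a 2-GPU pool; a third waits for a release.
reqs = {"train_a": {"gpu": 1}, "train_b": {"gpu": 1}, "train_c": {"gpu": 1}}
print(dispatch_waves(["train_a", "train_b", "train_c"], reqs, {"gpu": 2}))
# [['train_a', 'train_b'], ['train_c']]
```

The real executor overlaps waves as individual nodes finish, but the invariant is the same: the sum of allocated resources never exceeds pool capacity.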

AsyncResourceAwareExecutor

AsyncResourceAwareExecutor
```python
class AsyncResourceAwareExecutor(
    dag: DAG,
    resource_pool: ResourcePool,
    requirements: dict[str, ResourceRequirements],
    costs: dict[str, float] | None = None,
    callbacks: ExecutionCallbacks | None = None,
    fail_fast: bool = True,
    enable_tracing: bool = False,
)
```

An async variant of ResourceAwareExecutor with the same constructor and interface; tasks must be async callables, which the executor awaits.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| dag | DAG | required | The DAG whose topology drives execution order. |
| resource_pool | ResourcePool | required | The pool of available resources. |
| requirements | dict[str, ResourceRequirements] | required | Map of node names to their resource requirements. |
| costs | dict[str, float] \| None | None | Per-node cost estimates for scheduling priority. |
| callbacks | ExecutionCallbacks \| None | None | Optional lifecycle callbacks. |
| fail_fast | bool | True | If True, skip downstream nodes when any node fails. |
| enable_tracing | bool | False | If True, record a Chrome-compatible execution trace. |

execute (async)

AsyncResourceAwareExecutor.execute
```python
async def execute(
    tasks: dict[str, Callable[..., Awaitable]],
) -> ExecutionResult
```

Execute async tasks with resource-aware scheduling.

```python
import asyncio
import dagron

# dag, pool, and requirements as constructed in the synchronous example.
async def main():
    executor = dagron.AsyncResourceAwareExecutor(dag, pool, requirements)
    result = await executor.execute({
        "preprocess": lambda: preprocess_async(),
        "train_model_a": lambda: train_async("model_a"),
        "train_model_b": lambda: train_async("model_b"),
        "evaluate": lambda: evaluate_async(),
    })
    print(result.succeeded)

asyncio.run(main())
```

ResourcePool

ResourcePool
```python
class ResourcePool(
    capacities: dict[str, float],
)
```

A pool of named resources with finite capacities. Resources are acquired before a node runs and released after it completes (or fails).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| capacities | dict[str, float] | required | Map of resource names to their total capacities. |

can_satisfy

ResourcePool.can_satisfy
```python
def can_satisfy(
    requirements: ResourceRequirements,
) -> bool
```

Return True if the pool's total capacity can satisfy the given requirements (ignoring current allocation). Useful for validating requirements before execution.
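
Per the description, this compares against total capacity rather than current availability, which is what makes it suitable for pre-flight validation. A minimal equivalent of that check (illustrative, not the library's code):

```python
# Illustrative equivalent of the check described above: compare against
# *total* capacity, ignoring whatever is currently allocated.
def can_satisfy(capacities: dict[str, float], required: dict[str, float]) -> bool:
    return all(capacities.get(name, 0.0) >= amount
               for name, amount in required.items())

# A 3-GPU node can never be scheduled on a 2-GPU pool, even when idle.
print(can_satisfy({"gpu": 2, "cpu": 8}, {"gpu": 3}))            # False
print(can_satisfy({"gpu": 2, "cpu": 8}, {"gpu": 2, "cpu": 4}))  # True
```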

try_acquire

ResourcePool.try_acquire
def try_acquire(
requirements: ResourceRequirements,
) -> bool

Attempt to acquire resources without blocking. Returns True if successful, False if insufficient resources are currently available.

acquire

ResourcePool.acquire
```python
def acquire(
    requirements: ResourceRequirements,
    timeout: float | None = None,
) -> bool
```

Acquire resources, blocking until they become available or the timeout expires. Returns True on success, False on timeout.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| requirements | ResourceRequirements | required | The resources to acquire. |
| timeout | float \| None | None | Maximum time to wait in seconds. None means wait indefinitely. |

release

ResourcePool.release
```python
def release(
    requirements: ResourceRequirements,
) -> None
```

Release previously acquired resources back to the pool.
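
Taken together, try_acquire, acquire, and release follow standard counting-semaphore semantics. They can be modeled with a condition variable (an illustrative sketch, not dagron's ResourcePool implementation):

```python
import threading
import time

class MiniPool:
    """Illustrative model of the try_acquire / acquire / release
    semantics described above -- not dagron's implementation."""

    def __init__(self, capacities: dict[str, float]):
        self._available = dict(capacities)
        self._cond = threading.Condition()

    def _fits(self, required):
        return all(self._available.get(r, 0.0) >= amt
                   for r, amt in required.items())

    def _take(self, required):
        for r, amt in required.items():
            self._available[r] -= amt

    def try_acquire(self, required) -> bool:
        """Non-blocking: take the resources now or report failure."""
        with self._cond:
            if not self._fits(required):
                return False
            self._take(required)
            return True

    def acquire(self, required, timeout=None) -> bool:
        """Block until the resources are free, or the timeout expires."""
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._cond:
            while not self._fits(required):
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    return False  # timed out
                self._cond.wait(remaining)
            self._take(required)
            return True

    def release(self, required) -> None:
        """Return resources and wake any blocked acquirers."""
        with self._cond:
            for r, amt in required.items():
                self._available[r] += amt
            self._cond.notify_all()
```

A release wakes every waiter, and each re-checks its own requirements under the lock, so partially freed capacity is never oversubscribed.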

Properties

ResourcePool.capacities
```python
@property
def capacities() -> dict[str, float]
```

The total capacity of each resource.

ResourcePool.available
```python
@property
def available() -> dict[str, float]
```

The currently available (unallocated) amount of each resource.

ResourcePool.allocated
```python
@property
def allocated() -> dict[str, float]
```

The currently allocated amount of each resource.

ResourcePool.timeline
```python
@property
def timeline() -> ResourceTimeline
```

A timeline of resource allocation events for visualization and debugging.

```python
pool = dagron.ResourcePool(capacities={"gpu": 4, "cpu": 16, "memory_mb": 32000})

print(pool.capacities)  # {"gpu": 4, "cpu": 16, "memory_mb": 32000}
print(pool.available)   # {"gpu": 4, "cpu": 16, "memory_mb": 32000}

req = dagron.ResourceRequirements(resources={"gpu": 2})
pool.acquire(req)
print(pool.available)  # {"gpu": 2, "cpu": 16, "memory_mb": 32000}
print(pool.allocated)  # {"gpu": 2, "cpu": 0, "memory_mb": 0}

pool.release(req)
print(pool.available)  # {"gpu": 4, "cpu": 16, "memory_mb": 32000}
```

ResourceRequirements

ResourceRequirements
```python
class ResourceRequirements(
    resources: dict[str, float],
)
```

A set of resource requirements for a single node.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| resources | dict[str, float] | required | Map of resource names to required amounts. |

Convenience Constructors

ResourceRequirements.gpu
```python
@classmethod
def gpu(n: float) -> ResourceRequirements
```

Create a requirement for n GPUs.

ResourceRequirements.cpu
```python
@classmethod
def cpu(n: float) -> ResourceRequirements
```

Create a requirement for n CPU cores.

ResourceRequirements.memory
```python
@classmethod
def memory(mb: float) -> ResourceRequirements
```

Create a requirement for mb megabytes of memory.

fits

ResourceRequirements.fits
```python
def fits(
    available: dict[str, float],
) -> bool
```

Return True if the given available resources can satisfy this requirement.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| available | dict[str, float] | required | Map of resource names to available amounts. |

```python
# Explicit construction
req = dagron.ResourceRequirements(resources={"gpu": 2, "cpu": 4, "memory_mb": 8000})

# Convenience constructors
gpu_req = dagron.ResourceRequirements.gpu(1)
cpu_req = dagron.ResourceRequirements.cpu(4)
mem_req = dagron.ResourceRequirements.memory(4096)

# Check fit
print(req.fits({"gpu": 4, "cpu": 16, "memory_mb": 32000}))  # True
print(req.fits({"gpu": 1, "cpu": 16, "memory_mb": 32000}))  # False (needs 2 GPUs)
```

ResourceSnapshot

ResourceSnapshot
```python
class ResourceSnapshot(
    timestamp: float,
    allocated: dict[str, float],
    available: dict[str, float],
    node_name: str | None,
    event: str,
)
```

A point-in-time snapshot of resource allocation. Recorded by the ResourceTimeline.

| Property | Type | Description |
| --- | --- | --- |
| timestamp | float | Unix timestamp of the snapshot. |
| allocated | dict[str, float] | Allocated resources at this point. |
| available | dict[str, float] | Available resources at this point. |
| node_name | str \| None | Name of the node that triggered the event, if any. |
| event | str | Event type: "acquire" or "release". |

ResourceTimeline

ResourceTimeline
```python
class ResourceTimeline
```

A recorded timeline of resource allocation events. Access via ResourcePool.timeline.

record

ResourceTimeline.record
```python
def record(
    snapshot: ResourceSnapshot,
) -> None
```

Manually record a snapshot (typically done automatically by the pool).

snapshots

ResourceTimeline.snapshots
```python
@property
def snapshots() -> list[ResourceSnapshot]
```

All recorded snapshots in chronological order.

peak_utilization

ResourceTimeline.peak_utilization
```python
def peak_utilization() -> dict[str, float]
```

Return the peak utilization (as a fraction 0.0-1.0) for each resource across the entire timeline.

```python
result = executor.execute(tasks)

timeline = pool.timeline
print(f"Snapshots: {len(timeline.snapshots)}")
print(f"Peak utilization: {timeline.peak_utilization()}")
# {"gpu": 1.0, "cpu": 0.75, "memory_mb": 0.5}

for snap in timeline.snapshots:
    print(f"  t={snap.timestamp:.3f} {snap.event} {snap.node_name}: gpu={snap.allocated.get('gpu', 0)}")
```
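
The peak figure can also be reproduced from the raw snapshots. A sketch of the computation, using plain dicts standing in for the allocated field of each snapshot (hypothetical helper, not the library's code):

```python
# Hypothetical re-implementation of the peak-utilization computation
# described above: the peak allocated/capacity fraction per resource
# across a series of allocation snapshots.
def peak_utilization(allocated_series, capacities):
    peaks = {name: 0.0 for name in capacities}
    for allocated in allocated_series:
        for name, amount in allocated.items():
            peaks[name] = max(peaks[name], amount / capacities[name])
    return peaks

series = [{"gpu": 2, "cpu": 4}, {"gpu": 1, "cpu": 12}]
print(peak_utilization(series, {"gpu": 2, "cpu": 16}))
# {'gpu': 1.0, 'cpu': 0.75}
```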