Skip to main content

Approval Gates

Approval gates pause DAG execution at specific nodes until a human (or external system) explicitly approves or rejects the continuation. This enables human-in-the-loop workflows such as deployment approvals, data quality sign-offs, and compliance checks.

Gates integrate with any executor via ExecutionCallbacks and provide both synchronous and asynchronous waiting interfaces.

See the Approval Gates guide for end-to-end workflow patterns.


ApprovalGate

ApprovalGate
class ApprovalGate(
timeout: float | None = None,
auto_approve: bool = False,
)

A single approval gate that can be attached to a node. The gate starts in PENDING state, transitions to WAITING when the executor reaches it, and resolves to APPROVED, REJECTED, or TIMED_OUT.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | `float \| None` | `None` | Timeout in seconds. If the gate is not approved or rejected within this time, it times out. |
| auto_approve | `bool` | `False` | If True, the gate is automatically approved when reached (useful for testing). |

approve

ApprovalGate.approve
def approve() -> None

Approve the gate, allowing execution to proceed past this point.

reject

ApprovalGate.reject
def reject(reason: str | None = None) -> None

Reject the gate, causing the gated node to fail with a GateRejectedError.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| reason | `str \| None` | `None` | Optional human-readable reason for rejection. |

wait_sync

ApprovalGate.wait_sync
def wait_sync() -> None

Block the current thread until the gate is resolved (approved, rejected, or timed out). Raises GateRejectedError on rejection and GateTimeoutError on timeout.

wait_async

ApprovalGate.wait_async
async def wait_async() -> None

Await gate resolution in an async context. Raises GateRejectedError on rejection and GateTimeoutError on timeout.

reset

ApprovalGate.reset
def reset() -> None

Reset the gate to PENDING state for reuse.

status

ApprovalGate.status
@property
def status() -> GateStatus

The current gate status.

reason

ApprovalGate.reason
@property
def reason() -> str | None

The rejection reason, or None if the gate was not rejected.

import dagron
import threading

gate = dagron.ApprovalGate(timeout=60.0)

# In another thread or process:
def approval_ui():
    input("Press Enter to approve deployment...")
    gate.approve()

threading.Thread(target=approval_ui, daemon=True).start()

# In the task:
gate.wait_sync()  # blocks until approved; raises on rejection or timeout
print(f"Gate status: {gate.status}") # GateStatus.APPROVED

GateController

GateController
class GateController(
gates: dict[str, ApprovalGate] | None = None,
)

A centralized controller for managing multiple gates. Provides a single interface for approving, rejecting, and querying the status of all gates in a pipeline.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| gates | `dict[str, ApprovalGate] \| None` | `None` | Initial map of gate names to ApprovalGate instances. |

add_gate

GateController.add_gate
def add_gate(
name: str,
gate: ApprovalGate | None = None,
) -> ApprovalGate

Add a gate to the controller. If gate is None, a new default ApprovalGate is created. Returns the gate instance.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | `str` | required | Unique name for the gate. |
| gate | `ApprovalGate \| None` | `None` | An existing gate instance, or None to create a default one. |

approve

GateController.approve
def approve(name: str) -> None

Approve a named gate.

reject

GateController.reject
def reject(name: str, reason: str | None = None) -> None

Reject a named gate.

status

GateController.status
def status(name: str) -> GateStatus

Return the status of a named gate.

waiting_gates

GateController.waiting_gates
def waiting_gates() -> list[str]

Return the names of all gates currently in WAITING status.

get_gate

GateController.get_gate
def get_gate(name: str) -> ApprovalGate

Return the ApprovalGate instance for a named gate.

has_gate

GateController.has_gate
def has_gate(name: str) -> bool

Return True if a gate with the given name exists.

wait_sync

GateController.wait_sync
def wait_sync(name: str) -> None

Block until a named gate is resolved.

wait_async

GateController.wait_async
async def wait_async(name: str) -> None

Await resolution of a named gate.

reset_all

GateController.reset_all
def reset_all() -> None

Reset all gates to PENDING status.

import dagron

controller = dagron.GateController()
controller.add_gate("qa_review", dagron.ApprovalGate(timeout=300))
controller.add_gate("deploy_prod", dagron.ApprovalGate(timeout=600))

# Check what's waiting
print(controller.waiting_gates()) # []

# Later, in a webhook handler:
controller.approve("qa_review")
print(controller.status("qa_review")) # GateStatus.APPROVED

controller.reject("deploy_prod", reason="Failed canary check")
print(controller.status("deploy_prod")) # GateStatus.REJECTED

GateStatus

GateStatus
class GateStatus(enum.Enum):
    PENDING = "pending"
    WAITING = "waiting"
    APPROVED = "approved"
    REJECTED = "rejected"
    TIMED_OUT = "timed_out"

Enumeration of gate states.

| Value | Description |
| --- | --- |
| PENDING | Gate has been created but execution has not reached it yet. |
| WAITING | Execution has reached the gate and is waiting for approval. |
| APPROVED | Gate was approved; execution proceeds. |
| REJECTED | Gate was rejected; the gated node fails. |
| TIMED_OUT | Gate was not resolved before its timeout expired. |

GateRejectedError

GateRejectedError
class GateRejectedError(DagronError):
    gate_name: str
    reason: str | None

Raised when a gate is rejected. The gated node's task will receive this as its exception, and it will appear in the node's NodeResult.error.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| gate_name | `str` | required | Name of the rejected gate. |
| reason | `str \| None` | required | Optional rejection reason. |

try:
    gate.wait_sync()
except dagron.GateRejectedError as e:
    print(f"Gate '{e.gate_name}' rejected: {e.reason}")

GateTimeoutError

GateTimeoutError
class GateTimeoutError(DagronError):
    gate_name: str
    timeout: float

Raised when a gate times out before being approved or rejected.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| gate_name | `str` | required | Name of the timed-out gate. |
| timeout | `float` | required | The timeout value in seconds that was exceeded. |

try:
    gate.wait_sync()
except dagron.GateTimeoutError as e:
    print(f"Gate '{e.gate_name}' timed out after {e.timeout}s")

Complete Example: Deployment Pipeline

A deployment pipeline with QA approval and production deployment gates:

import dagron
import threading

# Build the DAG
dag = (
dagron.DAG.builder()
.add_node("build")
.add_node("test")
.add_node("qa_gate")
.add_node("deploy_staging")
.add_node("prod_gate")
.add_node("deploy_prod")
.add_edge("build", "test")
.add_edge("test", "qa_gate")
.add_edge("qa_gate", "deploy_staging")
.add_edge("deploy_staging", "prod_gate")
.add_edge("prod_gate", "deploy_prod")
.build()
)

# Set up gates
controller = dagron.GateController()
qa_gate = controller.add_gate("qa_gate", dagron.ApprovalGate(timeout=3600))
prod_gate = controller.add_gate("prod_gate", dagron.ApprovalGate(timeout=7200))

# Define tasks
tasks = {
"build": lambda: "artifact-v1.2.3",
"test": lambda: "42 tests passed",
"qa_gate": lambda: qa_gate.wait_sync(),
"deploy_staging": lambda: "deployed to staging",
"prod_gate": lambda: prod_gate.wait_sync(),
"deploy_prod": lambda: "deployed to production",
}

# Simulate external approval (in production, this would be a web UI or API)
def simulate_approvals():
    import time
    time.sleep(2)
    print("QA approved!")
    controller.approve("qa_gate")
    time.sleep(2)
    print("Prod approved!")
    controller.approve("prod_gate")

threading.Thread(target=simulate_approvals, daemon=True).start()

# Execute with gate callbacks
result = dagron.DAGExecutor(
dag,
callbacks=dagron.ExecutionCallbacks(
on_gate_waiting=lambda name: print(f"Waiting for gate: {name}"),
on_gate_resolved=lambda name, status: print(f"Gate {name}: {status}"),
),
).execute(tasks)

print(f"\nPipeline: {result.succeeded} succeeded, {result.failed} failed")