Approval Gates
Approval gates pause DAG execution at specific nodes until a human (or external system) explicitly approves or rejects the continuation. This enables human-in-the-loop workflows such as deployment approvals, data quality sign-offs, and compliance checks.
Gates integrate with any executor via ExecutionCallbacks and provide both synchronous and asynchronous waiting interfaces.
See the Approval Gates guide for end-to-end workflow patterns.
ApprovalGate
class ApprovalGate(
timeout: float | None = None,
auto_approve: bool = False,
)
A single approval gate that can be attached to a node. The gate starts in
PENDING state, transitions to WAITING when the executor reaches it, and
resolves to APPROVED, REJECTED, or TIMED_OUT.
| Parameter | Type | Default | Description |
|---|---|---|---|
timeout | float | None | None | Timeout in seconds. If not approved or rejected within this time, the gate times out. |
auto_approve | bool | False | If True, the gate is automatically approved when reached (useful for testing). |
approve
def approve() -> None
Approve the gate, allowing execution to proceed past this point.
reject
def reject(reason: str | None = None) -> None
Reject the gate, causing the gated node to fail with a GateRejectedError.
| Parameter | Type | Default | Description |
|---|---|---|---|
reason | str | None | None | Optional human-readable reason for rejection. |
wait_sync
def wait_sync() -> None
Block the current thread until the gate is resolved (approved, rejected, or
timed out). Raises GateRejectedError on rejection and GateTimeoutError on
timeout.
wait_async
async def wait_async() -> None
Await gate resolution in an async context. Raises GateRejectedError on
rejection and GateTimeoutError on timeout.
reset
def reset() -> None
Reset the gate to PENDING state for reuse.
status
@property
def status() -> GateStatus
The current gate status.
reason
@property
def reason() -> str | None
The rejection reason, or None if the gate was not rejected.
import dagron
import threading
gate = dagron.ApprovalGate(timeout=60.0)
# In another thread or process:
def approval_ui():
input("Press Enter to approve deployment...")
gate.approve()
threading.Thread(target=approval_ui, daemon=True).start()
# In the task:
gate.wait_sync() # blocks until approved
print(f"Gate status: {gate.status}") # GateStatus.APPROVED
GateController
class GateController(
gates: dict[str, ApprovalGate] | None = None,
)
A centralized controller for managing multiple gates. Provides a single interface for approving, rejecting, and querying the status of all gates in a pipeline.
| Parameter | Type | Default | Description |
|---|---|---|---|
gates | dict[str, ApprovalGate] | None | None | Initial map of gate names to ApprovalGate instances. |
add_gate
def add_gate(
name: str,
gate: ApprovalGate | None = None,
) -> ApprovalGate
Add a gate to the controller. If gate is None, a new default ApprovalGate
is created. Returns the gate instance.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | Unique name for the gate. |
gate | ApprovalGate | None | None | An existing gate instance, or None to create a default one. |
approve
def approve(name: str) -> None
Approve a named gate.
reject
def reject(name: str, reason: str | None = None) -> None
Reject a named gate.
status
def status(name: str) -> GateStatus
Return the status of a named gate.
waiting_gates
def waiting_gates() -> list[str]
Return the names of all gates currently in WAITING status.
get_gate
def get_gate(name: str) -> ApprovalGate
Return the ApprovalGate instance for a named gate.
has_gate
def has_gate(name: str) -> bool
Return True if a gate with the given name exists.
wait_sync
def wait_sync(name: str) -> None
Block until a named gate is resolved.
wait_async
async def wait_async(name: str) -> None
Await resolution of a named gate.
reset_all
def reset_all() -> None
Reset all gates to PENDING status.
import dagron
controller = dagron.GateController()
controller.add_gate("qa_review", dagron.ApprovalGate(timeout=300))
controller.add_gate("deploy_prod", dagron.ApprovalGate(timeout=600))
# Check what's waiting
print(controller.waiting_gates()) # []
# Later, in a webhook handler:
controller.approve("qa_review")
print(controller.status("qa_review")) # GateStatus.APPROVED
controller.reject("deploy_prod", reason="Failed canary check")
print(controller.status("deploy_prod")) # GateStatus.REJECTED
GateStatus
class GateStatus(enum.Enum):
PENDING = "pending"
WAITING = "waiting"
APPROVED = "approved"
REJECTED = "rejected"
TIMED_OUT = "timed_out"
Enumeration of gate states.
| Value | Description |
|---|---|
PENDING | Gate has been created but execution has not reached it yet. |
WAITING | Execution has reached the gate and is waiting for approval. |
APPROVED | Gate was approved; execution proceeds. |
REJECTED | Gate was rejected; the gated node fails. |
TIMED_OUT | Gate was not resolved before its timeout expired. |
GateRejectedError
class GateRejectedError(DagronError):
gate_name: str
reason: str | None
Raised when a gate is rejected. The gated node's task will receive this as its
exception, and it will appear in the node's NodeResult.error.
| Parameter | Type | Default | Description |
|---|---|---|---|
gate_name | str | required | Name of the rejected gate. |
reason | str | None | required | Optional rejection reason. |
try:
gate.wait_sync()
except dagron.GateRejectedError as e:
print(f"Gate '{e.gate_name}' rejected: {e.reason}")
GateTimeoutError
class GateTimeoutError(DagronError):
gate_name: str
timeout: float
Raised when a gate times out before being approved or rejected.
| Parameter | Type | Default | Description |
|---|---|---|---|
gate_name | str | required | Name of the timed-out gate. |
timeout | float | required | The timeout value in seconds that was exceeded. |
try:
gate.wait_sync()
except dagron.GateTimeoutError as e:
print(f"Gate '{e.gate_name}' timed out after {e.timeout}s")
Complete Example: Deployment Pipeline
A deployment pipeline with QA approval and production deployment gates:
import dagron
import threading
# Build the DAG
dag = (
dagron.DAG.builder()
.add_node("build")
.add_node("test")
.add_node("qa_gate")
.add_node("deploy_staging")
.add_node("prod_gate")
.add_node("deploy_prod")
.add_edge("build", "test")
.add_edge("test", "qa_gate")
.add_edge("qa_gate", "deploy_staging")
.add_edge("deploy_staging", "prod_gate")
.add_edge("prod_gate", "deploy_prod")
.build()
)
# Set up gates
controller = dagron.GateController()
qa_gate = controller.add_gate("qa_gate", dagron.ApprovalGate(timeout=3600))
prod_gate = controller.add_gate("prod_gate", dagron.ApprovalGate(timeout=7200))
# Define tasks
tasks = {
"build": lambda: "artifact-v1.2.3",
"test": lambda: "42 tests passed",
"qa_gate": lambda: qa_gate.wait_sync(),
"deploy_staging": lambda: "deployed to staging",
"prod_gate": lambda: prod_gate.wait_sync(),
"deploy_prod": lambda: "deployed to production",
}
# Simulate external approval (in production, this would be a web UI or API)
def simulate_approvals():
import time
time.sleep(2)
print("QA approved!")
controller.approve("qa_gate")
time.sleep(2)
print("Prod approved!")
controller.approve("prod_gate")
threading.Thread(target=simulate_approvals, daemon=True).start()
# Execute with gate callbacks
result = dagron.DAGExecutor(
dag,
callbacks=dagron.ExecutionCallbacks(
on_gate_waiting=lambda name: print(f"Waiting for gate: {name}"),
on_gate_resolved=lambda name, status: print(f"Gate {name}: {status}"),
),
).execute(tasks)
print(f"\nPipeline: {result.succeeded} succeeded, {result.failed} failed")
Related
- DAGExecutor — the executor that integrates with gates via callbacks.
- Conditional Execution — automated branching (no human involved).
- Checkpointing — save and resume after gate rejection.
- Approval Gates guide — end-to-end workflow patterns.