Plugins
The plugin system provides extensible lifecycle hooks for DAG construction
and execution. Plugins can observe and react to events such as node starts,
completions, failures, and full execution boundaries. dagron includes a
built-in DashboardPlugin that serves a live web dashboard backed by a
Rust web server.
from dagron.plugins import (
DagronPlugin,
HookRegistry,
HookEvent,
HookContext,
PluginManager,
dagron_plugin,
)
from dagron.dashboard import DashboardPlugin
HookEvent
class HookEvent(Enum):
PRE_EXECUTE = "pre_execute"
POST_EXECUTE = "post_execute"
PRE_NODE = "pre_node"
POST_NODE = "post_node"
ON_ERROR = "on_error"
PRE_BUILD = "pre_build"
POST_BUILD = "post_build"
Events that hooks can subscribe to. These correspond to lifecycle moments during DAG construction and execution.
| Event | When it fires | Context fields |
|---|---|---|
PRE_EXECUTE | Before execution starts. | dag |
POST_EXECUTE | After execution completes. | dag, execution_result |
PRE_NODE | Before a node task runs. | dag, node_name |
POST_NODE | After a node task completes. | dag, node_name, node_result |
ON_ERROR | When a node task fails. | dag, node_name, error |
PRE_BUILD | Before DAGBuilder.build(). | dag |
POST_BUILD | After DAGBuilder.build(). | dag |
HookContext
@dataclass
class HookContext:
event: HookEvent
dag: Any = None
node_name: str | None = None
node_result: Any = None
error: Exception | None = None
execution_result: Any = None
metadata: dict[str, Any] = field(default_factory=dict)
Context object passed to hook callbacks. Fields are populated based on the event type -- not all fields are set for every event.
| Parameter | Type | Default | Description |
|---|---|---|---|
event | HookEvent | required | The event that triggered this hook. |
dag | Any | None | The DAG being built or executed. |
node_name | str | None | None | Name of the current node (for node-level events). |
node_result | Any | None | The node's result (for POST_NODE). |
error | Exception | None | None | The exception (for ON_ERROR). |
execution_result | Any | None | The full execution result (for POST_EXECUTE). |
metadata | dict[str, Any] | {} | Arbitrary metadata that plugins can attach. |
HookRegistry
class HookRegistry:
def __init__(self) -> None: ...
Registry for event hooks with priority ordering. Hooks are fire-and-forget:
exceptions in callbacks are caught and issued as RuntimeWarning, never
propagated to the caller.
from dagron.plugins import HookRegistry, HookEvent, HookContext
hooks = HookRegistry()
# Register a simple logging hook
def log_node_start(ctx: HookContext):
print(f"Starting node: {ctx.node_name}")
unregister = hooks.register(HookEvent.PRE_NODE, log_node_start)
# Later, remove the hook
unregister()
Methods
HookRegistry.register
def register(
self,
event: HookEvent,
callback: Callable[[HookContext], None],
priority: int = 0,
) -> Callable[[], None]
Register a hook callback for an event. Higher priority callbacks run first.
| Parameter | Type | Default | Description |
|---|---|---|---|
event | HookEvent | required | The event to subscribe to. |
callback | Callable[[HookContext], None] | required | Function called with a HookContext when the event fires. |
priority | int | 0 | Priority ordering. Higher values run first. Callbacks with the same priority run in registration order. |
Returns: Callable[[], None] -- An unregister function. Call it to remove this hook.
# High-priority hook runs first
hooks.register(HookEvent.PRE_EXECUTE, auth_check, priority=100)
hooks.register(HookEvent.PRE_EXECUTE, log_start, priority=0)
HookRegistry.fire
def fire(self, context: HookContext) -> None
Fire all hooks registered for the context's event. Callbacks are called
in priority order (descending). Exceptions in callbacks are caught and
issued as RuntimeWarning.
| Parameter | Type | Default | Description |
|---|---|---|---|
context | HookContext | required | The context object describing the event. |
from dagron.plugins import HookContext, HookEvent
ctx = HookContext(event=HookEvent.PRE_NODE, dag=dag, node_name="extract")
hooks.fire(ctx)
HookRegistry.clear
def clear(self, event: HookEvent | None = None) -> None
Clear all hooks, or hooks for a specific event.
| Parameter | Type | Default | Description |
|---|---|---|---|
event | HookEvent | None | None | If provided, only clear hooks for this event. If None, clear all hooks for all events. |
HookRegistry.hook_count
def hook_count(self, event: HookEvent | None = None) -> int
Return the number of registered hooks.
| Parameter | Type | Default | Description |
|---|---|---|---|
event | HookEvent | None | None | If provided, count hooks for this event only. If None, count all hooks across all events. |
Returns: int -- Number of registered hooks.
DagronPlugin
class DagronPlugin(ABC):
@property
@abstractmethod
def name(self) -> str: ...
@abstractmethod
def initialize(self, hooks: HookRegistry) -> None: ...
def teardown(self) -> None: ...
Abstract base class for dagron plugins. Subclass this and implement the
name property and initialize() method. The teardown() method is
optional.
Abstract members
| Member | Type | Description |
|---|---|---|
name | property -> str | Unique name for the plugin (abstract). |
initialize(hooks) | method | Called when the plugin is initialized. Register hooks here (abstract). |
teardown() | method | Called when the plugin is torn down. Clean up resources (optional, default no-op). |
from dagron.plugins import DagronPlugin, HookRegistry, HookEvent, HookContext
class TimingPlugin(DagronPlugin):
@property
def name(self) -> str:
return "timing"
def initialize(self, hooks: HookRegistry) -> None:
self._starts: dict[str, float] = {}
def on_pre_node(ctx: HookContext):
import time
if ctx.node_name:
self._starts[ctx.node_name] = time.monotonic()
def on_post_node(ctx: HookContext):
import time
if ctx.node_name and ctx.node_name in self._starts:
elapsed = time.monotonic() - self._starts[ctx.node_name]
print(f"{ctx.node_name}: {elapsed:.3f}s")
hooks.register(HookEvent.PRE_NODE, on_pre_node)
hooks.register(HookEvent.POST_NODE, on_post_node)
def teardown(self) -> None:
self._starts.clear()
PluginManager
class PluginManager:
def __init__(
self,
hooks: HookRegistry | None = None,
) -> None: ...
Manages plugin discovery, initialization, and teardown. Shares a single
HookRegistry across all managed plugins.
| Parameter | Type | Default | Description |
|---|---|---|---|
hooks | HookRegistry | None | None | Hook registry to use. If None, a new HookRegistry is created. |
Properties
PluginManager.hooks
@property
def hooks(self) -> HookRegistry
The hook registry shared by all plugins.
PluginManager.plugins
@property
def plugins(self) -> dict[str, DagronPlugin]
Currently registered plugins, as a copy of the internal mapping.
Returns: dict[str, DagronPlugin] -- Mapping of plugin names to plugin instances.
Methods
PluginManager.register
def register(self, plugin: DagronPlugin) -> None
Register a plugin instance. If a plugin with the same name is already
registered, it is replaced with a RuntimeWarning.
| Parameter | Type | Default | Description |
|---|---|---|---|
plugin | DagronPlugin | required | The plugin instance to register. |
PluginManager.discover
def discover(self) -> list[str]
Discover plugins via entry_points(group='dagron.plugins'). Each
discovered entry point is loaded, instantiated, and registered.
Returns: list[str] -- Names of successfully discovered and registered plugins.
manager = PluginManager()
discovered = manager.discover()
print(f"Discovered plugins: {discovered}")
PluginManager.initialize_all
def initialize_all(self) -> None
Initialize all registered plugins that have not been initialized yet.
Each plugin's initialize() method is called with the shared hook registry.
Failures are caught and issued as RuntimeWarning.
PluginManager.teardown_all
def teardown_all(self) -> None
Tear down all initialized plugins by calling their teardown() methods.
Failures are caught and issued as RuntimeWarning.
from dagron.plugins import PluginManager
manager = PluginManager()
manager.register(TimingPlugin())
manager.initialize_all()
# Use manager.hooks with an executor
executor = dagron.DAGExecutor(dag, hooks=manager.hooks)
result = executor.execute(tasks)
# Clean up
manager.teardown_all()
@dagron_plugin
def dagron_plugin(cls: type) -> type
Class decorator that registers a plugin class with the global plugin
manager. The class must be a DagronPlugin subclass. It is instantiated
and registered immediately when the decorator is applied.
| Parameter | Type | Default | Description |
|---|---|---|---|
cls | type | required | A DagronPlugin subclass to register. |
Returns: type -- The class, unmodified.
Raises: TypeError -- If cls is not a DagronPlugin subclass.
from dagron.plugins import dagron_plugin, DagronPlugin, HookRegistry
@dagron_plugin
class MyPlugin(DagronPlugin):
@property
def name(self) -> str:
return "my_plugin"
def initialize(self, hooks: HookRegistry) -> None:
# Register hooks here
pass
DashboardPlugin
class DashboardPlugin(DagronPlugin):
def __init__(
self,
host: str = "127.0.0.1",
port: int = 8765,
gate_controller: GateController | None = None,
open_browser: bool = False,
) -> None: ...
A plugin that serves a live web dashboard showing real-time DAG execution status. The web server runs in Rust (axum + tokio) on a background OS thread for minimal Python overhead.
| Parameter | Type | Default | Description |
|---|---|---|---|
host | str | "127.0.0.1" | Hostname to bind the web server to. |
port | int | 8765 | Port number for the web server. |
gate_controller | GateController | None | None | Optional gate controller for approval gate UI integration. When provided, the dashboard shows waiting gates and allows approve/reject actions. |
open_browser | bool | False | Automatically open the dashboard URL in the default browser on initialization. |
The DashboardPlugin requires dagron to be built with the dashboard
Cargo feature. If the feature is not available, initialize() raises
ImportError with instructions to rebuild.
Registered hooks
The dashboard automatically registers hooks for these events:
| Event | Behavior |
|---|---|
PRE_EXECUTE | Resets the dashboard state, sends the DAG structure. |
PRE_NODE | Marks a node as "started" in the UI. |
POST_NODE | Marks a node as "completed" in the UI. |
ON_ERROR | Marks a node as "failed" with the error message. |
POST_EXECUTE | Sends final execution statistics. |
import dagron
from dagron.dashboard import DashboardPlugin
from dagron.plugins import HookRegistry
# Set up hooks and dashboard
hooks = HookRegistry()
dashboard = DashboardPlugin(port=8765, open_browser=True)
dashboard.initialize(hooks)
# Prints: Dashboard: http://127.0.0.1:8765
# Execute with hooks
executor = dagron.DAGExecutor(dag, hooks=hooks)
result = executor.execute(tasks)
# Clean up
dashboard.teardown()
Complete example
import dagron
from dagron.plugins import (
DagronPlugin,
HookRegistry,
HookEvent,
HookContext,
PluginManager,
)
# Define a custom metrics plugin
class MetricsPlugin(DagronPlugin):
@property
def name(self) -> str:
return "metrics"
def initialize(self, hooks: HookRegistry) -> None:
self.node_count = 0
self.error_count = 0
def count_nodes(ctx: HookContext):
self.node_count += 1
def count_errors(ctx: HookContext):
self.error_count += 1
hooks.register(HookEvent.POST_NODE, count_nodes)
hooks.register(HookEvent.ON_ERROR, count_errors)
def teardown(self) -> None:
print(f"Metrics: {self.node_count} nodes executed, {self.error_count} errors")
# Use with PluginManager
manager = PluginManager()
manager.register(MetricsPlugin())
manager.initialize_all()
# Build and execute
dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)
tasks = {
"extract": lambda: [1, 2, 3],
"transform": lambda: [2, 4, 6],
"load": lambda: "done",
}
executor = dagron.DAGExecutor(dag, hooks=manager.hooks)
result = executor.execute(tasks)
# Teardown prints metrics
manager.teardown_all()
# Metrics: 3 nodes executed, 0 errors
See also
- Execution -- the
hooksparameter onDAGExecutor. - Tracing -- structured event recording during execution.
- Reactive DAG -- push-based reactive execution with subscriptions.