Skip to main content

Plugins

The plugin system provides extensible lifecycle hooks for DAG construction and execution. Plugins can observe and react to events such as node starts, completions, failures, and full execution boundaries. dagron includes a built-in DashboardPlugin that serves a live web dashboard backed by a Rust web server.

from dagron.plugins import (
DagronPlugin,
HookRegistry,
HookEvent,
HookContext,
PluginManager,
dagron_plugin,
)
from dagron.dashboard import DashboardPlugin

HookEvent

HookEvent
class HookEvent(Enum):
PRE_EXECUTE = "pre_execute"
POST_EXECUTE = "post_execute"
PRE_NODE = "pre_node"
POST_NODE = "post_node"
ON_ERROR = "on_error"
PRE_BUILD = "pre_build"
POST_BUILD = "post_build"

Events that hooks can subscribe to. These correspond to lifecycle moments during DAG construction and execution.

| Event | When it fires | Context fields |
| --- | --- | --- |
| PRE_EXECUTE | Before execution starts. | dag |
| POST_EXECUTE | After execution completes. | dag, execution_result |
| PRE_NODE | Before a node task runs. | dag, node_name |
| POST_NODE | After a node task completes. | dag, node_name, node_result |
| ON_ERROR | When a node task fails. | dag, node_name, error |
| PRE_BUILD | Before DAGBuilder.build(). | dag |
| POST_BUILD | After DAGBuilder.build(). | dag |

HookContext

HookContext
@dataclass
class HookContext:
event: HookEvent
dag: Any = None
node_name: str | None = None
node_result: Any = None
error: Exception | None = None
execution_result: Any = None
metadata: dict[str, Any] = field(default_factory=dict)

Context object passed to hook callbacks. Fields are populated based on the event type -- not all fields are set for every event.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| event | HookEvent | required | The event that triggered this hook. |
| dag | Any | None | The DAG being built or executed. |
| node_name | str \| None | None | Name of the current node (for node-level events). |
| node_result | Any | None | The node's result (for POST_NODE). |
| error | Exception \| None | None | The exception (for ON_ERROR). |
| execution_result | Any | None | The full execution result (for POST_EXECUTE). |
| metadata | dict[str, Any] | {} | Arbitrary metadata that plugins can attach. |

HookRegistry

HookRegistry
class HookRegistry:
def __init__(self) -> None: ...

Registry for event hooks with priority ordering. Hooks are fire-and-forget: exceptions in callbacks are caught and issued as RuntimeWarning, never propagated to the caller.

from dagron.plugins import HookRegistry, HookEvent, HookContext

hooks = HookRegistry()

# Register a simple logging hook
def log_node_start(ctx: HookContext):
print(f"Starting node: {ctx.node_name}")

unregister = hooks.register(HookEvent.PRE_NODE, log_node_start)

# Later, remove the hook
unregister()

Methods


HookRegistry.register

HookRegistry.register
def register(
self,
event: HookEvent,
callback: Callable[[HookContext], None],
priority: int = 0,
) -> Callable[[], None]

Register a hook callback for an event. Higher-priority callbacks run first.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| event | HookEvent | required | The event to subscribe to. |
| callback | Callable[[HookContext], None] | required | Function called with a HookContext when the event fires. |
| priority | int | 0 | Priority ordering. Higher values run first. Callbacks with the same priority run in registration order. |

Returns: Callable[[], None] -- An unregister function. Call it to remove this hook.

# High-priority hook runs first
hooks.register(HookEvent.PRE_EXECUTE, auth_check, priority=100)
hooks.register(HookEvent.PRE_EXECUTE, log_start, priority=0)

HookRegistry.fire

HookRegistry.fire
def fire(self, context: HookContext) -> None

Fire all hooks registered for the context's event. Callbacks are called in priority order (descending). Exceptions in callbacks are caught and issued as RuntimeWarning.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| context | HookContext | required | The context object describing the event. |
from dagron.plugins import HookContext, HookEvent

ctx = HookContext(event=HookEvent.PRE_NODE, dag=dag, node_name="extract")
hooks.fire(ctx)

HookRegistry.clear

HookRegistry.clear
def clear(self, event: HookEvent | None = None) -> None

Clear all hooks, or hooks for a specific event.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| event | HookEvent \| None | None | If provided, only clear hooks for this event. If None, clear all hooks for all events. |

HookRegistry.hook_count

HookRegistry.hook_count
def hook_count(self, event: HookEvent | None = None) -> int

Return the number of registered hooks.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| event | HookEvent \| None | None | If provided, count hooks for this event only. If None, count all hooks across all events. |

Returns: int -- Number of registered hooks.


DagronPlugin

DagronPlugin
class DagronPlugin(ABC):
@property
@abstractmethod
def name(self) -> str: ...

@abstractmethod
def initialize(self, hooks: HookRegistry) -> None: ...

def teardown(self) -> None: ...

Abstract base class for dagron plugins. Subclass this and implement the name property and initialize() method. The teardown() method is optional.

Abstract members

| Member | Type | Description |
| --- | --- | --- |
| name | property -> str | Unique name for the plugin (abstract). |
| initialize(hooks) | method | Called when the plugin is initialized. Register hooks here (abstract). |
| teardown() | method | Called when the plugin is torn down. Clean up resources (optional, default no-op). |
from dagron.plugins import DagronPlugin, HookRegistry, HookEvent, HookContext

class TimingPlugin(DagronPlugin):
@property
def name(self) -> str:
return "timing"

def initialize(self, hooks: HookRegistry) -> None:
self._starts: dict[str, float] = {}

def on_pre_node(ctx: HookContext):
import time
if ctx.node_name:
self._starts[ctx.node_name] = time.monotonic()

def on_post_node(ctx: HookContext):
import time
if ctx.node_name and ctx.node_name in self._starts:
elapsed = time.monotonic() - self._starts[ctx.node_name]
print(f"{ctx.node_name}: {elapsed:.3f}s")

hooks.register(HookEvent.PRE_NODE, on_pre_node)
hooks.register(HookEvent.POST_NODE, on_post_node)

def teardown(self) -> None:
self._starts.clear()

PluginManager

PluginManager
class PluginManager:
def __init__(
self,
hooks: HookRegistry | None = None,
) -> None: ...

Manages plugin discovery, initialization, and teardown. Shares a single HookRegistry across all managed plugins.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| hooks | HookRegistry \| None | None | Hook registry to use. If None, a new HookRegistry is created. |

Properties


PluginManager.hooks

PluginManager.hooks
@property
def hooks(self) -> HookRegistry

The hook registry shared by all plugins.


PluginManager.plugins

PluginManager.plugins
@property
def plugins(self) -> dict[str, DagronPlugin]

Currently registered plugins, as a copy of the internal mapping.

Returns: dict[str, DagronPlugin] -- Mapping of plugin names to plugin instances.

Methods


PluginManager.register

PluginManager.register
def register(self, plugin: DagronPlugin) -> None

Register a plugin instance. If a plugin with the same name is already registered, the existing plugin is replaced and a RuntimeWarning is issued.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| plugin | DagronPlugin | required | The plugin instance to register. |

PluginManager.discover

PluginManager.discover
def discover(self) -> list[str]

Discover plugins via entry_points(group='dagron.plugins'). Each discovered entry point is loaded, instantiated, and registered.

Returns: list[str] -- Names of successfully discovered and registered plugins.

manager = PluginManager()
discovered = manager.discover()
print(f"Discovered plugins: {discovered}")

PluginManager.initialize_all

PluginManager.initialize_all
def initialize_all(self) -> None

Initialize all registered plugins that have not been initialized yet. Each plugin's initialize() method is called with the shared hook registry. Failures are caught and issued as RuntimeWarning.


PluginManager.teardown_all

PluginManager.teardown_all
def teardown_all(self) -> None

Tear down all initialized plugins by calling their teardown() methods. Failures are caught and issued as RuntimeWarning.

from dagron.plugins import PluginManager

manager = PluginManager()
manager.register(TimingPlugin())
manager.initialize_all()

# Use manager.hooks with an executor
executor = dagron.DAGExecutor(dag, hooks=manager.hooks)
result = executor.execute(tasks)

# Clean up
manager.teardown_all()

@dagron_plugin

dagron_plugin
def dagron_plugin(cls: type) -> type

Class decorator that registers a plugin class with the global plugin manager. The class must be a DagronPlugin subclass. It is instantiated and registered immediately when the decorator is applied.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| cls | type | required | A DagronPlugin subclass to register. |

Returns: type -- The class, unmodified.

Raises: TypeError -- If cls is not a DagronPlugin subclass.

from dagron.plugins import dagron_plugin, DagronPlugin, HookRegistry

@dagron_plugin
class MyPlugin(DagronPlugin):
@property
def name(self) -> str:
return "my_plugin"

def initialize(self, hooks: HookRegistry) -> None:
# Register hooks here
pass

DashboardPlugin

DashboardPlugin
class DashboardPlugin(DagronPlugin):
def __init__(
self,
host: str = "127.0.0.1",
port: int = 8765,
gate_controller: GateController | None = None,
open_browser: bool = False,
) -> None: ...

A plugin that serves a live web dashboard showing real-time DAG execution status. The web server runs in Rust (axum + tokio) on a background OS thread for minimal Python overhead.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | "127.0.0.1" | Hostname to bind the web server to. |
| port | int | 8765 | Port number for the web server. |
| gate_controller | GateController \| None | None | Optional gate controller for approval gate UI integration. When provided, the dashboard shows waiting gates and allows approve/reject actions. |
| open_browser | bool | False | Automatically open the dashboard URL in the default browser on initialization. |

Note: The DashboardPlugin requires dagron to be built with the dashboard Cargo feature. If the feature is not available, initialize() raises ImportError with instructions to rebuild.

Registered hooks

The dashboard automatically registers hooks for these events:

| Event | Behavior |
| --- | --- |
| PRE_EXECUTE | Resets the dashboard state, sends the DAG structure. |
| PRE_NODE | Marks a node as "started" in the UI. |
| POST_NODE | Marks a node as "completed" in the UI. |
| ON_ERROR | Marks a node as "failed" with the error message. |
| POST_EXECUTE | Sends final execution statistics. |
import dagron
from dagron.dashboard import DashboardPlugin
from dagron.plugins import HookRegistry

# Set up hooks and dashboard
hooks = HookRegistry()
dashboard = DashboardPlugin(port=8765, open_browser=True)
dashboard.initialize(hooks)
# Prints: Dashboard: http://127.0.0.1:8765

# Execute with hooks
executor = dagron.DAGExecutor(dag, hooks=hooks)
result = executor.execute(tasks)

# Clean up
dashboard.teardown()

Complete example

import dagron
from dagron.plugins import (
DagronPlugin,
HookRegistry,
HookEvent,
HookContext,
PluginManager,
)

# Define a custom metrics plugin
class MetricsPlugin(DagronPlugin):
@property
def name(self) -> str:
return "metrics"

def initialize(self, hooks: HookRegistry) -> None:
self.node_count = 0
self.error_count = 0

def count_nodes(ctx: HookContext):
self.node_count += 1

def count_errors(ctx: HookContext):
self.error_count += 1

hooks.register(HookEvent.POST_NODE, count_nodes)
hooks.register(HookEvent.ON_ERROR, count_errors)

def teardown(self) -> None:
print(f"Metrics: {self.node_count} nodes executed, {self.error_count} errors")

# Use with PluginManager
manager = PluginManager()
manager.register(MetricsPlugin())
manager.initialize_all()

# Build and execute
dag = (
dagron.DAG.builder()
.add_edge("extract", "transform")
.add_edge("transform", "load")
.build()
)

tasks = {
"extract": lambda: [1, 2, 3],
"transform": lambda: [2, 4, 6],
"load": lambda: "done",
}

executor = dagron.DAGExecutor(dag, hooks=manager.hooks)
result = executor.execute(tasks)

# Teardown prints metrics
manager.teardown_all()
# Metrics: 3 nodes executed, 0 errors

See also

  • Execution -- the hooks parameter on DAGExecutor.
  • Tracing -- structured event recording during execution.
  • Reactive DAG -- push-based reactive execution with subscriptions.