You can find the the code used in this post here. If you just need a webhook receiver for Netbox follow the instructions in the README.md file in that repository.
This is a follow-up to my earlier blog post Netbox webhook receiver with Python, Celery and FastAPI. Here we see two major improvements over the previous iteration:
- Consolidating webhook configuration on the Netbox side into a single webhook
- Implementing a plugin system, which allows users to share their integrations.
Consolidate webhook configuration ๐
The first implementation of the task registry class from the last blog post looked like this.
class TaskRegistry:
def __init__(self):
self.registry: Dict[str, Dict[str, Set[Task]]] = {}
def register(self, model: str, action: str) -> Callable[[Callable], Callable]:
if model not in self.registry:
self.registry[model] = {}
if action not in self.registry[model]:
self.registry[model][action] = set()
def decorator(function):
self.registry[model][action].add(function)
return function
return decorator
async def execute(self, request: Request, model: str, action: str) -> Optional[List[Any]]:
try:
tasks_to_run = self.registry[model][action]
except KeyError:
logger.warning(
f"No tasks configured for model {model} and action {action}."
)
return None
return [f.delay(await request.json()) for f in tasks_to_run]
When receiving a webhook on the FastAPI side, you had to call
task_registry.execute(request, 'model', 'action')
which is redundant, because the request body already contains the
model and the action. The model and action was also present in the url (e.g. webhooks.example.local/tenant/create
),
which required specific webhook configuration on the Netbox side for every model/action combination you wanted to
utilize. Furthermore, this registry is celery-unaware, which meant that all the tasks to be run on the webhook triggers
had to be decorated with both the @celery.task
decorator as well as @task_registry.register
. The following task
registry implementation solves both of these problems (changes highlighted in bold):
class TaskRegistry:
"""
Registry for tasks to be executed when model and action conditions are met.
"""
def __init__(self, celery: Celery):
self.celery = celery
self.registry: Dict[str, Dict[str, Set[Task]]] = {}
def register(self, model: str, event: str) -> Callable[[Callable], Callable]:
"""
Decorator to register tasks in the registry.
:param model: Model to register the task for.
:param event: Event to register the task for.
:return: A decorator adding the decorated function to the registry.
"""
if model not in self.registry:
self.registry[model] = {}
if event not in self.registry[model]:
self.registry[model][event] = set()
def decorator(function: Callable):
task = self.celery.register_task(self.celery.task(function))
self.registry[model][event].add(task)
return function
return decorator
async def execute(self, request: Request) -> Optional[List[Any]]:
"""
Execute tasks in the registry that are assigned to the model and the action.
:param request: The request to pass into the task.
:param model: The model to filter the tasks.
:param action: The action to filters the tasks.
:return: A (possibly empty) list of task results or None if no tasks ran.
"""
data = await request.json()
try:
tasks_to_run = self.registry[data['model']][data['event']]
except KeyError:
logger.warning(
f"No tasks configured for model {data['model']} and action {data['event']}."
)
return None
return [f.delay(data) for f in tasks_to_run]
Implement plugin functionality ๐
We now need the following things to achieve the plugin functionality:
- Register plugin tasks without them knowing of the celery instance
- Register plugins with a root registry
- Name plugins to allow for useful logging
First, we abstract common code from the plugin and the root registry into a base class. Because the plugin registry will hold callables while the root registry holds celery tasks, we also create a typing generic:
# Generic typevar for task registries
T = TypeVar("T")
class _TaskRegistryBase(Generic[T]):
"""
Base class for all plugin and root task registries.
"""
def __init__(self):
self.registry: Dict[str, Dict[str, Set[T]]] = {}
def _ensure_present(self, model: str, event: str) -> None:
"""
Ensure the necessary dictionary keys are present
:param model: Model key to be present.
:param event: Event key to be present.
"""
if model not in self.registry:
self.registry[model] = {}
if event not in self.registry[model]:
self.registry[model][event] = set()
From this base we can inherit the root task registry, which is celery-aware and therefore holds celery tasks:
class RootTaskRegistry(_TaskRegistryBase[Task]):
"""
Registry for tasks to be executed when model and action conditions are met.
"""
Note how the class inherits _TaskRegistryBase[Task]
, this indicated that the registry holds celery tasks.
The __init__
, execute
and register
methods don’t need to be changed for this. Only a new method is added to allow
for the registration of plugin registries. This method iterates over the task registry of the plugin, adds all the
tasks to the root registry and then manually applies the register
decorator (which in turn also applies the
celery.task
which then turns the functions into tasks that can be run by celery):
def register_plugin(self, plugin: PluginTaskRegistry) -> None:
"""
Register the tasks a plugin has collected with the root registry.
:param plugin: The plugin.
"""
logger.info(f"Registering plugin with name {plugin.name}.")
for model, events in plugin.registry.items():
for event, tasks in plugin.registry[model].items():
for task in tasks:
# Manually apply decorator
self.register(model, event)(task)
Finally, the plugin registry has to be created. We pass a name
parameter to the __init__
method in order to
identify the plugin when logging and then create the register decorator. Since the class is unaware of celery, this
only registers the task in the plugin task registry to later be registered with the root registry (as evident by the
Callable
in the inherited class).
class PluginTaskRegistry(_TaskRegistryBase[Callable]):
"""
Handles plugin task registration.
Instantiate inside of plugin in order to designate tasks for registration
in the task registry.
"""
def __init__(self, name: str):
"""
:param name: Name of the plugin
"""
self.name = name
super(PluginTaskRegistry, self).__init__()
def register(self, model: str, event: str):
self._ensure_present(model, event)
def decorator(function):
self.registry[model][event].add(function)
return function
return decorator
The FastAPI app now has to import plugins dynamically. The idea for this is taken from the Python documentation itself and is similar to the way its handled by Flask. A plugin_prefix
setting is added to the Pydantic Settings object in order to denote the prefix, plugin modules are then imported as follows. Just like that, all your plugins will run on the correct webhook triggers.
class Settings(BaseSettings):
secret: str = ""
encoding: str = "utf-8"
digestmod: str = "sha512"
plugin_prefix: str = "nbintegrate_"
celery_broker: str = "redis://redis:6379"
celery_backend: str = ""
app = FastAPI(title=__file__)
settings = Settings()
celery: Celery = Celery(
__file__, broker=settings.celery_broker, backend=settings.celery_backend
)
registry: RootTaskRegistry = RootTaskRegistry(celery=celery)
[...]
# Discover plugins by name
discovered_plugins = {
name: importlib.import_module(name)
for finder, name, ispkg in pkgutil.iter_modules()
if name.startswith(settings.plugin_prefix)
}
# Register the plugins in the root registry
for name, plugin_module in discovered_plugins.items():
logger.info(f"Found plugin {name}.")
try:
registry.register_plugin(plugin_module.plugin_task_registry)
except AttributeError:
logger.error(
f'Plugin {name} does not have a member called "plugin_task_registry". '
"Not loading plugin."
)
Example plugin code ๐
Finally, we want to take a look at an actual plugin. The example task code can be found in the ngintegrate_example
folder in the git repository. Note how
there is now only one decorator on the tasks instead of two, which is much cleaner and less verbose.
import logging
from typing import Any
from webhook_receiver.task_registry import PluginTaskRegistry
logging.basicConfig()
logger = logging.getLogger(__file__)
# An object with this exact name has to be present in every plugin.
# It is automatically imported
plugin_task_registry = PluginTaskRegistry(name="Example")
@plugin_task_registry.register(model="tenant", event="created")
def example_create_tenant(request: Any) -> None:
"""
Example that fires on /tenant/create.
:param request: The result of the request.json() call on the original request.
"""
logger.warning(f"Tenant {request['data']['name']} was created.")