# Source code for cow.plugins.plugin_examples.background_plugin_example

"""
Module implements a background plugin example.
"""

import asyncio
from asyncio import Task

from luna_plugins.base.plugins_meta.base_plugins import BaseBackgroundHandler, BaseBackgroundPlugin, pluginHTTPResponse


class HandlerExample(BaseBackgroundHandler):
    """
    Class-based handler example.
    """

    async def get(self, request):  # pylint: disable=unused-argument
        """
        Method get example: reply with a fixed plain-text body.

        Args:
            request: incoming request (not used)
        Returns:
            response with status code 418 and body "I am teapot"
        """
        plainTextHeaders = {"Content-Type": "text/plain"}
        return self.response(statusCode=418, body="I am teapot", headers=plainTextHeaders)
def anotherHandlerExample(request):  # pylint: disable=unused-argument
    """
    Standalone (function-based) handler example.

    Args:
        request: incoming request (not used)
    Returns:
        plugin http response with status code 200 and plain-text body "T800"
    """
    plainTextHeaders = {"Content-Type": "text/plain"}
    return pluginHTTPResponse(body="T800", headers=plainTextHeaders, statusCode=200)
class BackgroundPluginExample(BaseBackgroundPlugin):
    """
    Background plugin example.

    Create background task and add a route.

    Attributes:
        task: background task created by ``start`` (None while no task is tracked)
        temperature: counter increased by the background job, capped at 100
    """

    def __init__(self, app: "LunaApplication"):
        super().__init__(app)
        self.task: Task | None = None
        self.temperature = 0

    async def initialize(self):
        """
        Initialize plugin: add the example routes.
        """
        self.addRoute("/teapot", HandlerExample)
        self.addRoute("/teapot/version", anotherHandlerExample, methods={"get"})

    async def close(self):
        """
        Stop background process.

        Cancels the background task (if there is one still running) and waits
        until the cancellation has actually been processed.
        """
        # Drop the reference first so a repeated close() is a no-op.
        task, self.task = self.task, None
        if task is None or task.done():
            return

        # Task.cancel() only *requests* cancellation; the task has to be awaited
        # for the CancelledError to be delivered, otherwise it may still be
        # pending when the event loop shuts down.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() request above.
            pass

    async def usefulJob(self):
        """
        Some useful async work.

        Once per second increases the temperature by one (never above 100) and
        logs the progress. Loops forever; it ends only when the task is cancelled.
        """
        while True:
            await asyncio.sleep(1)
            self.temperature = min(100, self.temperature + 1)
            if self.temperature < 100:
                self.app.ctx.logger.info(f"I boil water, temperature: {self.temperature}")
            else:
                self.app.ctx.logger.info("boiling water is ready, would you care for a cup of tea?")

    async def start(self):
        """
        Run background process.

        .. warning::
            This coroutine is expected only to start the process. It must not
            await the end of the process.
        """
        # Keep a reference to the task so that close() is able to cancel it.
        self.task = asyncio.create_task(self.usefulJob())