# Source code for cow.plugins.plugin_examples.background_plugin_example
"""
Module implementing a background plugin example.
"""
import asyncio
from asyncio import Task
from luna_plugins.base.plugins_meta.base_plugins import BaseBackgroundHandler, BaseBackgroundPlugin, pluginHTTPResponse
[docs]
class HandlerExample(BaseBackgroundHandler):
    """
    Class-based handler example for the background plugin.
    """

    async def get(self, request):  # pylint: disable=unused-argument
        """
        Answer a GET request with a fixed plain-text body.

        Args:
            request: incoming request; not used by this handler
        Returns:
            result of ``self.response`` built with status code 418
        """
        replyHeaders = {"Content-Type": "text/plain"}
        return self.response(body="I am teapot", headers=replyHeaders, statusCode=418)
[docs]
def anotherHandlerExample(request):  # pylint: disable=unused-argument
    """
    Standalone (function-based) handler example.

    Args:
        request: incoming request; not used by this handler
    Returns:
        ``pluginHTTPResponse`` with status code 200 and a plain-text body
    """
    plainTextHeaders = {"Content-Type": "text/plain"}
    return pluginHTTPResponse(body="T800", headers=plainTextHeaders, statusCode=200)
[docs]
class BackgroundPluginExample(BaseBackgroundPlugin):
    """
    Background plugin example.

    Adds two routes and runs a background task which, once a second, raises
    ``temperature`` by one (capped at 100) and logs the progress.
    """

    def __init__(self, app: "LunaApplication"):
        super().__init__(app)
        # handle of the background job; None while no job has been started
        self.task: Task | None = None
        # value incremented by the background job, never exceeds 100
        self.temperature = 0

    async def initialize(self):
        """
        Initialize plugin: register the example routes.
        """
        self.addRoute("/teapot", HandlerExample)
        self.addRoute("/teapot/version", anotherHandlerExample, methods={"get"})

    async def close(self):
        """
        Stop background process.

        Cancels the background task, if any, and waits until it has actually
        finished, so no part of the job is still running after this coroutine
        returns. Safe to call when the task was never started or was already
        stopped.
        """
        # drop the handle first so a repeated close() becomes a no-op
        task, self.task = self.task, None
        if task is None:
            return
        task.cancel()
        # asyncio.wait() only waits for completion and does not re-raise the
        # task's outcome, so neither the expected CancelledError nor an earlier
        # failure of the job escapes from close(); a cancellation of close()
        # itself still propagates normally
        await asyncio.wait([task])

    async def usefulJob(self):
        """
        Some useful async work.

        Endless loop: sleep one second, raise the temperature by one (capped
        at 100) and log the current state. Ends only when the task running it
        is cancelled.
        """
        while True:
            await asyncio.sleep(1)
            self.temperature = min(100, self.temperature + 1)
            if self.temperature < 100:
                self.app.ctx.logger.info(f"I boil water, temperature: {self.temperature}")
            else:
                self.app.ctx.logger.info("boiling water is ready, would you care for a cup of tea?")

    async def start(self):
        """
        Run background process.

        A call made while the previously started job is still running does
        nothing, so the running task is never orphaned and stays reachable
        for ``close``.

        .. warning::
            This coroutine must only launch the background process; it must
            not await the end of the process.
        """
        if self.task is not None and not self.task.done():
            return
        self.task = asyncio.create_task(self.usefulJob())