Source code for luna_backport3.app.handlers.accounts_handler

from sanic.response import HTTPResponse

from app.handlers.base_handler import TOKEN_PERMISSIONS, BaseHandler
from app.handlers.handlers_mixin import HandlersMixin
from app.handlers.schemas import CREATE_ACCOUNT_SCHEMA


[docs]class AccountHandler(BaseHandler): """ Handler to get account information, only authorization is required. """
[docs] async def get(self) -> HTTPResponse: """ Account info handler. See `spec_get_account`_. .. _spec_get_account: _static/api.html#operation/getAccount Returns: json with email, organization name and account status """ isActive = await self.dbContext.checkAccountStatus(self.accountId) account = (await self.lunaApiClient.getAccount(raiseError=True)).json return self.success( 200, outputJson={ "email": account["login"], "organization_name": account["description"], "suspended": not isActive, }, )
[docs]class AccountsHandler(HandlersMixin, BaseHandler): """ Accounts handler """
[docs] async def post(self) -> HTTPResponse: """ Accounts registration. See `spec_create_account`_. .. _spec_create_account: _static/api.html#operation/createAccount Returns: json with token """ reqJson = self.request.json self.validateJson(reqJson, CREATE_ACCOUNT_SCHEMA, useJsonSchema=False) accountResponse = await self.lunaApiClient.createAccount( login=reqJson["email"].lower(), password=reqJson["password"], accountType="user", description=reqJson["organization_name"], raiseError=True, ) accountId = accountResponse.json["account_id"] self.lunaApiClient.login, self.lunaApiClient.password = reqJson["email"].lower(), reqJson["password"] tokenResponse = await self.lunaApiClient.createToken( tokenPermissions=TOKEN_PERMISSIONS, visibilityArea="account", raiseError=True ) tokenId = tokenResponse.json["token_id"] token = tokenResponse.json["token"] await self.dbContext.createAccount(accountId=accountId, tokenId=tokenId, token=token) await self.createHandlers(accountId) return self.success(201, outputJson={"token": tokenId})