# Source code for luna_backport3.app.handlers.accounts_handler
from sanic.response import HTTPResponse
from app.handlers.base_handler import TOKEN_PERMISSIONS, BaseHandler
from app.handlers.handlers_mixin import HandlersMixin
from app.handlers.schemas import CREATE_ACCOUNT_SCHEMA
class AccountHandler(BaseHandler):
    """
    Handler returning information about the current account; only authorization is required.
    """

    async def get(self) -> HTTPResponse:
        """
        Get account info. See `spec_get_account`_.

        .. _spec_get_account:
            _static/api.html#operation/getAccount

        Returns:
            response with status code 200 and json with email, organization name and account status
        """
        # The activity flag comes from the local database context, the rest from the Luna API.
        accountIsActive = await self.dbContext.checkAccountStatus(self.accountId)
        accountReply = await self.lunaApiClient.getAccount(raiseError=True)
        accountInfo = accountReply.json

        payload = {
            "email": accountInfo["login"],
            "organization_name": accountInfo["description"],
            # the response reports suspension, i.e. the inverse of the activity flag
            "suspended": not accountIsActive,
        }
        return self.success(200, outputJson=payload)
class AccountsHandler(HandlersMixin, BaseHandler):
    """
    Accounts handler
    """

    async def post(self) -> HTTPResponse:
        """
        Register a new account. See `spec_create_account`_.

        .. _spec_create_account:
            _static/api.html#operation/createAccount

        Returns:
            response with status code 201 and json with token
        """
        payload = self.request.json
        self.validateJson(payload, CREATE_ACCOUNT_SCHEMA, useJsonSchema=False)

        # The email is lower-cased before being used as the account login.
        email = payload["email"].lower()
        password = payload["password"]

        accountReply = await self.lunaApiClient.createAccount(
            login=email,
            password=password,
            accountType="user",
            description=payload["organization_name"],
            raiseError=True,
        )
        newAccountId = accountReply.json["account_id"]

        # Point the client at the new account's credentials before requesting a token.
        self.lunaApiClient.login = email
        self.lunaApiClient.password = password

        tokenReply = await self.lunaApiClient.createToken(
            tokenPermissions=TOKEN_PERMISSIONS,
            visibilityArea="account",
            raiseError=True,
        )
        tokenData = tokenReply.json
        newTokenId = tokenData["token_id"]
        tokenValue = tokenData["token"]

        await self.dbContext.createAccount(accountId=newAccountId, tokenId=newTokenId, token=tokenValue)
        await self.createHandlers(newAccountId)

        # The token value is stored via dbContext; the response carries the token id under "token".
        return self.success(201, outputJson={"token": newTokenId})