Source code for luna_api.app.auth.auth_middleware

"""
Authorization middleware
"""
import binascii
import re

import cachetools
from luna3.accounts.accounts import AccountsApi
from luna3.common.exceptions import LunaApiException
from stringcase import camelcase
from vlutils.cache.cache import cache
from vlutils.helpers import isUUID
from werkzeug.datastructures import Authorization

from app.auth.white_resources_list import LunaAccountIdWhiteLists, WhiteLists
from classes.credentials import RequestCredentials, VerificationRequestKwargs
from classes.enums import AccountType, VisibilityArea
from classes.token import EmitEvents, Permissions
from crutches_on_wheels.cow.errors.errors import Error, ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils.log import Logger

# Cache for credentials verification responses (backing store for the `cache` decorator on `verifyCredentials`).
# Holds at most 128 entries; each entry expires after `ttl=10` (seconds with the default cachetools timer).
cacheTTL = cachetools.TTLCache(maxsize=128, ttl=10)


def logWhoIsRequestBelong(logger: Logger, verificationKwargs: VerificationRequestKwargs) -> None:
    """
    Write an info record describing which account (and token, if any) issued the request

    The ids are taken from the decoded token when `verificationKwargs.getDecodedToken` returns one;
    otherwise the account id comes from `verificationKwargs.accountId` and no token id is logged.

    Args:
        logger: logger to write the record to
        verificationKwargs: verification kwargs
    """
    tokenPayload = verificationKwargs.getDecodedToken(logger)
    if tokenPayload is None:
        accountId, tokenId = verificationKwargs.accountId, None
    else:
        accountId, tokenId = tokenPayload["accountId"], tokenPayload["tokenId"]

    accountPart = "account_id: null" if accountId is None else f"account_id: '{accountId}'"
    # the token part is omitted entirely when there is no token id
    tokenPart = "" if tokenId is None else f" token_id: '{tokenId}'"
    logger.info(f"Request invoked by user ({accountPart}{tokenPart})")
def getAuthenticationHeader(sendBasicAuthInfo: bool = True) -> dict[str, str]:
    """
    Build the `WWW-Authenticate` header for a 401 response

    Args:
        sendBasicAuthInfo: if truthy, advertise the basic scheme (with realm), otherwise the bearer scheme
    Returns:
        dict with the single key `WWW-Authenticate`
    """
    if sendBasicAuthInfo:
        challenge = "Basic realm='login:password'"
    else:
        challenge = "Bearer"
    return {"WWW-Authenticate": challenge}
@cache(lambda: cacheTTL, keyGen=lambda accountsClient, verificationKwargs: verificationKwargs.__hash__())
async def verifyCredentials(accountsClient: AccountsApi, verificationKwargs: VerificationRequestKwargs) -> dict:
    """
    Verify credentials using the accounts service

    The call is wrapped by the `cache` decorator, configured with the module-level `cacheTTL` storage and a key
    generated from the hash of `verificationKwargs` (the accounts client is not part of the key).

    Args:
        accountsClient: account client
        verificationKwargs: authorization credentials
    Returns:
        json of the verification response: dictionary with account-type and permissions (for token validation only)
    """
    # `raiseError=True`: a failed verification is reported as an exception rather than a reply
    # (callers in this module handle `LunaApiException`)
    verificationReply = await accountsClient.verifyAccount(
        raiseError=True, asyncRequest=True, **verificationKwargs.asDict()
    )
    return verificationReply.json
def getVerificationRequestKwargs(request: "ApiRequest") -> VerificationRequestKwargs:
    """
    Get kwargs for credentials verification from request

    Sources are checked in priority order:
      1. the request token (`request.token`): a value starting with `basic` (case-insensitive) is parsed as
         basic authorization (login/password), any other value is passed through as a token;
      2. the `Luna-Account-Id` header;
      3. nothing specified - empty verification kwargs.

    Args:
        request: api request
    Returns:
        kwargs for credentials verification
    Raises:
        VLException(Error.AuthorizationFailed, 401, False) if the basic authorization header cannot be parsed
        VLException(Error.BadAccountId, 400, isCriticalError=False) if specified `Luna-Account-Id` header content
            is not a valid UUID
    """
    # todo update/remove werkzeug after LUNA-5783
    token = request.token
    if token is not None:
        if not token.lower().startswith("basic"):
            return VerificationRequestKwargs(token=token)

        try:
            authData = Authorization.from_header(token)
        except binascii.Error:
            # undecodable base64 payload is reported the same way as an unparsable header
            authData = None
        if authData is None:
            raise VLException(
                Error.AuthorizationFailed.format("Bad format basic authorization header"),
                401,
                False,
                headers=getAuthenticationHeader(),
            )
        return VerificationRequestKwargs(login=authData.username, password=authData.password)

    accountId = request.headers.get("Luna-Account-Id")
    if accountId is None:
        return VerificationRequestKwargs()

    # Validate the header value itself: `request.credentials` is only assigned by `morphRequest`,
    # which runs after this function, so it must not be relied upon here.
    if not isUUID(accountId):
        raise VLException(Error.BadAccountId, 400, isCriticalError=False)
    return VerificationRequestKwargs(accountId=accountId)
async def getRequestCredentials(
    verificationKwargs: VerificationRequestKwargs, request: "ApiRequest"
) -> RequestCredentials:
    """
    Verify credentials via the accounts service and build request credentials

    Empty verification kwargs produce empty credentials without any verification. Otherwise the shape of the
    result depends on how the request was authorized: login/password, token, or account id only.

    Args:
        verificationKwargs: keyword arguments for credentials verification request
        request: api request
    Returns:
        verified request credentials
    Raises:
        LunaApiException: re-raised as is when the verification fails with status code 500
        VLException(Error.AuthorizationFailed, 401, False) if verification fails with any other status code
        VLException(Error.CorruptedToken, 400, False) if failed to decode jwt token
            (per the original documentation; presumably raised by `getDecodedToken` - not visible here)
    """
    if verificationKwargs.isEmpty:
        return RequestCredentials()

    verified = {}
    shouldLogCaller = True
    try:
        accountsClient = request.app.ctx.luna3session.accountsSession.getClient()
        verified = await verifyCredentials(accountsClient=accountsClient, verificationKwargs=verificationKwargs)
    except LunaApiException as exc:
        if exc.statusCode == 500:
            raise
        # the caller is still logged when the failure is "account not found"; any other failure suppresses it
        if ErrorInfo.fromDict(exc.json) != Error.AccountNotFound:
            shouldLogCaller = False
        failure = Error.AuthorizationFailed.format(exc.json["detail"])
        challengeHeaders = getAuthenticationHeader(sendBasicAuthInfo=verificationKwargs.login is not None)
        raise VLException(failure, 401, False, headers=challengeHeaders)
    finally:
        # runs on success and on every failure path above
        if shouldLogCaller:
            # prefer the account id reported by the verification response, if there is one
            verificationKwargs.accountId = verified.get("account_id", verificationKwargs.accountId)
            logWhoIsRequestBelong(request.logger, verificationKwargs=verificationKwargs)

    accountType = AccountType(verified["account_type"])

    if verificationKwargs.login is not None:
        # login/password: `user` accounts see only their own data, other account types see everything
        return RequestCredentials(
            accountId=verified["account_id"],
            visibilityArea=VisibilityArea.account if accountType == AccountType.user else VisibilityArea.all,
            accountType=accountType,
        )

    if verificationKwargs.token is not None:
        decodedToken = verificationKwargs.getDecodedToken(request.logger)
        rawPermissions = verified["permissions"]
        # permission names in the response are converted to camelCase keyword arguments
        emitEvents = EmitEvents(**{camelcase(name): value for name, value in rawPermissions["emit_events"].items()})
        plainPermissions = {
            camelcase(name): value for name, value in rawPermissions.items() if name != "emit_events"
        }
        permissions = Permissions(emitEvents=emitEvents, **plainPermissions)
        return RequestCredentials(
            accountId=decodedToken["accountId"],
            visibilityArea=VisibilityArea(decodedToken["visibilityArea"]),
            permissions=permissions,
            accountType=accountType,
        )

    # neither login nor token: only the account id is carried over
    return RequestCredentials(accountId=verificationKwargs.accountId)
def checkVerificationKwargs(request: "ApiRequest", verificationRequestKwargs: VerificationRequestKwargs) -> None:
    """
    Check verification kwargs against the request path, method and system configuration

    Args:
        request: api request
        verificationRequestKwargs: verification kwargs
    Raises:
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs are specified and the request
            targets a route that requires basic authorization
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs are specified and neither the
            request path nor the request method is white-listed (the white lists depend on whether the service
            allows the `Luna-Account-Id` header)
        VLException(Error.PermissionByTokenDenied, 403, False) if a token is specified, but the request targets a
            route that requires basic authorization
        VLException(Error.AccountIdAuthDisabled, 403, False) if an account id is specified, but the service does
            not allow the `Luna-Account-Id` header
    """
    basicAuthOnlyRoutes = (f"/{request.app.ctx.apiVersion}/account",)
    path = request.path

    if verificationRequestKwargs.isEmpty:
        if path in basicAuthOnlyRoutes:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

        if request.config.allowLunaAccountAuthHeader:
            # the account-id white lists apply: their methods, and their routes on top of the common ones
            allowedMethods = LunaAccountIdWhiteLists.methods
            inAccountIdRoutes = path in LunaAccountIdWhiteLists.routes
        else:
            allowedMethods = WhiteLists.methods
            inAccountIdRoutes = False

        methodAllowed = request.method in allowedMethods
        pathAllowed = inAccountIdRoutes or path in WhiteLists.routes or re.match(WhiteLists.docsPathPattern, path)
        if not pathAllowed and not methodAllowed:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

    if verificationRequestKwargs.token is not None and path in basicAuthOnlyRoutes:
        raise VLException(Error.PermissionByTokenDenied, 403, False)

    if verificationRequestKwargs.accountId and not request.config.allowLunaAccountAuthHeader:
        raise VLException(Error.AccountIdAuthDisabled, 403, False)
def checkCredentialsRights(
    verificationRequestKwargs: VerificationRequestKwargs, requestCredentials: RequestCredentials, request: "ApiRequest"
):
    """
    Verify request credentials rights

    Args:
        verificationRequestKwargs: verification request kwargs (authorization data from headers)
        requestCredentials: credentials for request
        request: api request
    Raises:
        VLException(Error.AccountTokenPermissionError, 403, False) if the request is authorized by a token whose
            visibility area is `all`, but the account type is `user` (does not imply access to other accounts data)
        VLException(Error.AccountQueryPermissionError, 403, False) if the account type is `user` and the query
            parameter `account_id` (imply as data filter in other luna-* services) is specified but does not match
            the account id of the request credentials
    """
    isUserAccount = requestCredentials.accountType == AccountType.user
    authorizedByToken = verificationRequestKwargs.token is not None

    if authorizedByToken and isUserAccount and requestCredentials.visibilityArea == VisibilityArea.all:
        raise VLException(Error.AccountTokenPermissionError, 403, False)

    if not isUserAccount:
        # the query filter restriction applies to `user` accounts only
        return

    requestedAccountId = request.args.get("account_id")
    if requestedAccountId is not None and requestedAccountId != requestCredentials.accountId:
        raise VLException(Error.AccountQueryPermissionError, 403, False)
def morphRequest(request: "ApiRequest", requestCredentials: RequestCredentials) -> None:
    """
    Morph request query according to request method and request credentials, then attach the credentials

    The `account_id` query argument is overwritten with the account id of the credentials when at least one of
    the following holds:
      * the request is visibility-area dependent (method is neither `GET` nor `HEAD` and the path is not a
        matcher route);
      * the credentials carry no visibility area (backward compatibility with requests authorized by the
        `Luna-Account-Id` header);
      * the visibility area of the credentials is `VisibilityArea.account`.
    In every other case the query is left untouched.

    Args:
        request: api request
        requestCredentials: verified request credentials
    """
    isMatchRequest = request.path.startswith(WhiteLists.matcherRoutesPrefix)
    isVisibilityAreaIndependentRequest = request.method in ("GET", "HEAD") or isMatchRequest

    # NOTE: a former trailing `elif not isVisibilityAreaIndependentRequest` branch (resetting `account_id` to
    # None) was unreachable - the first operand below already covers that case - so it is not reproduced here.
    mustPinAccountId = (
        not isVisibilityAreaIndependentRequest
        # for backward compatibility with requests with `Luna-Account-Id` header
        or requestCredentials.visibilityArea is None
        or requestCredentials.visibilityArea == VisibilityArea.account
    )
    if mustPinAccountId:
        request.args["account_id"] = requestCredentials.accountId

    request.credentials = requestCredentials
async def authMiddleware(request: "ApiRequest"):
    """
    Authorization middleware: verify request credentials and morph the request accordingly

    Account creation requests (`POST` to `WhiteLists.accountCreationRoute`) and requests whose method is
    white-listed in both `WhiteLists` and `LunaAccountIdWhiteLists` skip authorization entirely; they are only
    logged as anonymous.

    Args:
        request: request
    """
    httpMethod = request.method
    alwaysAllowedMethods = set(WhiteLists.methods) & set(LunaAccountIdWhiteLists.methods)
    isAccountCreation = request.path == WhiteLists.accountCreationRoute and httpMethod == "POST"

    if isAccountCreation or httpMethod in alwaysAllowedMethods:
        logWhoIsRequestBelong(request.logger, verificationKwargs=VerificationRequestKwargs())
        return

    authKwargs: VerificationRequestKwargs = getVerificationRequestKwargs(request=request)
    if authKwargs.isEmpty:
        # non-empty kwargs are logged by `getRequestCredentials` around the verification call
        logWhoIsRequestBelong(request.logger, verificationKwargs=authKwargs)

    checkVerificationKwargs(verificationRequestKwargs=authKwargs, request=request)
    credentials = await getRequestCredentials(verificationKwargs=authKwargs, request=request)
    checkCredentialsRights(verificationRequestKwargs=authKwargs, requestCredentials=credentials, request=request)
    morphRequest(request=request, requestCredentials=credentials)