Source code for luna_api.app.auth.auth_middleware

"""
Authorization middleware
"""
import re
from dataclasses import asdict

import cachetools
import jwt
from luna3.accounts.accounts import AccountsApi
from luna3.common.exceptions import LunaApiException
from stringcase import camelcase
from vlutils.cache.cache import cache
from vlutils.helpers import isUUID
from werkzeug.http import parse_authorization_header

from app.auth.white_resources_list import LunaAccountIdWhiteLists, WhiteLists
from classes.credentials import RequestCredentials, VerificationRequestKwargs
from classes.enums import AccountType, VisibilityArea
from classes.token import EmitEvents, Permissions
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException

# Cache for credentials verification responses (used by `verifyCredentials`).
# Limits are named so the eviction policy is readable at a glance; the ttl is expressed in units of the
# cache timer (seconds with the default `cachetools` timer).
_credentialsCacheMaxSize = 128
_credentialsCacheTtl = 10
cacheTTL = cachetools.TTLCache(maxsize=_credentialsCacheMaxSize, ttl=_credentialsCacheTtl)


def getAuthenticationHeader(sendBasicAuthInfo: bool = True) -> dict[str, str]:
    """
    Build the `WWW-Authenticate` header attached to 401 responses

    Args:
        sendBasicAuthInfo: if truthy, advertise the basic scheme (with realm), otherwise the bearer scheme
    Returns:
        dict with the single key `WWW-Authenticate`
    """
    if sendBasicAuthInfo:
        challenge = "Basic realm='login:password'"
    else:
        challenge = "Bearer"
    return {"WWW-Authenticate": challenge}
@cache(
    lambda: cacheTTL,
    keyGen=lambda accountsClient, verificationKwargs: verificationKwargs.__hash__(),
)
async def verifyCredentials(accountsClient: AccountsApi, verificationKwargs: VerificationRequestKwargs) -> dict:
    """
    Verify credentials in the accounts service

    The call is wrapped with the `cacheTTL` cache; the cache key is produced from the hash of
    `verificationKwargs` only (the client instance does not take part in the key).

    Args:
        accountsClient: accounts service client
        verificationKwargs: authorization credentials, passed to the client as keyword arguments
    Returns:
        json of the verification response: account type and, for token verification, permissions
    """
    credentials = asdict(verificationKwargs)
    # NOTE(review): with `raiseError=True` the client presumably raises `LunaApiException` on rejection -
    # callers in this module catch exactly that exception
    reply = await accountsClient.verifyAccount(raiseError=True, asyncRequest=True, **credentials)
    return reply.json
def getVerificationRequestKwargs(request: "ApiRequest") -> VerificationRequestKwargs:
    """
    Get kwargs for credentials verification from request

    `request.token` has priority over the `Luna-Account-Id` header: the header is inspected only when
    the request carries no token.

    Args:
        request: api request
    Returns:
        kwargs for credentials verification: login/password for basic authorization, token for any other
        authorization value, account id for the `Luna-Account-Id` header, empty kwargs if nothing is specified
    Raises:
        VLException(Error.AuthorizationFailed, 401, False) if basic authorization header has a bad format
        VLException(Error.BadAccountId, 400, isCriticalError=False) if specified `Luna-Account-Id` header
            content is not a valid UUID
    """
    # todo update/remove werkzeug after LUNA-5783
    if (token := request.token) is not None:
        if token.lower().startswith("basic"):
            if (authData := parse_authorization_header(token)) is None:
                raise VLException(
                    Error.AuthorizationFailed.format("Bad format basic authorization header"),
                    401,
                    False,
                    headers=getAuthenticationHeader(),
                )
            return VerificationRequestKwargs(login=authData.username, password=authData.password)
        return VerificationRequestKwargs(token=token)

    if (accountId := request.headers.get("Luna-Account-Id")) is not None:
        # Validate the header value itself. `request.credentials` is assigned by `morphRequest`, which
        # `authMiddleware` calls only after this function, so it does not yet describe this request.
        if not isUUID(accountId):
            raise VLException(Error.BadAccountId, 400, isCriticalError=False)
        return VerificationRequestKwargs(accountId=accountId)

    return VerificationRequestKwargs()
async def getRequestCredentials(
    verificationKwargs: VerificationRequestKwargs, request: "ApiRequest"
) -> RequestCredentials:
    """
    Verify and get request credentials

    Args:
        verificationKwargs: keyword arguments for credentials verification request
        request: api request
    Returns:
        verified request credentials:
        - empty credentials if `verificationKwargs` is empty (no verification request is made)
        - account id, account type and visibility area derived from the account type for login/password
        - account id and visibility area from the token payload plus permissions from the verification
          response for token
        - account id only for `Luna-Account-Id` based kwargs
    Raises:
        VLException(Error.AuthorizationFailed, 401, False) if credentials verification request failed
        VLException(Error.CorruptedToken, 401, False) if failed to decode jwt token
    """
    if verificationKwargs.isEmpty:
        return RequestCredentials()

    try:
        respJson = await verifyCredentials(
            accountsClient=request.app.ctx.luna3session.accountsSession.getClient(),
            verificationKwargs=verificationKwargs,
        )
    except LunaApiException as exc:
        # keep the original client error as the cause for easier debugging
        raise VLException(
            Error.AuthorizationFailed.format(exc.json["detail"]),
            401,
            False,
            headers=getAuthenticationHeader(sendBasicAuthInfo=verificationKwargs.login is not None),
        ) from exc

    accountType = AccountType(respJson["account_type"])

    if verificationKwargs.login is not None:
        return RequestCredentials(
            accountId=respJson["account_id"],
            visibilityArea=VisibilityArea.account if accountType == AccountType.user else VisibilityArea.all,
            accountType=accountType,
        )

    if verificationKwargs.token is not None:
        try:
            # the signature is deliberately not checked here: the token has already passed
            # `verifyCredentials` above, the payload is decoded only to read its claims
            token = jwt.decode(verificationKwargs.token, algorithms=["HS256"], options={"verify_signature": False})
        except jwt.PyJWTError as exc:
            request.logger.exception()
            raise VLException(Error.CorruptedToken, 401, False, headers=getAuthenticationHeader(False)) from exc

        # verification response uses snake_case keys, `Permissions`/`EmitEvents` are built from camelCase kwargs
        permissions = Permissions(
            emitEvents=EmitEvents(
                **{camelcase(key): value for key, value in respJson["permissions"]["emit_events"].items()}
            ),
            **{camelcase(key): value for key, value in respJson["permissions"].items() if key != "emit_events"},
        )
        return RequestCredentials(
            accountId=token["accountId"],
            visibilityArea=VisibilityArea(token["visibilityArea"]),
            permissions=permissions,
            accountType=accountType,
        )

    return RequestCredentials(accountId=verificationKwargs.accountId)
def checkVerificationKwargs(request: "ApiRequest", verificationRequestKwargs: VerificationRequestKwargs) -> None:
    """
    Check verification kwargs against the request path, method and system configuration

    Args:
        request: api request
        verificationRequestKwargs: verification kwargs
    Raises:
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs are specified and the request
            targets a route which requires basic authorization only
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs are specified and neither the
            request path nor the request method is white-listed (the white lists in use depend on whether
            the service allows the `Luna-Account-Id` header)
        VLException(Error.PermissionByTokenDenied, 403, False) if token is specified, but the request path
            requires basic authorization only
        VLException(Error.AccountIdAuthDisabled, 403, False) if `Luna-Account-Id` header is specified, but
            the service does not allow its usage
    """
    basicAuthOnlyRoutes = (f"/{request.app.ctx.apiVersion}/account",)

    if verificationRequestKwargs.isEmpty:
        if request.path in basicAuthOnlyRoutes:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

        if request.config.allowLunaAccountAuthHeader:
            # `Luna-Account-Id` mode: its own method/route white lists apply on top of the common routes
            credentialsOptional = (
                request.method in LunaAccountIdWhiteLists.methods
                or request.path in LunaAccountIdWhiteLists.routes
                or request.path in WhiteLists.routes
                or re.match(WhiteLists.docsPathPattern, request.path)
            )
        else:
            credentialsOptional = (
                request.method in WhiteLists.methods
                or request.path in WhiteLists.routes
                or re.match(WhiteLists.docsPathPattern, request.path)
            )
        if not credentialsOptional:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

    tokenSpecified = verificationRequestKwargs.token is not None
    if tokenSpecified and request.path in basicAuthOnlyRoutes:
        raise VLException(Error.PermissionByTokenDenied, 403, False)

    if verificationRequestKwargs.accountId and not request.config.allowLunaAccountAuthHeader:
        raise VLException(Error.AccountIdAuthDisabled, 403, False)
def checkCredentialsRights(
    verificationRequestKwargs: VerificationRequestKwargs, requestCredentials: RequestCredentials, request: "ApiRequest"
):
    """
    Verify that the request does not exceed the rights of its credentials

    Args:
        verificationRequestKwargs: verification request kwargs (authorization data from headers)
        requestCredentials: credentials for request
        request: api request
    Raises:
        VLException(Error.AccountTokenPermissionError, 403, False) if the request is authorized by a token
            with visibility area `all`, but the account type is `user` (which does not imply access to
            other accounts data)
        VLException(Error.AccountQueryPermissionError, 403, False) if the account type is `user` and the
            `account_id` query parameter is specified, but differs from the account id of the credentials
    """
    isUserAccount = requestCredentials.accountType == AccountType.user
    authorizedByToken = verificationRequestKwargs.token is not None

    if authorizedByToken and isUserAccount and requestCredentials.visibilityArea == VisibilityArea.all:
        raise VLException(Error.AccountTokenPermissionError, 403, False)

    if not isUserAccount:
        # the query filter restriction applies to `user` accounts only
        return

    requestedAccountId = request.args.get("account_id")
    if requestedAccountId is not None and requestedAccountId != requestCredentials.accountId:
        raise VLException(Error.AccountQueryPermissionError, 403, False)
def morphRequest(request: "ApiRequest", requestCredentials: RequestCredentials) -> None:
    """
    Morph request query according to request method/path and request credentials

    The `account_id` query argument is overwritten with the credentials account id when at least one of the
    following holds:
        - the request is neither a `GET`/`HEAD` request nor a matcher request
        - credentials have no visibility area
        - credentials visibility area is `account`
    Otherwise (`GET`/`HEAD`/matcher request with any other visibility area) the query is left untouched.
    Finally the credentials are attached to the request as `request.credentials`.

    Args:
        request: api request
        requestCredentials: verified request credentials
    """
    isMatchRequest = request.path.startswith(WhiteLists.matcherRoutesPrefix)
    isVisibilityAreaIndependentRequest = request.method in ("GET", "HEAD") or isMatchRequest

    # The original if/elif chain ended with `elif not isVisibilityAreaIndependentRequest`, which could never
    # fire because the first condition already covered that case; the remaining two branches performed the
    # same assignment, so they are expressed as a single condition.
    mustBindAccountId = (
        not isVisibilityAreaIndependentRequest
        # for backward compatibility with requests with `Luna-Account-Id` header
        or requestCredentials.visibilityArea is None
        or requestCredentials.visibilityArea == VisibilityArea.account
    )
    if mustBindAccountId:
        request.args["account_id"] = requestCredentials.accountId

    request.credentials = requestCredentials
async def authMiddleware(request: "ApiRequest"):
    """
    Authorization middleware: extract, validate and verify request credentials, then morph the request

    Account creation requests and requests whose method is white-listed both in `WhiteLists` and in
    `LunaAccountIdWhiteLists` are passed through without any checks.

    Args:
        request: request
    """
    if request.method == "POST" and request.path == WhiteLists.accountCreationRoute:
        return

    # methods which never require credentials regardless of the `Luna-Account-Id` configuration
    alwaysAllowedMethods = set(WhiteLists.methods).intersection(LunaAccountIdWhiteLists.methods)
    if request.method in alwaysAllowedMethods:
        return

    kwargs: VerificationRequestKwargs = getVerificationRequestKwargs(request=request)
    checkVerificationKwargs(verificationRequestKwargs=kwargs, request=request)

    credentials = await getRequestCredentials(verificationKwargs=kwargs, request=request)
    checkCredentialsRights(verificationRequestKwargs=kwargs, requestCredentials=credentials, request=request)

    morphRequest(request=request, requestCredentials=credentials)