Source code for luna_api.app.auth.auth_middleware

"""
Authorization middleware
"""

import binascii
import re

import cachetools
from luna3.accounts.accounts import AccountsApi
from luna3.common.exceptions import LunaApiException
from sanic.headers import parse_credentials
from stringcase import camelcase
from vlutils.cache.cache import cache
from vlutils.helpers import isUUID
from werkzeug.datastructures import Authorization

from app.auth.white_resources_list import AuthIgnoreList, LunaAccountIdWhiteLists, WhiteLists
from classes.credentials import RequestCredentials, VerificationRequestKwargs
from classes.enums import AccountType, VisibilityArea
from classes.token import EmitEvents, Permissions, Verify
from crutches_on_wheels.cow.errors.errors import Error, ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils.log import Logger

# Cache for credentials verification responses; `verifyCredentials` stores its results here via the `@cache` decorator.
# Holds at most 128 entries, each of which expires 10 seconds after it was stored (cachetools `TTLCache` semantics).
cacheTTL = cachetools.TTLCache(maxsize=128, ttl=10)


def logWhoIsRequestBelong(logger: Logger, verificationKwargs: VerificationRequestKwargs) -> None:
    """
    Log which account (and token, if any) the current request belongs to

    Args:
        logger: logger that receives the info-level message
        verificationKwargs: verification kwargs of the request
    """
    tokenPayload = verificationKwargs.getDecodedToken(logger)
    if tokenPayload is None:
        # no decoded token available: fall back to the account id stored in the kwargs
        ownerAccountId = verificationKwargs.accountId
        ownerTokenId = None
    else:
        ownerAccountId = tokenPayload["accountId"]
        ownerTokenId = tokenPayload["tokenId"]

    if ownerAccountId is None:
        accountPart = "account_id: null"
    else:
        accountPart = f"account_id: '{ownerAccountId}'"
    # the token part is omitted entirely when there is no token id
    tokenPart = "" if ownerTokenId is None else f" token_id: '{ownerTokenId}'"

    logger.info(f"Request invoked by user ({accountPart}{tokenPart})")
def getAuthenticationHeader(sendBasicAuthInfo: bool = True, setCookieHeader: bool = False) -> dict[str, str]:
    """
    Build response headers for a 401 answer

    Args:
        sendBasicAuthInfo: advertise the basic authentication scheme if set, otherwise the bearer one
        setCookieHeader: whether to add a `Set-Cookie` header for the `LUNA_AUTH_TOKEN` cookie
    Returns:
        dict with headers
    """
    if sendBasicAuthInfo:
        challenge = "Basic realm='login:password'"
    else:
        challenge = "Bearer"

    responseHeaders = {"WWW-Authenticate": challenge}
    if setCookieHeader:
        # empty value together with `Max-Age=0` makes the client drop the auth cookie
        responseHeaders["Set-Cookie"] = 'LUNA_AUTH_TOKEN=""; Path=/; Max-Age=0'
    return responseHeaders
@cache(lambda: cacheTTL, keyGen=lambda accountsClient, verificationKwargs: verificationKwargs.__hash__())
async def verifyCredentials(accountsClient: AccountsApi, verificationKwargs: VerificationRequestKwargs) -> dict:
    """
    Verify credentials through the accounts client

    The result is cached in `cacheTTL`; the cache key is generated from the hash of `verificationKwargs` only.

    Args:
        accountsClient: account client
        verificationKwargs: authorization credentials
    Returns:
        dictionary with account-type and permissions (for token validation only)
    """
    verificationResponse = await accountsClient.verifyAccount(
        raiseError=True, asyncRequest=True, **verificationKwargs.asDict()
    )
    return verificationResponse.json
def getVerificationRequestKwargs(request: "ApiRequest") -> VerificationRequestKwargs:
    """
    Get kwargs for credentials verification from request

    Credentials are looked up in the following order, the first one found wins:

    1. `request.token`
    2. the `LUNA_AUTH_TOKEN` cookie (with the `Bearer`/`Token` prefix stripped)
    3. the `Luna-Account-Id` header

    A token value starting with `basic` (case-insensitive) is treated as basic authorization (login + password),
    any other token value is passed on as a token.

    Args:
        request: api request
    Returns:
        kwargs for credentials verification (empty kwargs if the request carries no credentials at all)
    Raises:
        VLException(Error.AuthorizationFailed, 401, False) if basic authorization data cannot be parsed
        VLException(Error.AuthorizationFailed, 401, False) if `Luna-Account-Id` is used for the cookie login route
            with a method which requires token or basic authorization
        VLException(Error.BadAccountId, 400, isCriticalError=False) if specified `Luna-Account-Id` header content
            is not a UUID
    """
    # todo update/remove werkzeug after LUNA-5783
    if (token := request.token) is None:
        _, token = parse_credentials(request.cookies.get("LUNA_AUTH_TOKEN"), ("Bearer", "Token"))

    if token is not None:
        if not token.lower().startswith("basic"):
            return VerificationRequestKwargs(token=token)

        authData = None
        try:
            authData = Authorization.from_header(token)
        except binascii.Error:
            # broken base64 payload: reported right below as a badly formatted basic authorization header
            pass
        if authData is None:
            raise VLException(
                Error.AuthorizationFailed.format("Bad format basic authorization header"),
                401,
                False,
                headers=getAuthenticationHeader(),
            )
        return VerificationRequestKwargs(login=authData.username, password=authData.password)

    if (accountId := request.headers.get("Luna-Account-Id")) is not None:
        if (
            request.path == AuthIgnoreList.cookieLoginRoute
            and request.method not in AuthIgnoreList.loginRouteNoAuthMethods
        ):
            raise VLException(
                Error.AuthorizationFailed.format("Only token or basic authorization is allowed for this action"),
                401,
                False,
                headers=getAuthenticationHeader(),
            )
        # validate the header value itself: it is exactly what gets passed on for verification, while
        # `request.lunaApiCredentials` is assigned by `morphRequest`, which the middleware calls after this function
        if not isUUID(accountId):
            raise VLException(Error.BadAccountId, 400, isCriticalError=False)
        return VerificationRequestKwargs(accountId=accountId)

    return VerificationRequestKwargs()
async def getRequestCredentials(
    verificationKwargs: VerificationRequestKwargs, request: "ApiRequest", ignoreAuthError: bool = False
) -> RequestCredentials:
    """
    Verify credentials through the accounts service and build request credentials from the answer

    Args:
        verificationKwargs: keyword arguments for credentials verification request
        request: api request
        ignoreAuthError: whether to ignore authorization credentials verification error
    Returns:
        verified request credentials; empty credentials if no credentials were supplied or if a verification
        failure was ignored
    Raises:
        LunaApiException: re-raised unchanged if the verification request failed with status code 500
        VLException(Error.AuthorizationFailed, 401, False) if verification failed and `ignoreAuthError` is not set

    NOTE(review): VLException(Error.CorruptedToken, 400, False) for an undecodable jwt token is presumably raised
    inside `getDecodedToken` - confirm, it is not raised directly here.
    """
    if verificationKwargs.isEmpty:
        return RequestCredentials()

    def toCamelCaseKwargs(section: dict, skipKeys: tuple = ()) -> dict:
        """Convert keys of a permissions section to camel case, dropping `skipKeys`."""
        return {camelcase(name): granted for name, granted in section.items() if name not in skipKeys}

    verification = {}
    shouldLogRequestOwner = True
    try:
        verification = await verifyCredentials(
            accountsClient=request.app.ctx.luna3session.accountsSession.getClient(),
            verificationKwargs=verificationKwargs,
        )
    except LunaApiException as exc:
        if exc.statusCode == 500:
            raise
        if ErrorInfo.fromDict(exc.json) != Error.AccountNotFound:
            shouldLogRequestOwner = False
        if not ignoreAuthError:
            failure = Error.AuthorizationFailed.format(exc.json["detail"])
            challengeHeaders = getAuthenticationHeader(
                sendBasicAuthInfo=verificationKwargs.login is not None,
                setCookieHeader=bool(request.cookies.get("LUNA_AUTH_TOKEN")),
            )
            raise VLException(failure, 401, False, headers=challengeHeaders)
    finally:
        # runs on success as well as on every failure path above (including the re-raised ones)
        if shouldLogRequestOwner:
            verificationKwargs.accountId = verification.get("account_id", verificationKwargs.accountId)
            logWhoIsRequestBelong(request.logger, verificationKwargs=verificationKwargs)

    if ignoreAuthError and not verification:
        # verification failed, but the failure is ignored for this request
        return RequestCredentials()

    accountType = AccountType(verification["account_type"])

    if verificationKwargs.login is not None:
        if accountType == AccountType.user:
            visibilityArea = VisibilityArea.account
        else:
            visibilityArea = VisibilityArea.all
        return RequestCredentials(
            accountId=verification["account_id"],
            visibilityArea=visibilityArea,
            accountType=accountType,
        )

    if verificationKwargs.token is not None:
        decodedToken = verificationKwargs.getDecodedToken(request.logger)
        grantedPermissions = verification["permissions"]
        permissions = Permissions(
            emitEvents=EmitEvents(**toCamelCaseKwargs(grantedPermissions["emit_events"])),
            verify=Verify(**toCamelCaseKwargs(grantedPermissions["verify"])),
            **toCamelCaseKwargs(grantedPermissions, skipKeys=("emit_events", "verify")),
        )
        return RequestCredentials(
            accountId=decodedToken["accountId"],
            visibilityArea=VisibilityArea(decodedToken["visibilityArea"]),
            permissions=permissions,
            accountType=accountType,
        )

    return RequestCredentials(accountId=verificationKwargs.accountId)
def checkVerificationKwargs(request: "ApiRequest", verificationRequestKwargs: VerificationRequestKwargs) -> None:
    """
    Check verification kwargs against request path, method and system configuration

    Args:
        request: api request
        verificationRequestKwargs: verification kwargs
    Raises:
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs specified, service allows to use
            `Luna-Account-Id` header and request required credentials
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs specified, service not allows use
            `Luna-Account-Id` header and request required credentials
        VLException(Error.PermissionByTokenDenied, 403, False) if token kwargs specified, but request path and/or
            method not allows access using token
        VLException(Error.AccountIdAuthDisabled, 403, False) if `Luna-Account-Id` header if specified, but service
            not allows use `Luna-Account-Id` header
    """
    path, method = request.path, request.method
    basicAuthOnlyRoutes = (f"/{request.app.ctx.apiVersion}/account",)

    def credentialsRequired() -> VLException:
        """Build the 401 error raised when credentials are required but missing."""
        return VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

    if verificationRequestKwargs.isEmpty:
        if path in basicAuthOnlyRoutes:
            raise credentialsRequired()

        # white lists to consult depend on whether `Luna-Account-Id` header auth is enabled
        if request.config.allowLunaAccountAuthHeader:
            openMethods = LunaAccountIdWhiteLists.methods
            openRouteLists = (LunaAccountIdWhiteLists.routes, WhiteLists.routes)
        else:
            openMethods = WhiteLists.methods
            openRouteLists = (WhiteLists.routes,)

        isOpenPath = any(path in routes for routes in openRouteLists) or any(
            re.match(pattern, path) for pattern in (WhiteLists.docsPathPattern, WhiteLists.lambdaDocsPathPattern)
        )
        if not isOpenPath and method not in openMethods:
            raise credentialsRequired()

    usesToken = verificationRequestKwargs.token is not None
    if usesToken and path in basicAuthOnlyRoutes and method not in ("GET", "HEAD"):
        raise VLException(Error.PermissionByTokenDenied, 403, False)

    if verificationRequestKwargs.accountId and not request.config.allowLunaAccountAuthHeader:
        raise VLException(Error.AccountIdAuthDisabled, 403, False)
def checkCredentialsRights(
    verificationRequestKwargs: VerificationRequestKwargs, requestCredentials: RequestCredentials, request: "ApiRequest"
):
    """
    Check that verified credentials are allowed to access the requested data

    Args:
        verificationRequestKwargs: verification request kwargs (authorization data from headers)
        requestCredentials: credentials for request
        request: api request
    Raises:
        VLException(Error.AccountTokenPermissionError, 403, False) if specified token supposed access to any data,
            but account type is `user` (does not imply access to other accounts data)
        VLException(Error.AccountQueryPermissionError, 403, False) if account type is `user` but specified query
            parameter `account_id` (imply as data filter in other luna-* services) does not match the account id
            from authorization/`Luna-Account-Id` header
    """
    isUserAccount = requestCredentials.accountType == AccountType.user
    if not isUserAccount:
        # both checks below apply to `user` accounts only
        return

    usesToken = verificationRequestKwargs.token is not None
    if usesToken and requestCredentials.visibilityArea == VisibilityArea.all:
        raise VLException(Error.AccountTokenPermissionError, 403, False)

    requestedAccountId = request.args.get("account_id")
    if requestedAccountId is not None and requestedAccountId != requestCredentials.accountId:
        raise VLException(Error.AccountQueryPermissionError, 403, False)
def morphRequest(
    request: "ApiRequest", requestCredentials: RequestCredentials, verificationRequestKwargs: VerificationRequestKwargs
) -> None:
    """
    Morph request query according to request method and request credentials, then attach credentials to request

    The `account_id` query parameter is overwritten with the account id from credentials when credentials were
    supplied and at least one of the following holds:

    - the request is not visibility-area independent (i.e. it is neither `GET`/`HEAD` nor a matcher request)
    - credentials carry no visibility area
    - visibility area of credentials is `account`

    In every other case the query is left untouched. Verified credentials are always stored in
    `request.lunaApiCredentials`.

    Args:
        request: api request
        requestCredentials: verified request credentials
        verificationRequestKwargs: verification request kwargs
    """
    isMatchRequest = request.path.startswith(WhiteLists.matcherRoutesPrefix)
    isVisibilityAreaIndependentRequest = request.method in ("GET", "HEAD") or isMatchRequest

    # without any credentials the `account_id` filter from query parameters is kept as is
    if not verificationRequestKwargs.isEmpty:
        mustBindToAccount = (
            not isVisibilityAreaIndependentRequest
            # for backward compatibility with requests with `Luna-Account-Id` header
            or requestCredentials.visibilityArea is None
            or requestCredentials.visibilityArea == VisibilityArea.account
        )
        if mustBindToAccount:
            request.args["account_id"] = requestCredentials.accountId
        # NOTE: the former trailing `elif not isVisibilityAreaIndependentRequest` branch (which reset `account_id`
        # to None) could never run - the same condition is already covered above - so it has been dropped

    request.lunaApiCredentials = requestCredentials
def ignoreAuthError(request: "ApiRequest") -> bool:
    """
    Tell whether a credentials verification error must be ignored for the request

    Args:
        request: request
    Returns:
        ignoreAuthError flag
    """
    requestPath = request.path
    if requestPath in AuthIgnoreList.routes:
        return True
    if requestPath.startswith(AuthIgnoreList.uiPrefix):
        return True
    # cookie login route ignores auth errors only for the listed methods
    isCookieLoginRoute = requestPath == AuthIgnoreList.cookieLoginRoute
    return isCookieLoginRoute and request.method in AuthIgnoreList.loginRouteNoAuthMethods
async def authMiddleware(request: "ApiRequest"):
    """
    Authorization middleware

    Account creation requests and requests whose method is white-listed in both `WhiteLists` and
    `LunaAccountIdWhiteLists` are passed through without any check. For every other request credentials are
    extracted, checked against the route, verified, checked for access rights and finally applied to the request.

    Args:
        request: request
    """
    alwaysOpenMethods = set(WhiteLists.methods) & set(LunaAccountIdWhiteLists.methods)
    isAccountCreationRequest = request.method == "POST" and request.path == WhiteLists.accountCreationRoute
    if isAccountCreationRequest or request.method in alwaysOpenMethods:
        logWhoIsRequestBelong(request.logger, verificationKwargs=VerificationRequestKwargs())
        return

    authKwargs: VerificationRequestKwargs = getVerificationRequestKwargs(request=request)
    if authKwargs.isEmpty:
        # for non-empty kwargs the owner is logged during credentials verification instead
        logWhoIsRequestBelong(request.logger, verificationKwargs=authKwargs)

    checkVerificationKwargs(request=request, verificationRequestKwargs=authKwargs)
    credentials = await getRequestCredentials(
        verificationKwargs=authKwargs, request=request, ignoreAuthError=ignoreAuthError(request)
    )
    checkCredentialsRights(verificationRequestKwargs=authKwargs, requestCredentials=credentials, request=request)
    morphRequest(request=request, requestCredentials=credentials, verificationRequestKwargs=authKwargs)

    request.dataForMonitoring.tags.update({"account_id": credentials.accountId})