Source code for luna_api.app.auth.auth_middleware

"""
Authorization middleware
"""

import binascii
import re

from luna_auth.auth import VerificationRequestKwargs
from luna_auth.classes.credentials import RequestCredentials
from luna_auth.classes.enums import AccountType, VisibilityArea
from luna_auth.classes.errors import AccountNotFoundError, AuthorizationError, CorruptedToken
from sanic.headers import parse_credentials
from vlutils.helpers import isUUID
from werkzeug.datastructures import Authorization

from app.auth.white_resources_list import AuthIgnoreList, LunaAccountIdWhiteLists, WhiteLists
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils.log import Logger


[docs] def getDecodedToken(verificationKwargs: VerificationRequestKwargs, logger: Logger) -> dict: """ Get decoded token Args: verificationKwargs: request credentials to verify logger: logger Returns: decoded token data Raises: VLException(Error.CorruptedToken) if failed to decode token """ try: return verificationKwargs.decodedToken except CorruptedToken: logger.exception() raise VLException(Error.CorruptedToken, 401, False, headers={"WWW-Authenticate": "Bearer"})
[docs] def logWhoIsRequestBelong(logger: Logger, verificationKwargs: VerificationRequestKwargs) -> None: """ Log auth info who is request belong Args: logger: logger verificationKwargs: verification kwargs """ if (decodedToken := getDecodedToken(verificationKwargs, logger)) is not None: accountId, tokenId = decodedToken["accountId"], decodedToken["tokenId"] else: accountId, tokenId = verificationKwargs.accountId, None accountIdLog = f"account_id: '{accountId}'" if accountId is not None else "account_id: null" tokenIdLog = f" token_id: '{tokenId}'" if tokenId is not None else "" logger.info(f"Request invoked by user ({accountIdLog}{tokenIdLog})")
[docs] def getAuthenticationHeader(sendBasicAuthInfo: bool = True, setCookieHeader: bool = False) -> dict[str, str]: """ Get authentication header for 401 response Args: sendBasicAuthInfo: whether to send basic auth info, otherwise bearer setCookieHeader: cookie header flag Returns: dict with headers """ headers = {"WWW-Authenticate": "Basic realm='login:password'" if sendBasicAuthInfo else "Bearer"} if setCookieHeader: headers["Set-Cookie"] = 'LUNA_AUTH_TOKEN=""; Path=/; Max-Age=0' return headers
[docs] def getVerificationRequestKwargs(request: "ApiRequest") -> VerificationRequestKwargs: """ Get kwargs for credentials verification from request Args: request: api request Returns: kwargs for credentials verification Raises: VLException(Error.BadAccountId, 400, isCriticalError=False) if specified `Luna-Account-Id` header content does not match expected VLException(Error.AccountIdAuthDisabled, 403, False) if `Luna-Account-Id` specified, but disabled by config """ # todo update/remove werkzeug after LUNA-5783 if (token := request.token) is None: _, token = parse_credentials(request.cookies.get("LUNA_AUTH_TOKEN"), ("Bearer", "Token")) if token is not None: if token.lower().startswith("basic"): authData = None try: authData = Authorization.from_header(token) except binascii.Error: pass if authData is None: raise VLException( Error.AuthorizationFailed.format("Bad format basic authorization header"), 401, False, headers=getAuthenticationHeader(), ) return VerificationRequestKwargs( login=authData.username, password=authData.password, basicAuthString=token[len("basic ") :] ) else: return VerificationRequestKwargs(token=token) if (accountId := request.headers.get("Luna-Account-Id")) is not None: if ( request.path == AuthIgnoreList.cookieLoginRoute and request.method not in AuthIgnoreList.loginRouteNoAuthMethods ): raise VLException( Error.AuthorizationFailed.format("Only token or basic authorization is allowed for this action"), 401, False, headers=getAuthenticationHeader(), ) if not isUUID(request.lunaApiCredentials.accountId): raise VLException(Error.BadAccountId, 400, isCriticalError=False) return VerificationRequestKwargs(accountId=accountId) return VerificationRequestKwargs()
[docs] async def getRequestCredentials( verificationKwargs: VerificationRequestKwargs, request: "ApiRequest", ignoreAuthError: bool = False ) -> RequestCredentials: """ Verify and get request credentials Args: verificationKwargs: keyword arguments for credentials verification request request: api request ignoreAuthError: whether to ignore authorization credentials verification error Returns: verified request credentials Raises: VLException(Error.CorruptedToken, 400, False) if failed to decode jwt token """ if verificationKwargs.isEmpty: return RequestCredentials() isNeedAuthCredentialsLog = True creds = None try: creds = await request.app.ctx.lunaAuth.verifyCredentials(verificationKwargs) except AuthorizationError as err: isNeedAuthCredentialsLog = isinstance(err, AccountNotFoundError) if not ignoreAuthError: raise VLException( Error.AuthorizationFailed.format(str(err)), 401, False, headers=getAuthenticationHeader( sendBasicAuthInfo=verificationKwargs.login is not None, setCookieHeader=bool(request.cookies.get("LUNA_AUTH_TOKEN")), ), ) finally: if isNeedAuthCredentialsLog: verificationKwargs.accountId = creds.accountId if creds is not None else None logWhoIsRequestBelong(request.logger, verificationKwargs=verificationKwargs) if ignoreAuthError and not creds: return RequestCredentials() return creds
[docs] def checkVerificationKwargs(request: "ApiRequest", verificationRequestKwargs: VerificationRequestKwargs) -> None: """ Check verification kwargs according due to request path, method and system configuration Args: request: api request verificationRequestKwargs: verification kwargs Raises: VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs specified, service allows to use `Luna-Account-Id` header and request required credentials VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs specified, service not allows use `Luna-Account-Id` header and request required credentials VLException(Error.PermissionByTokenDenied, 403, False) if token kwargs specified, but request path and/or method not allows access using token VLException(Error.AccountIdAuthDisabled, 403, False) if `Luna-Account-Id` header if specified, but service not allows use `Luna-Account-Id` header """ exclusiveBasicAuthRequiredRoutes = (f"/{request.app.ctx.apiVersion}/account",) if verificationRequestKwargs.isEmpty: if request.path in exclusiveBasicAuthRequiredRoutes: raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader()) elif request.config.allowLunaAccountAuthHeader: isWhiteListMethod = request.method in LunaAccountIdWhiteLists.methods isWhiteListPath = bool( request.path in LunaAccountIdWhiteLists.routes or request.path in WhiteLists.routes or re.match(WhiteLists.docsPathPattern, request.path) or re.match(WhiteLists.lambdaDocsPathPattern, request.path), ) if not (isWhiteListPath or isWhiteListMethod): raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader()) else: isWhiteListPath = bool( request.path in WhiteLists.routes or re.match(WhiteLists.docsPathPattern, request.path) or re.match(WhiteLists.lambdaDocsPathPattern, request.path) ) isWhiteListMethod = request.method in WhiteLists.methods if not (isWhiteListPath or isWhiteListMethod): raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader()) if ( verificationRequestKwargs.token is not None and request.path in exclusiveBasicAuthRequiredRoutes and request.method not in ("GET", "HEAD") ): raise VLException(Error.PermissionByTokenDenied, 403, False) if verificationRequestKwargs.accountId and not request.config.allowLunaAccountAuthHeader: raise VLException(Error.AccountIdAuthDisabled, 403, False)
[docs] def checkCredentialsRights( verificationRequestKwargs: VerificationRequestKwargs, requestCredentials: RequestCredentials, request: "ApiRequest" ): """ Verify request credentials rights Args: verificationRequestKwargs: verification request kwargs (authorization data from headers) requestCredentials: credentials for request request: api request Raises: VLException(Error.AccountTokenPermissionError, 403, False) if specified token supposed access to any data, but account type is `user` (does not imply access to other accoounts data) VLException(Error.AccountQueryPermissionError, 403, False) if account type is `user` but specified query parameter `account_id` (imply as data filter in other luna-* services) does not match the account id from authorization/`Luna-Account-Id` header """ if verificationRequestKwargs.token is not None: if ( requestCredentials.accountType == AccountType.user and requestCredentials.visibilityArea == VisibilityArea.all ): raise VLException(Error.AccountTokenPermissionError, 403, False) if requestCredentials.accountType == AccountType.user: if (queryAccountId := request.args.get("account_id")) is not None: if queryAccountId != requestCredentials.accountId: raise VLException(Error.AccountQueryPermissionError, 403, False)
[docs] def morphRequest( request: "ApiRequest", requestCredentials: RequestCredentials, verificationRequestKwargs: VerificationRequestKwargs ) -> None: """ Morph request query/json/headers according to request method and request credentials Args: request: api request requestCredentials: verified request credentials verificationRequestKwargs: verification request kwargs """ isMatchRequest = request.path.startswith(WhiteLists.matcherRoutesPrefix) isVisibilityAreaIndependentRequest = request.method in ("GET", "HEAD") or isMatchRequest if verificationRequestKwargs.isEmpty: # case of `Luna-Account-Id` header auth to skip replacing account_id filter from query parameters ... elif not isVisibilityAreaIndependentRequest or requestCredentials.visibilityArea is None: # for backward compatibility with requests with `Luna-Account-Id` header request.args["account_id"] = requestCredentials.accountId elif requestCredentials.visibilityArea == VisibilityArea.account: request.args["account_id"] = requestCredentials.accountId elif not isVisibilityAreaIndependentRequest: request.args["account_id"] = None request.lunaApiCredentials = requestCredentials
[docs] def ignoreAuthError(request: "ApiRequest") -> bool: """ Ignore auth error Args: request: request Returns: ignoreAuthError flag """ return ( request.path in AuthIgnoreList.routes or request.path.startswith(AuthIgnoreList.uiPrefix) or ( request.path == AuthIgnoreList.cookieLoginRoute and request.method in AuthIgnoreList.loginRouteNoAuthMethods ) )
[docs] async def authMiddleware(request: "ApiRequest"): """ Authorization middleware, check a request account id Args: request: request """ isAccountCreationRequest = request.path == WhiteLists.accountCreationRoute and request.method == "POST" isWhiteListMethod = request.method in set(WhiteLists.methods).intersection(LunaAccountIdWhiteLists.methods) if isAccountCreationRequest or isWhiteListMethod: logWhoIsRequestBelong(request.logger, verificationKwargs=VerificationRequestKwargs()) return verificationRequestKwargs: VerificationRequestKwargs = getVerificationRequestKwargs(request=request) if verificationRequestKwargs.isEmpty: logWhoIsRequestBelong(request.logger, verificationKwargs=verificationRequestKwargs) checkVerificationKwargs(verificationRequestKwargs=verificationRequestKwargs, request=request) requestCredentials = await getRequestCredentials( verificationKwargs=verificationRequestKwargs, request=request, ignoreAuthError=ignoreAuthError(request) ) checkCredentialsRights( verificationRequestKwargs=verificationRequestKwargs, requestCredentials=requestCredentials, request=request ) morphRequest( request=request, requestCredentials=requestCredentials, verificationRequestKwargs=verificationRequestKwargs ) request.dataForMonitoring.tags.update({"account_id": requestCredentials.accountId})