"""
Authorization middleware
"""
import binascii
import re
import cachetools
from luna3.accounts.accounts import AccountsApi
from luna3.common.exceptions import LunaApiException
from stringcase import camelcase
from vlutils.cache.cache import cache
from vlutils.helpers import isUUID
from werkzeug.datastructures import Authorization
from app.auth.white_resources_list import LunaAccountIdWhiteLists, WhiteLists
from classes.credentials import RequestCredentials, VerificationRequestKwargs
from classes.enums import AccountType, VisibilityArea
from classes.token import EmitEvents, Permissions
from crutches_on_wheels.cow.errors.errors import Error, ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils.log import Logger
# cache for credentials verification response
cacheTTL = cachetools.TTLCache(maxsize=128, ttl=10)
[docs]
def logWhoIsRequestBelong(logger: Logger, verificationKwargs: VerificationRequestKwargs) -> None:
"""
Log auth info who is request belong
Args:
logger: logger
verificationKwargs: verification kwargs
"""
if (decodedToken := verificationKwargs.getDecodedToken(logger)) is not None:
accountId, tokenId = decodedToken["accountId"], decodedToken["tokenId"]
else:
accountId, tokenId = verificationKwargs.accountId, None
accountIdLog = f"account_id: '{accountId}'" if accountId is not None else "account_id: null"
tokenIdLog = f" token_id: '{tokenId}'" if tokenId is not None else ""
logger.info(f"Request invoked by user ({accountIdLog}{tokenIdLog})")
[docs]
@cache(lambda: cacheTTL, keyGen=lambda accountsClient, verificationKwargs: verificationKwargs.__hash__())
async def verifyCredentials(accountsClient: AccountsApi, verificationKwargs: VerificationRequestKwargs) -> dict:
"""
Verify credentials
Args:
accountsClient: account client
verificationKwargs: authorization credentials
Returns:
dictionary with account-type and permissions (for token validation only)
"""
return (await accountsClient.verifyAccount(raiseError=True, asyncRequest=True, **verificationKwargs.asDict())).json
[docs]
def getVerificationRequestKwargs(request: "ApiRequest") -> VerificationRequestKwargs:
"""
Get kwargs for credentials verification from request
Args:
request: api request
Returns:
kwargs for credentials verification
Raises:
VLException(Error.BadAccountId, 400, isCriticalError=False) if specified `Luna-Account-Id` header content
does not match expected
VLException(Error.AccountIdAuthDisabled, 403, False) if `Luna-Account-Id` specified, but disabled by config
"""
# todo update/remove werkzeug after LUNA-5783
if (token := request.token) is not None:
if token.lower().startswith("basic"):
authData = None
try:
authData = Authorization.from_header(token)
except binascii.Error:
pass
if authData is None:
raise VLException(
Error.AuthorizationFailed.format("Bad format basic authorization header"),
401,
False,
headers=getAuthenticationHeader(),
)
return VerificationRequestKwargs(login=authData.username, password=authData.password)
else:
return VerificationRequestKwargs(token=token)
if (accountId := request.headers.get("Luna-Account-Id")) is not None:
if not isUUID(request.credentials.accountId):
raise VLException(Error.BadAccountId, 400, isCriticalError=False)
return VerificationRequestKwargs(accountId=accountId)
return VerificationRequestKwargs()
[docs]
async def getRequestCredentials(
verificationKwargs: VerificationRequestKwargs, request: "ApiRequest"
) -> RequestCredentials:
"""
Verify and get request credentials
Args:
verificationKwargs: keyword arguments for credentials verification request
request: api request
Returns:
verified request credentials
Raises:
VLException(Error.CorruptedToken, 400, False) if failed to decode jwt token
"""
if verificationKwargs.isEmpty:
return RequestCredentials()
respJson = dict()
isNeedAuthCredentialsLog = True
try:
respJson = await verifyCredentials(
accountsClient=request.app.ctx.luna3session.accountsSession.getClient(),
verificationKwargs=verificationKwargs,
)
except LunaApiException as exc:
if exc.statusCode == 500:
raise
if ErrorInfo.fromDict(exc.json) != Error.AccountNotFound:
isNeedAuthCredentialsLog = False
raise VLException(
Error.AuthorizationFailed.format(exc.json["detail"]),
401,
False,
headers=getAuthenticationHeader(sendBasicAuthInfo=verificationKwargs.login is not None),
)
finally:
if isNeedAuthCredentialsLog:
verificationKwargs.accountId = respJson.get("account_id", verificationKwargs.accountId)
logWhoIsRequestBelong(request.logger, verificationKwargs=verificationKwargs)
accountType = AccountType(respJson["account_type"])
if verificationKwargs.login is not None:
return RequestCredentials(
accountId=respJson["account_id"],
visibilityArea=VisibilityArea.account if accountType == AccountType.user else VisibilityArea.all,
accountType=accountType,
)
elif verificationKwargs.token is not None:
token = verificationKwargs.getDecodedToken(request.logger)
permissions = Permissions(
emitEvents=EmitEvents(
**{camelcase(key): value for key, value in respJson["permissions"]["emit_events"].items()}
),
**{camelcase(key): value for key, value in respJson["permissions"].items() if key != "emit_events"},
)
return RequestCredentials(
accountId=token["accountId"],
visibilityArea=VisibilityArea(token["visibilityArea"]),
permissions=permissions,
accountType=accountType,
)
return RequestCredentials(accountId=verificationKwargs.accountId)
[docs]
def checkVerificationKwargs(request: "ApiRequest", verificationRequestKwargs: VerificationRequestKwargs) -> None:
"""
Check verification kwargs according due to request path, method and system configuration
Args:
request: api request
verificationRequestKwargs: verification kwargs
Raises:
VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs specified, service allows to use
`Luna-Account-Id` header and request required credentials
VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs specified, service not allows use
`Luna-Account-Id` header and request required credentialst
VLException(Error.PermissionByTokenDenied, 403, False) if token kwargs specified, but request path and/or
method not allows access using token
VLException(Error.AccountIdAuthDisabled, 403, False) if `Luna-Account-Id` header if specified, but service
not allows use `Luna-Account-Id` header
"""
exclusiveBasicAuthRequiredRoutes = (f"/{request.app.ctx.apiVersion}/account",)
if verificationRequestKwargs.isEmpty:
if request.path in exclusiveBasicAuthRequiredRoutes:
raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())
elif request.config.allowLunaAccountAuthHeader:
isWhiteListMethod = request.method in LunaAccountIdWhiteLists.methods
isWhiteListPath = bool(
request.path in LunaAccountIdWhiteLists.routes
or request.path in WhiteLists.routes
or re.match(WhiteLists.docsPathPattern, request.path)
or re.match(WhiteLists.lambdaDocsPathPattern, request.path),
)
if not (isWhiteListPath or isWhiteListMethod):
raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())
else:
isWhiteListPath = bool(
request.path in WhiteLists.routes
or re.match(WhiteLists.docsPathPattern, request.path)
or re.match(WhiteLists.lambdaDocsPathPattern, request.path)
)
isWhiteListMethod = request.method in WhiteLists.methods
if not (isWhiteListPath or isWhiteListMethod):
raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())
if verificationRequestKwargs.token is not None and request.path in exclusiveBasicAuthRequiredRoutes:
raise VLException(Error.PermissionByTokenDenied, 403, False)
if verificationRequestKwargs.accountId and not request.config.allowLunaAccountAuthHeader:
raise VLException(Error.AccountIdAuthDisabled, 403, False)
[docs]
def checkCredentialsRights(
verificationRequestKwargs: VerificationRequestKwargs, requestCredentials: RequestCredentials, request: "ApiRequest"
):
"""
Verify request credentials rights
Args:
verificationRequestKwargs: verification request kwargs (authorization data from headers)
requestCredentials: credentials for request
request: api request
Raises:
VLException(Error.AccountTokenPermissionError, 403, False) if specified token supposed access to any data, but
account type is `user` (does not imply access to other accoounts data)
VLException(Error.AccountQueryPermissionError, 403, False) if account type is `user` but specified query
parameter `account_id` (imply as data filter in other luna-* services) does not match the account id
from authorization/`Luna-Account-Id` header
"""
if verificationRequestKwargs.token is not None:
if (
requestCredentials.accountType == AccountType.user
and requestCredentials.visibilityArea == VisibilityArea.all
):
raise VLException(Error.AccountTokenPermissionError, 403, False)
if requestCredentials.accountType == AccountType.user:
if (queryAccountId := request.args.get("account_id")) is not None:
if queryAccountId != requestCredentials.accountId:
raise VLException(Error.AccountQueryPermissionError, 403, False)
[docs]
def morphRequest(request: "ApiRequest", requestCredentials: RequestCredentials) -> None:
"""
Morph request query/json/headers according to request method and request credentials
Args:
request: api request
requestCredentials: verified request credentials
"""
isMatchRequest = request.path.startswith(WhiteLists.matcherRoutesPrefix)
isVisibilityAreaIndependentRequest = request.method in ("GET", "HEAD") or isMatchRequest
if not isVisibilityAreaIndependentRequest or requestCredentials.visibilityArea is None:
# for backward compability with requests with `Luna-Account-Id` header
request.args["account_id"] = requestCredentials.accountId
elif requestCredentials.visibilityArea == VisibilityArea.account:
request.args["account_id"] = requestCredentials.accountId
elif not isVisibilityAreaIndependentRequest:
request.args["account_id"] = None
request.credentials = requestCredentials
[docs]
async def authMiddleware(request: "ApiRequest"):
"""
Authorization middleware, check a request account id
Args:
request: request
"""
isAccountCreationRequest = request.path == WhiteLists.accountCreationRoute and request.method == "POST"
isWhiteListMethod = request.method in set(WhiteLists.methods).intersection(LunaAccountIdWhiteLists.methods)
if isAccountCreationRequest or isWhiteListMethod:
logWhoIsRequestBelong(request.logger, verificationKwargs=VerificationRequestKwargs())
return
verificationRequestKwargs: VerificationRequestKwargs = getVerificationRequestKwargs(request=request)
if verificationRequestKwargs.isEmpty:
logWhoIsRequestBelong(request.logger, verificationKwargs=verificationRequestKwargs)
checkVerificationKwargs(verificationRequestKwargs=verificationRequestKwargs, request=request)
requestCredentials = await getRequestCredentials(verificationKwargs=verificationRequestKwargs, request=request)
checkCredentialsRights(
verificationRequestKwargs=verificationRequestKwargs, requestCredentials=requestCredentials, request=request
)
morphRequest(request=request, requestCredentials=requestCredentials)