"""
Authorization middleware
"""
import binascii
import re
import cachetools
from luna3.accounts.accounts import AccountsApi
from luna3.common.exceptions import LunaApiException
from sanic.headers import parse_credentials
from stringcase import camelcase
from vlutils.cache.cache import cache
from vlutils.helpers import isUUID
from werkzeug.datastructures import Authorization
from app.auth.white_resources_list import AuthIgnoreList, LunaAccountIdWhiteLists, WhiteLists
from classes.credentials import RequestCredentials, VerificationRequestKwargs
from classes.enums import AccountType, VisibilityArea
from classes.token import EmitEvents, Permissions, Verify
from crutches_on_wheels.cow.errors.errors import Error, ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils.log import Logger
# Cache for credentials verification responses (used by `verifyCredentials`):
# holds at most 128 entries, each expiring 10 time units after insertion
# (seconds with the default cachetools timer).
cacheTTL = cachetools.TTLCache(maxsize=128, ttl=10)
[docs]
def logWhoIsRequestBelong(logger: Logger, verificationKwargs: VerificationRequestKwargs) -> None:
    """
    Write an info record describing which account (and token, if any) issued the request.

    The ids are taken from the decoded token when one is available; otherwise the account id
    stored on the verification kwargs is used and no token id is logged.

    Args:
        logger: logger the record is written to
        verificationKwargs: verification kwargs of the current request
    """
    decoded = verificationKwargs.getDecodedToken(logger)
    if decoded is None:
        ownerAccountId = verificationKwargs.accountId
        ownerTokenId = None
    else:
        ownerAccountId = decoded["accountId"]
        ownerTokenId = decoded["tokenId"]

    if ownerAccountId is None:
        accountPart = "account_id: null"
    else:
        accountPart = f"account_id: '{ownerAccountId}'"

    # the token part is omitted entirely when there is no token id
    tokenPart = "" if ownerTokenId is None else f" token_id: '{ownerTokenId}'"

    logger.info(f"Request invoked by user ({accountPart}{tokenPart})")
[docs]
@cache(lambda: cacheTTL, keyGen=lambda accountsClient, verificationKwargs: verificationKwargs.__hash__())
async def verifyCredentials(accountsClient: AccountsApi, verificationKwargs: VerificationRequestKwargs) -> dict:
    """
    Ask the accounts service to verify the given credentials.

    The call is wrapped by the `cache` decorator bound to `cacheTTL`; the cache key is derived
    only from the hash of `verificationKwargs`.

    Args:
        accountsClient: accounts service client
        verificationKwargs: authorization credentials to verify
    Returns:
        dictionary with account-type and permissions (for token validation only)
    """
    # NOTE(review): the key is the bare hash value, so two different credentials with equal hashes
    # would presumably share a cached response - confirm against the `cache` implementation.
    requestKwargs = verificationKwargs.asDict()
    response = await accountsClient.verifyAccount(raiseError=True, asyncRequest=True, **requestKwargs)
    return response.json
[docs]
def getVerificationRequestKwargs(request: "ApiRequest") -> VerificationRequestKwargs:
    """
    Build kwargs for credentials verification from the request.

    Sources are checked in this order: `request.token`, the `LUNA_AUTH_TOKEN` cookie, and finally
    the `Luna-Account-Id` header. If none of them is present, empty kwargs are returned.

    Args:
        request: api request
    Returns:
        kwargs for credentials verification
    Raises:
        VLException(Error.AuthorizationFailed, 401, False) if the basic authorization data cannot be parsed
        VLException(Error.AuthorizationFailed, 401, False) if `Luna-Account-Id` is used for the cookie login
            route with a method that requires token or basic authorization
        VLException(Error.BadAccountId, 400, isCriticalError=False) if the account id does not look like a UUID
    """
    # todo update/remove werkzeug after LUNA-5783
    rawToken = request.token
    if rawToken is None:
        # no token on the request itself: fall back to the auth cookie
        _, rawToken = parse_credentials(request.cookies.get("LUNA_AUTH_TOKEN"), ("Bearer", "Token"))

    if rawToken is not None:
        if not rawToken.lower().startswith("basic"):
            return VerificationRequestKwargs(token=rawToken)

        # basic authorization: a decoding failure is treated the same way as an unparsable header
        try:
            basicAuth = Authorization.from_header(rawToken)
        except binascii.Error:
            basicAuth = None
        if basicAuth is None:
            raise VLException(
                Error.AuthorizationFailed.format("Bad format basic authorization header"),
                401,
                False,
                headers=getAuthenticationHeader(),
            )
        return VerificationRequestKwargs(login=basicAuth.username, password=basicAuth.password)

    accountId = request.headers.get("Luna-Account-Id")
    if accountId is None:
        return VerificationRequestKwargs()

    isLoginRoute = request.path == AuthIgnoreList.cookieLoginRoute
    if isLoginRoute and request.method not in AuthIgnoreList.loginRouteNoAuthMethods:
        raise VLException(
            Error.AuthorizationFailed.format("Only token or basic authorization is allowed for this action"),
            401,
            False,
            headers=getAuthenticationHeader(),
        )

    # NOTE(review): the UUID check reads `request.lunaApiCredentials.accountId`, not the header value
    # captured above - presumably both come from the same header; confirm against ApiRequest.
    if not isUUID(request.lunaApiCredentials.accountId):
        raise VLException(Error.BadAccountId, 400, isCriticalError=False)
    return VerificationRequestKwargs(accountId=accountId)
[docs]
def _buildTokenPermissions(rawPermissions: dict) -> Permissions:
    """Convert the snake_case `permissions` section of a verification response into `Permissions`."""
    nestedSections = ("emit_events", "verify")
    return Permissions(
        emitEvents=EmitEvents(**{camelcase(name): flag for name, flag in rawPermissions["emit_events"].items()}),
        verify=Verify(**{camelcase(name): flag for name, flag in rawPermissions["verify"].items()}),
        **{camelcase(name): flag for name, flag in rawPermissions.items() if name not in nestedSections},
    )


async def getRequestCredentials(
    verificationKwargs: VerificationRequestKwargs, request: "ApiRequest", ignoreAuthError: bool = False
) -> RequestCredentials:
    """
    Verify the supplied credentials and convert the verification result to request credentials.

    Args:
        verificationKwargs: keyword arguments for credentials verification request
        request: api request
        ignoreAuthError: whether to ignore authorization credentials verification error
    Returns:
        verified request credentials; empty credentials if nothing was supplied or if verification
        failed and `ignoreAuthError` is set
    Raises:
        LunaApiException if the accounts service answered with status code 500 (re-raised as is)
        VLException(Error.AuthorizationFailed, 401, False) if verification failed and errors are not ignored
        VLException(Error.CorruptedToken, 400, False) if failed to decode jwt token
            (presumably raised by `getDecodedToken`; not visible in this function)
    """
    if verificationKwargs.isEmpty:
        return RequestCredentials()

    verification = {}
    shouldLogOwner = True
    try:
        verification = await verifyCredentials(
            accountsClient=request.app.ctx.luna3session.accountsSession.getClient(),
            verificationKwargs=verificationKwargs,
        )
    except LunaApiException as exc:
        if exc.statusCode == 500:
            raise
        # only the "account not found" failure is still logged in the `finally` block below
        if ErrorInfo.fromDict(exc.json) != Error.AccountNotFound:
            shouldLogOwner = False
        if not ignoreAuthError:
            failure = Error.AuthorizationFailed.format(exc.json["detail"])
            challengeHeaders = getAuthenticationHeader(
                sendBasicAuthInfo=verificationKwargs.login is not None,
                setCookieHeader=bool(request.cookies.get("LUNA_AUTH_TOKEN")),
            )
            raise VLException(failure, 401, False, headers=challengeHeaders)
    finally:
        # runs on success, on a swallowed error and on a propagating error alike
        if shouldLogOwner:
            verificationKwargs.accountId = verification.get("account_id", verificationKwargs.accountId)
            logWhoIsRequestBelong(request.logger, verificationKwargs=verificationKwargs)

    if ignoreAuthError and not verification:
        return RequestCredentials()

    accountType = AccountType(verification["account_type"])

    if verificationKwargs.login is not None:
        # basic authorization: `user` accounts see only their own data, other account types see everything
        if accountType == AccountType.user:
            area = VisibilityArea.account
        else:
            area = VisibilityArea.all
        return RequestCredentials(
            accountId=verification["account_id"],
            visibilityArea=area,
            accountType=accountType,
        )

    if verificationKwargs.token is None:
        # neither login nor token: only the account id is known
        return RequestCredentials(accountId=verificationKwargs.accountId)

    decodedToken = verificationKwargs.getDecodedToken(request.logger)
    tokenPermissions = _buildTokenPermissions(verification["permissions"])
    return RequestCredentials(
        accountId=decodedToken["accountId"],
        visibilityArea=VisibilityArea(decodedToken["visibilityArea"]),
        permissions=tokenPermissions,
        accountType=accountType,
    )
[docs]
def checkVerificationKwargs(request: "ApiRequest", verificationRequestKwargs: VerificationRequestKwargs) -> None:
    """
    Check verification kwargs against the request path, method and system configuration.

    Args:
        request: api request
        verificationRequestKwargs: verification kwargs
    Raises:
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs are specified and the request
            targets the account route, which always requires credentials
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs are specified and neither the
            request path nor the request method is white-listed (the white lists in effect depend on
            whether the service allows the `Luna-Account-Id` header)
        VLException(Error.PermissionByTokenDenied, 403, False) if a token is specified, but the request path
            and method do not allow access using a token
        VLException(Error.AccountIdAuthDisabled, 403, False) if the `Luna-Account-Id` header is specified, but
            the service does not allow its use
    """
    basicAuthOnlyRoutes = (f"/{request.app.ctx.apiVersion}/account",)

    if verificationRequestKwargs.isEmpty:
        if request.path in basicAuthOnlyRoutes:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

        # pick the white lists that apply for the current configuration
        if request.config.allowLunaAccountAuthHeader:
            allowedMethods = LunaAccountIdWhiteLists.methods
            extraRoutes = LunaAccountIdWhiteLists.routes
        else:
            allowedMethods = WhiteLists.methods
            extraRoutes = ()

        pathIsWhiteListed = bool(
            request.path in extraRoutes
            or request.path in WhiteLists.routes
            or re.match(WhiteLists.docsPathPattern, request.path)
            or re.match(WhiteLists.lambdaDocsPathPattern, request.path)
        )
        if not pathIsWhiteListed and request.method not in allowedMethods:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

    # tokens may only read (GET/HEAD) the routes reserved for basic authorization
    tokenUsed = verificationRequestKwargs.token is not None
    if tokenUsed and request.path in basicAuthOnlyRoutes and request.method not in ("GET", "HEAD"):
        raise VLException(Error.PermissionByTokenDenied, 403, False)

    if verificationRequestKwargs.accountId and not request.config.allowLunaAccountAuthHeader:
        raise VLException(Error.AccountIdAuthDisabled, 403, False)
[docs]
def checkCredentialsRights(
    verificationRequestKwargs: VerificationRequestKwargs, requestCredentials: RequestCredentials, request: "ApiRequest"
) -> None:
    """
    Check that the verified credentials are allowed to do what the request asks for.

    Only accounts of type `user` are restricted here; any other account type passes unchanged.

    Args:
        verificationRequestKwargs: verification request kwargs (authorization data from headers)
        requestCredentials: credentials for request
        request: api request
    Raises:
        VLException(Error.AccountTokenPermissionError, 403, False) if the specified token implies access to any
            data, but the account type is `user` (does not imply access to other accounts data)
        VLException(Error.AccountQueryPermissionError, 403, False) if the account type is `user` but the query
            parameter `account_id` does not match the account id of the request credentials
    """
    if requestCredentials.accountType != AccountType.user:
        return

    # a `user` account must not hold a token with visibility area `all`
    authorizedByToken = verificationRequestKwargs.token is not None
    if authorizedByToken and requestCredentials.visibilityArea == VisibilityArea.all:
        raise VLException(Error.AccountTokenPermissionError, 403, False)

    # a `user` account may only filter by its own account id
    requestedAccountId = request.args.get("account_id")
    if requestedAccountId is not None and requestedAccountId != requestCredentials.accountId:
        raise VLException(Error.AccountQueryPermissionError, 403, False)
[docs]
def morphRequest(
    request: "ApiRequest", requestCredentials: RequestCredentials, verificationRequestKwargs: VerificationRequestKwargs
) -> None:
    """
    Morph request query according to request method and request credentials.

    When verification kwargs are present, the `account_id` query parameter is replaced with the
    account id of the credentials if any of the following holds:
        - the request is neither GET/HEAD nor a matcher request
        - the credentials have no visibility area
        - the visibility area is `account`
    In every other case the query is left as is. The credentials are always stored on the request.

    Args:
        request: api request
        requestCredentials: verified request credentials
        verificationRequestKwargs: verification request kwargs
    """
    isMatchRequest = request.path.startswith(WhiteLists.matcherRoutesPrefix)
    isVisibilityAreaIndependentRequest = request.method in ("GET", "HEAD") or isMatchRequest

    # empty verification kwargs: the `account_id` filter from the query is not replaced
    if not verificationRequestKwargs.isEmpty:
        # NOTE: the former trailing `elif not isVisibilityAreaIndependentRequest` branch (which set
        # `account_id` to None) could never execute, because that condition is already part of the
        # first check below; it was removed without changing behaviour.
        mustPinAccountId = (
            not isVisibilityAreaIndependentRequest
            # for backward compatibility with requests with `Luna-Account-Id` header
            or requestCredentials.visibilityArea is None
            or requestCredentials.visibilityArea == VisibilityArea.account
        )
        if mustPinAccountId:
            request.args["account_id"] = requestCredentials.accountId

    request.lunaApiCredentials = requestCredentials
[docs]
def ignoreAuthError(request: "ApiRequest") -> bool:
    """
    Tell whether a credentials verification failure must be ignored for this request.

    Args:
        request: request
    Returns:
        True if the request path is in the auth-ignore routes, starts with the UI prefix, or is the
        cookie login route called with one of its no-auth methods; False otherwise
    """
    requestPath = request.path
    if requestPath in AuthIgnoreList.routes:
        return True
    if requestPath.startswith(AuthIgnoreList.uiPrefix):
        return True
    if requestPath != AuthIgnoreList.cookieLoginRoute:
        return False
    return request.method in AuthIgnoreList.loginRouteNoAuthMethods
[docs]
async def authMiddleware(request: "ApiRequest") -> None:
    """
    Authorization middleware: extract, verify and apply the credentials of a request.

    Account creation requests and requests whose method is white-listed both with and without the
    `Luna-Account-Id` header skip the authorization entirely.

    Args:
        request: request
    """
    alwaysAllowedMethods = set(WhiteLists.methods).intersection(LunaAccountIdWhiteLists.methods)
    isAccountCreation = request.path == WhiteLists.accountCreationRoute and request.method == "POST"
    if isAccountCreation or request.method in alwaysAllowedMethods:
        logWhoIsRequestBelong(request.logger, verificationKwargs=VerificationRequestKwargs())
        return

    verificationRequestKwargs: VerificationRequestKwargs = getVerificationRequestKwargs(request=request)
    if verificationRequestKwargs.isEmpty:
        # with empty kwargs `getRequestCredentials` returns before logging, so log here
        logWhoIsRequestBelong(request.logger, verificationKwargs=verificationRequestKwargs)

    checkVerificationKwargs(verificationRequestKwargs=verificationRequestKwargs, request=request)

    skipAuthErrors = ignoreAuthError(request)
    requestCredentials = await getRequestCredentials(
        verificationKwargs=verificationRequestKwargs, request=request, ignoreAuthError=skipAuthErrors
    )

    checkCredentialsRights(
        verificationRequestKwargs=verificationRequestKwargs, requestCredentials=requestCredentials, request=request
    )
    morphRequest(
        request=request, requestCredentials=requestCredentials, verificationRequestKwargs=verificationRequestKwargs
    )

    monitoringTags = {"account_id": requestCredentials.accountId}
    request.dataForMonitoring.tags.update(monitoringTags)