"""
Authorization middleware
"""
import binascii
import re
from luna_auth.auth import VerificationRequestKwargs
from luna_auth.classes.credentials import RequestCredentials
from luna_auth.classes.enums import AccountType, VisibilityArea
from luna_auth.classes.errors import AccountNotFoundError, AuthorizationError, CorruptedToken
from sanic.headers import parse_credentials
from vlutils.helpers import isUUID
from werkzeug.datastructures import Authorization
from app.auth.white_resources_list import AuthIgnoreList, LunaAccountIdWhiteLists, WhiteLists
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils.log import Logger
[docs]
def getDecodedToken(verificationKwargs: VerificationRequestKwargs, logger: Logger) -> dict:
    """
    Read the decoded token payload from the verification kwargs.

    Args:
        verificationKwargs: request credentials to verify
        logger: logger used to record a token decoding failure
    Returns:
        value of `verificationKwargs.decodedToken`
        (NOTE(review): callers in this module compare the result with None, so it is presumably None
        when no token was supplied - confirm against `VerificationRequestKwargs`)
    Raises:
        VLException(Error.CorruptedToken, 401, False) with a `WWW-Authenticate: Bearer` header if reading
            the decoded token raises `CorruptedToken`
    """
    try:
        decodedToken = verificationKwargs.decodedToken
    except CorruptedToken:
        # record the decoding failure before converting it to an API error
        logger.exception()
        bearerChallenge = {"WWW-Authenticate": "Bearer"}
        raise VLException(Error.CorruptedToken, 401, False, headers=bearerChallenge)
    return decodedToken
[docs]
def logWhoIsRequestBelong(logger: Logger, verificationKwargs: VerificationRequestKwargs) -> None:
    """
    Write an info log line describing which account (and token, if any) issued the request.

    The account id and token id are taken from the decoded token when one is available; otherwise the
    account id comes from `verificationKwargs.accountId` and no token id is logged.

    Args:
        logger: logger
        verificationKwargs: verification kwargs
    Raises:
        VLException(Error.CorruptedToken, 401, False) if the token cannot be decoded (raised by `getDecodedToken`)
    """
    tokenPayload = getDecodedToken(verificationKwargs, logger)
    if tokenPayload is None:
        accountId, tokenId = verificationKwargs.accountId, None
    else:
        accountId, tokenId = tokenPayload["accountId"], tokenPayload["tokenId"]

    accountPart = "account_id: null" if accountId is None else f"account_id: '{accountId}'"
    # the token part is omitted entirely when there is no token id
    tokenPart = "" if tokenId is None else f" token_id: '{tokenId}'"
    logger.info(f"Request invoked by user ({accountPart}{tokenPart})")
[docs]
def getVerificationRequestKwargs(request: "ApiRequest") -> VerificationRequestKwargs:
    """
    Get kwargs for credentials verification from request

    Credentials are looked up in this order: `request.token`, the `LUNA_AUTH_TOKEN` cookie, the
    `Luna-Account-Id` header. If none of them is present, empty kwargs are returned.

    Args:
        request: api request
    Returns:
        kwargs for credentials verification
    Raises:
        VLException(Error.AuthorizationFailed, 401, False) if the basic authorization header cannot be parsed
        VLException(Error.AuthorizationFailed, 401, False) if `Luna-Account-Id` is the only credential on the
            cookie login route with a method that requires authorization
        VLException(Error.BadAccountId, 400, isCriticalError=False) if specified `Luna-Account-Id` header content
            is not a valid UUID
    """
    # todo update/remove werkzeug after LUNA-5783
    if (token := request.token) is None:
        # nothing usable on the request itself: fall back to the auth cookie
        _, token = parse_credentials(request.cookies.get("LUNA_AUTH_TOKEN"), ("Bearer", "Token"))
    if token is not None:
        if not token.lower().startswith("basic"):
            return VerificationRequestKwargs(token=token)
        authData = None
        try:
            authData = Authorization.from_header(token)
        except binascii.Error:
            # undecodable base64 payload: reported below as a badly formatted header
            pass
        if authData is None:
            raise VLException(
                Error.AuthorizationFailed.format("Bad format basic authorization header"),
                401,
                False,
                headers=getAuthenticationHeader(),
            )
        return VerificationRequestKwargs(
            login=authData.username, password=authData.password, basicAuthString=token[len("basic ") :]
        )
    if (accountId := request.headers.get("Luna-Account-Id")) is not None:
        if (
            request.path == AuthIgnoreList.cookieLoginRoute
            and request.method not in AuthIgnoreList.loginRouteNoAuthMethods
        ):
            raise VLException(
                Error.AuthorizationFailed.format("Only token or basic authorization is allowed for this action"),
                401,
                False,
                headers=getAuthenticationHeader(),
            )
        # validate the header value itself; `request.lunaApiCredentials` is not the source of this account id
        if not isUUID(accountId):
            raise VLException(Error.BadAccountId, 400, isCriticalError=False)
        return VerificationRequestKwargs(accountId=accountId)
    return VerificationRequestKwargs()
[docs]
async def getRequestCredentials(
    verificationKwargs: VerificationRequestKwargs, request: "ApiRequest", ignoreAuthError: bool = False
) -> RequestCredentials:
    """
    Verify the supplied authorization data and return the resulting request credentials.

    Empty verification kwargs short-circuit to empty `RequestCredentials` without contacting the auth
    backend. Otherwise the kwargs are verified through `request.app.ctx.lunaAuth`, and the request owner
    is logged afterwards (the log is skipped for authorization errors other than `AccountNotFoundError`).

    Args:
        verificationKwargs: keyword arguments for credentials verification request
        request: api request
        ignoreAuthError: whether to ignore authorization credentials verification error
    Returns:
        verified request credentials, or empty credentials when verification failed and errors are ignored
    Raises:
        VLException(Error.AuthorizationFailed, 401, False) if verification failed and `ignoreAuthError` is false
        VLException(Error.CorruptedToken, 401, False) if failed to decode jwt token while logging the request owner
    """
    if verificationKwargs.isEmpty:
        return RequestCredentials()

    shouldLogOwner = True
    credentials = None
    try:
        credentials = await request.app.ctx.lunaAuth.verifyCredentials(verificationKwargs)
    except AuthorizationError as authError:
        # only an unknown account is still worth an owner log line after a failed verification
        shouldLogOwner = isinstance(authError, AccountNotFoundError)
        if not ignoreAuthError:
            failure = Error.AuthorizationFailed.format(str(authError))
            authHeaders = getAuthenticationHeader(
                sendBasicAuthInfo=verificationKwargs.login is not None,
                setCookieHeader=bool(request.cookies.get("LUNA_AUTH_TOKEN")),
            )
            raise VLException(failure, 401, False, headers=authHeaders)
    finally:
        # runs on success, on a swallowed auth error and while an exception is propagating
        if shouldLogOwner:
            verifiedAccountId = None if credentials is None else credentials.accountId
            verificationKwargs.accountId = verifiedAccountId
            logWhoIsRequestBelong(request.logger, verificationKwargs=verificationKwargs)

    if not ignoreAuthError or credentials:
        return credentials
    return RequestCredentials()
[docs]
def checkVerificationKwargs(request: "ApiRequest", verificationRequestKwargs: VerificationRequestKwargs) -> None:
    """
    Check that the kind of supplied credentials is acceptable for the request path, method and configuration.

    Args:
        request: api request
        verificationRequestKwargs: verification kwargs
    Raises:
        VLException(Error.BadHeaderAuth, 401, False) if no verification kwargs specified and the request targets
            the account route, or its path and method are both outside the white lists that apply for the
            current `allowLunaAccountAuthHeader` setting
        VLException(Error.PermissionByTokenDenied, 403, False) if token kwargs specified for the account route
            with a method other than GET/HEAD
        VLException(Error.AccountIdAuthDisabled, 403, False) if `Luna-Account-Id` header is specified, but service
            does not allow use of the `Luna-Account-Id` header
    """
    path, method = request.path, request.method
    basicAuthOnlyRoutes = (f"/{request.app.ctx.apiVersion}/account",)
    onBasicAuthOnlyRoute = path in basicAuthOnlyRoutes

    if verificationRequestKwargs.isEmpty:
        if onBasicAuthOnlyRoute:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())
        # the applicable white lists depend on whether `Luna-Account-Id` authorization is enabled
        if request.config.allowLunaAccountAuthHeader:
            routeLists = (LunaAccountIdWhiteLists.routes, WhiteLists.routes)
            allowedMethods = LunaAccountIdWhiteLists.methods
        else:
            routeLists = (WhiteLists.routes,)
            allowedMethods = WhiteLists.methods
        docsPatterns = (WhiteLists.docsPathPattern, WhiteLists.lambdaDocsPathPattern)
        isOpenPath = any(path in routes for routes in routeLists) or any(
            re.match(pattern, path) for pattern in docsPatterns
        )
        if not isOpenPath and method not in allowedMethods:
            raise VLException(Error.BadHeaderAuth, 401, False, headers=getAuthenticationHeader())

    isTokenForbidden = (
        verificationRequestKwargs.token is not None and onBasicAuthOnlyRoute and method not in ("GET", "HEAD")
    )
    if isTokenForbidden:
        raise VLException(Error.PermissionByTokenDenied, 403, False)

    if verificationRequestKwargs.accountId and not request.config.allowLunaAccountAuthHeader:
        raise VLException(Error.AccountIdAuthDisabled, 403, False)
[docs]
def checkCredentialsRights(
    verificationRequestKwargs: VerificationRequestKwargs, requestCredentials: RequestCredentials, request: "ApiRequest"
):
    """
    Check that verified credentials are allowed to perform the request.

    Only accounts of type `user` are restricted here; credentials of any other account type pass unchanged.

    Args:
        verificationRequestKwargs: verification request kwargs (authorization data from headers)
        requestCredentials: credentials for request
        request: api request
    Raises:
        VLException(Error.AccountTokenPermissionError, 403, False) if a token was specified whose visibility area
            is `all`, but account type is `user`
        VLException(Error.AccountQueryPermissionError, 403, False) if account type is `user` and the `account_id`
            query parameter is specified but differs from the account id of the credentials
    """
    if requestCredentials.accountType != AccountType.user:
        return

    usesToken = verificationRequestKwargs.token is not None
    if usesToken and requestCredentials.visibilityArea == VisibilityArea.all:
        raise VLException(Error.AccountTokenPermissionError, 403, False)

    requestedAccountId = request.args.get("account_id")
    if requestedAccountId is not None and requestedAccountId != requestCredentials.accountId:
        raise VLException(Error.AccountQueryPermissionError, 403, False)
[docs]
def morphRequest(
    request: "ApiRequest", requestCredentials: RequestCredentials, verificationRequestKwargs: VerificationRequestKwargs
) -> None:
    """
    Morph request query parameters according to request method and request credentials, then attach the
    credentials to the request as `request.lunaApiCredentials`.

    When credentials were supplied, the `account_id` query parameter is overwritten with the credentials'
    account id unless the request is a GET/HEAD/matcher request whose credentials have a visibility area
    other than `account`.

    Args:
        request: api request
        requestCredentials: verified request credentials
        verificationRequestKwargs: verification request kwargs
    """
    isMatchRequest = request.path.startswith(WhiteLists.matcherRoutesPrefix)
    isVisibilityAreaIndependentRequest = request.method in ("GET", "HEAD") or isMatchRequest
    # with no credentials supplied the query parameters are left untouched
    if not verificationRequestKwargs.isEmpty:
        visibilityArea = requestCredentials.visibilityArea
        if (
            not isVisibilityAreaIndependentRequest
            # for backward compatibility with requests with `Luna-Account-Id` header
            or visibilityArea is None
            or visibilityArea == VisibilityArea.account
        ):
            request.args["account_id"] = requestCredentials.accountId
        # NOTE(review): a trailing `elif not isVisibilityAreaIndependentRequest` branch that assigned
        # `account_id = None` used to follow; it could never execute because the first condition above
        # already covers that case, so it was removed without changing behaviour. Confirm that forcing
        # the credentials' account id for such requests is the intended rule.
    request.lunaApiCredentials = requestCredentials
[docs]
def ignoreAuthError(request: "ApiRequest") -> bool:
    """
    Tell whether a failed credentials verification should be ignored for this request.

    Args:
        request: request
    Returns:
        True if the path is in the auth ignore list, starts with the ui prefix, or is the cookie login route
        requested with one of its no-auth methods; False otherwise
    """
    path = request.path
    if path in AuthIgnoreList.routes:
        return True
    if path.startswith(AuthIgnoreList.uiPrefix):
        return True
    return path == AuthIgnoreList.cookieLoginRoute and request.method in AuthIgnoreList.loginRouteNoAuthMethods
[docs]
async def authMiddleware(request: "ApiRequest"):
    """
    Authorization middleware: extract, check and verify request credentials, then morph the request.

    Account creation requests and requests whose method is white-listed both with and without
    `Luna-Account-Id` authorization skip verification entirely and are only logged.

    Args:
        request: request
    """
    method = request.method
    alwaysAllowedMethods = set(WhiteLists.methods).intersection(LunaAccountIdWhiteLists.methods)
    skipAuthorization = method in alwaysAllowedMethods or (
        request.path == WhiteLists.accountCreationRoute and method == "POST"
    )
    if skipAuthorization:
        logWhoIsRequestBelong(request.logger, verificationKwargs=VerificationRequestKwargs())
        return

    authKwargs: VerificationRequestKwargs = getVerificationRequestKwargs(request=request)
    if authKwargs.isEmpty:
        # `getRequestCredentials` returns early for empty kwargs without logging, so log here
        logWhoIsRequestBelong(request.logger, verificationKwargs=authKwargs)
    checkVerificationKwargs(verificationRequestKwargs=authKwargs, request=request)

    credentials = await getRequestCredentials(
        verificationKwargs=authKwargs, request=request, ignoreAuthError=ignoreAuthError(request)
    )
    checkCredentialsRights(verificationRequestKwargs=authKwargs, requestCredentials=credentials, request=request)
    morphRequest(request=request, requestCredentials=credentials, verificationRequestKwargs=authKwargs)

    request.dataForMonitoring.tags.update({"account_id": credentials.accountId})