Merge "Load full contexts from request headers"

This commit is contained in:
Zuul 2024-12-10 10:43:38 +00:00 committed by Gerrit Code Review
commit a599332fea
5 changed files with 41 additions and 45 deletions

View File

@ -356,7 +356,7 @@ class Alarm(base.Base):
"maximum": max_actions}
raise base.ClientSideError(error)
limited = rbac.get_limited_to_project(pecan.request.headers,
limited = rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer)
for action in actions:
@ -568,7 +568,8 @@ class AlarmController(rest.RestController):
auth_project = pecan.request.headers.get('X-Project-Id')
filters = {'alarm_id': self._id}
if not rbac.is_admin(pecan.request.headers):
is_admin = rbac.is_admin(pecan.request, pecan.request.enforcer)
if not is_admin:
filters['project_id'] = auth_project
alarms = pecan.request.storage.get_alarms(**filters)
@ -578,7 +579,7 @@ class AlarmController(rest.RestController):
alarm = alarms[0]
target = {'user_id': alarm.user_id,
'project_id': alarm.project_id}
rbac.enforce(rbac_directive, pecan.request.headers,
rbac.enforce(rbac_directive, pecan.request,
pecan.request.enforcer, target)
return alarm
@ -652,7 +653,7 @@ class AlarmController(rest.RestController):
data.alarm_id = self._id
user, project = rbac.get_limited_to(pecan.request.headers,
user, project = rbac.get_limited_to(pecan.request,
pecan.request.enforcer)
if user:
data.user_id = user
@ -717,7 +718,7 @@ class AlarmController(rest.RestController):
# allow history to be returned for deleted alarms, but scope changes
# returned to those carried out on behalf of the auth'd tenant, to
# avoid inappropriate cross-tenant visibility of alarm history
auth_project = rbac.get_limited_to_project(pecan.request.headers,
auth_project = rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer)
conn = pecan.request.storage
kwargs = v2_utils.query_to_kwargs(
@ -803,14 +804,14 @@ class AlarmsController(rest.RestController):
:param data: an alarm within the request body.
"""
rbac.enforce('create_alarm', pecan.request.headers,
rbac.enforce('create_alarm', pecan.request,
pecan.request.enforcer, {})
conn = pecan.request.storage
now = timeutils.utcnow()
data.alarm_id = uuidutils.generate_uuid()
user_limit, project_limit = rbac.get_limited_to(pecan.request.headers,
user_limit, project_limit = rbac.get_limited_to(pecan.request,
pecan.request.enforcer)
def _set_ownership(aspect, owner_limitation, header):
@ -864,8 +865,8 @@ class AlarmsController(rest.RestController):
:param marker: The pagination query marker.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('get_alarms', pecan.request.headers,
pecan.request, pecan.request.enforcer)
rbac.enforce('get_alarms', pecan.request,
pecan.request.enforcer, target)
q = q or []
@ -888,12 +889,12 @@ class AlarmsController(rest.RestController):
if 'all_projects' in keys:
if v2_utils.get_query_value(q, 'all_projects', 'boolean'):
rbac.enforce('get_alarms:all_projects', pecan.request.headers,
rbac.enforce('get_alarms:all_projects', pecan.request,
pecan.request.enforcer, target)
keys.remove('all_projects')
else:
project_id = pecan.request.headers.get('X-Project-Id')
is_admin = rbac.is_admin(pecan.request.headers)
is_admin = rbac.is_admin(pecan.request, pecan.request.enforcer)
if not v2_utils.is_field_exist(q, 'project_id'):
q.append(

View File

@ -273,7 +273,7 @@ class ValidatedComplexQuery(object):
"and <visibility_field>=<tenant's project_id>" clause to the query.
"""
authorized_project = rbac.get_limited_to_project(
pecan.request.headers, pecan.request.enforcer)
pecan.request, pecan.request.enforcer)
is_admin = authorized_project is None
if not is_admin:
self._restrict_to_project(authorized_project, visibility_field)
@ -354,8 +354,8 @@ class QueryAlarmHistoryController(rest.RestController):
:param body: Query rules for the alarm history to be returned.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('query_alarm_history', pecan.request.headers,
pecan.request, pecan.request.enforcer)
rbac.enforce('query_alarm_history', pecan.request,
pecan.request.enforcer, target)
query = ValidatedComplexQuery(body,
@ -380,8 +380,8 @@ class QueryAlarmsController(rest.RestController):
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('query_alarm', pecan.request.headers,
pecan.request, pecan.request.enforcer)
rbac.enforce('query_alarm', pecan.request,
pecan.request.enforcer, target)
query = ValidatedComplexQuery(body,

View File

@ -48,7 +48,7 @@ class QuotasController(rest.RestController):
"""
request_project = pecan.request.headers.get('X-Project-Id')
project_id = project_id if project_id else request_project
is_admin = rbac.is_admin(pecan.request.headers)
is_admin = rbac.is_admin(pecan.request, pecan.request.enforcer)
if project_id != request_project and not is_admin:
raise base.ProjectNotAuthorized(project_id)
@ -68,7 +68,7 @@ class QuotasController(rest.RestController):
@wsme_pecan.wsexpose(Quotas, body=Quotas, status_code=201)
def post(self, body):
"""Create or update quota."""
rbac.enforce('update_quotas', pecan.request.headers,
rbac.enforce('update_quotas', pecan.request,
pecan.request.enforcer, {})
params = body.to_dict()
@ -86,6 +86,6 @@ class QuotasController(rest.RestController):
@wsme_pecan.wsexpose(None, str, status_code=204)
def delete(self, project_id):
"""Delete quotas for the given project."""
rbac.enforce('delete_quotas', pecan.request.headers,
rbac.enforce('delete_quotas', pecan.request,
pecan.request.enforcer, {})
pecan.request.storage.delete_quotas(project_id)

View File

@ -39,7 +39,7 @@ def get_auth_project(on_behalf_of=None):
# Hence, for null auth_project (indicating admin-ness) we check if
# the creating tenant differs from the tenant on whose behalf the
# alarm is being created
auth_project = rbac.get_limited_to_project(pecan.request.headers,
auth_project = rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer)
created_by = pecan.request.headers.get('X-Project-Id')
is_admin = auth_project is None
@ -76,7 +76,7 @@ def sanitize_query(query, db_func, on_behalf_of=None):
def _verify_query_segregation(query, auth_project=None):
"""Ensure non-admin queries are not constrained to another project."""
auth_project = (auth_project or
rbac.get_limited_to_project(pecan.request.headers,
rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer))
if not auth_project:

View File

@ -17,44 +17,40 @@
"""Access Control Lists (ACL's) control access the API server."""
from oslo_context import context
import pecan
def target_from_segregation_rule(headers, enforcer):
def target_from_segregation_rule(req, enforcer):
"""Return a target corresponding to an alarm returned by segregation rule
This allows to use project_id: in an oslo_policy rule for query/listing.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:returns: target
"""
project_id = get_limited_to_project(headers, enforcer)
project_id = get_limited_to_project(req, enforcer)
if project_id is not None:
return {'project_id': project_id}
return {}
def enforce(policy_name, headers, enforcer, target):
def enforce(policy_name, req, enforcer, target):
"""Return the user and project the request should be limited to.
:param policy_name: the policy name to validate authz against.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:param target: the alarm or "auto" to
"""
rule_method = "telemetry:" + policy_name
ctxt = context.RequestContext.from_environ(req.environ)
credentials = {
'roles': headers.get('X-Roles', "").split(","),
'user_id': headers.get('X-User-Id'),
'project_id': headers.get('X-Project-Id'),
}
if not enforcer.enforce(rule_method, target, credentials):
if not enforcer.enforce(rule_method, target, ctxt.to_dict()):
pecan.core.abort(status_code=403,
detail='RBAC Authorization Failed')
@ -62,10 +58,10 @@ def enforce(policy_name, headers, enforcer, target):
# TODO(fabiog): these methods are still used because the scoping part is really
# convoluted and difficult to separate out.
def get_limited_to(headers, enforcer):
def get_limited_to(req, enforcer):
"""Return the user and project the request should be limited to.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:return: A tuple of (user, project), set to None if there's no limit on
one of these.
@ -76,29 +72,28 @@ def get_limited_to(headers, enforcer):
# creating more enhanced rbac. But for now we enforce the
# scoping of request to the project-id, so...
target = {}
credentials = {
'roles': headers.get('X-Roles', "").split(","),
}
ctxt = context.RequestContext.from_environ(req.environ)
# maintain backward compat with Juno and previous by using context_is_admin
# rule if the segregation rule (added in Kilo) is not defined
rules = enforcer.rules.keys()
rule_name = 'segregation' if 'segregation' in rules else 'context_is_admin'
if not enforcer.enforce(rule_name, target, credentials):
return headers.get('X-User-Id'), headers.get('X-Project-Id')
if not enforcer.enforce(rule_name, target, ctxt.to_dict()):
return ctxt.user_id, ctxt.project_id
return None, None
def get_limited_to_project(headers, enforcer):
def get_limited_to_project(req, enforcer):
"""Return the project the request should be limited to.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:return: A project, or None if there's no limit on it.
"""
return get_limited_to(headers, enforcer)[1]
return get_limited_to(req, enforcer)[1]
def is_admin(headers):
return 'admin' in headers.get('X-Roles', "").split(",")
def is_admin(req, enforcer):
ctxt = context.RequestContext.from_environ(req.environ)
return enforcer.enforce('context_is_admin', {}, ctxt.to_dict())