diff --git a/trio2o/api/controllers/pod.py b/trio2o/api/controllers/pod.py index 6814bfa..bb22442 100644 --- a/trio2o/api/controllers/pod.py +++ b/trio2o/api/controllers/pod.py @@ -27,6 +27,7 @@ import trio2o.common.context as t_context import trio2o.common.exceptions as t_exc from trio2o.common.i18n import _ from trio2o.common.i18n import _LE +from trio2o.common import policy from trio2o.common import utils from trio2o.db import api as db_api @@ -45,8 +46,8 @@ class PodsController(rest.RestController): def post(self, **kw): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to create pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_CREATE): + pecan.abort(401, _('Unauthorized to create pods')) return if 'pod' not in kw: @@ -129,8 +130,8 @@ class PodsController(rest.RestController): def get_one(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to show pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_SHOW): + pecan.abort(401, _('Unauthorized to show pods')) return try: @@ -143,8 +144,8 @@ class PodsController(rest.RestController): def get_all(self): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to list pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_LIST): + pecan.abort(401, _('Unauthorized to list pods')) return try: @@ -160,8 +161,8 @@ class PodsController(rest.RestController): def delete(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to delete pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_DELETE): + pecan.abort(401, _('Unauthorized to delete pods')) return try: @@ -174,7 +175,6 @@ class PodsController(rest.RestController): az_ag.delete_ag(context, ag['id']) core.delete_resource(context, models.Pod, _id) pecan.response.status = 200 - return {} except t_exc.ResourceNotFound: return Response(_('Pod not found'), 404) except Exception as e: @@ -212,8 +212,8 @@ class BindingsController(rest.RestController): def post(self, **kw): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to create bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_CREATE): + pecan.abort(401, _('Unauthorized to create bindings')) return if 'pod_binding' not in kw: @@ -272,8 +272,8 @@ class BindingsController(rest.RestController): def get_one(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to show bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_SHOW): + pecan.abort(401, _('Unauthorized to show bindings')) return try: @@ -290,8 +290,8 @@ class BindingsController(rest.RestController): def get_all(self): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to list bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_LIST): + pecan.abort(401, _('Unauthorized to list bindings')) return try: @@ -309,15 +309,14 @@ class BindingsController(rest.RestController): def delete(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to delete bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_DELETE): + pecan.abort(401, _('Unauthorized to delete bindings')) return try: with context.session.begin(): core.delete_resource(context, models.PodBinding, _id) pecan.response.status = 200 - return {} except t_exc.ResourceNotFound: pecan.abort(404, _('Pod binding not found')) return diff --git a/trio2o/common/config.py b/trio2o/common/config.py index 2d1622e..79873e5 100644 --- a/trio2o/common/config.py +++ b/trio2o/common/config.py @@ -16,35 +16,35 @@ """ Routines for configuring trio2o, largely copy from Neutron """ - import sys from oslo_config import cfg import oslo_log.log as logging +from oslo_policy import opts as policy_opts from trio2o.common.i18n import _LI -# from trio2o import policy +from trio2o.common import policy from trio2o.common import rpc from trio2o.common import version +logging.register_options(cfg.CONF) LOG = logging.getLogger(__name__) +policy_opts.set_defaults(cfg.CONF, 'policy.json') + def init(opts, args, **kwargs): # Register the configuration options cfg.CONF.register_opts(opts) - # ks_session.Session.register_conf_options(cfg.CONF) - # auth.register_conf_options(cfg.CONF) - logging.register_options(cfg.CONF) - cfg.CONF(args=args, project='trio2o', version=version.version_info, **kwargs) _setup_logging() + _setup_policy() rpc.init(cfg.CONF) @@ -60,11 +60,23 @@ def _setup_logging(): LOG.debug("command line: %s", " ".join(sys.argv)) +def _setup_policy(): + + # if there is valid policy file, use policy file by oslo_policy + # otherwise, use the default policy value in policy.py + policy_file = cfg.CONF.oslo_policy.policy_file + if policy_file and cfg.CONF.find_file(policy_file): + # just return here, oslo_policy lib will use policy file by itself + return + + policy.populate_default_rules() + + def reset_service(): # Reset worker in case SIGHUP is called. # Note that this is called only in case a service is running in # daemon mode. _setup_logging() - # TODO(zhiyuan) enforce policy later - # policy.refresh() + policy.reset() + _setup_policy() diff --git a/trio2o/common/context.py b/trio2o/common/context.py index 88882e3..2dbbf7e 100644 --- a/trio2o/common/context.py +++ b/trio2o/common/context.py @@ -76,7 +76,7 @@ class ContextBase(oslo_ctx.RequestContext): def __init__(self, auth_token=None, user_id=None, tenant_id=None, is_admin=False, read_deleted="no", request_id=None, overwrite=True, user_name=None, tenant_name=None, - quota_class=None, **kwargs): + quota_class=None, roles=None, **kwargs): """Initialize RequestContext. :param read_deleted: 'no' indicates deleted records are hidden, 'yes' @@ -105,6 +105,7 @@ class ContextBase(oslo_ctx.RequestContext): self.read_deleted = read_deleted self.nova_micro_version = kwargs.get('nova_micro_version', constants.NOVA_APIGW_MIN_VERSION) + self.roles = roles or [] def _get_read_deleted(self): return self._read_deleted @@ -128,7 +129,8 @@ class ContextBase(oslo_ctx.RequestContext): 'tenant_name': self.tenant_name, 'tenant_id': self.tenant_id, 'project_id': self.project_id, - 'quota_class': self.quota_class + 'quota_class': self.quota_class, + 'roles': self.roles }) return ctx_dict @@ -175,6 +177,7 @@ class Context(ContextBase): def elevated(self, read_deleted=None, overwrite=False): """Return a version of this context with admin flag set.""" ctx = copy.copy(self) + ctx.roles = copy.deepcopy(self.roles) ctx.is_admin = True if read_deleted is not None: diff --git a/trio2o/common/exceptions.py b/trio2o/common/exceptions.py index 979a5af..94096bf 100644 --- a/trio2o/common/exceptions.py +++ b/trio2o/common/exceptions.py @@ -123,6 +123,10 @@ class AdminRequired(NotAuthorized): message = _("User does not have admin privileges") +class PolicyNotAuthorized(NotAuthorized): + message = _("Policy doesn't allow this operation to be performed.") + + class InUse(Trio2oException): message = _("The resource is inuse") diff --git a/trio2o/common/policy.py b/trio2o/common/policy.py new file mode 100644 index 0000000..7cbb438 --- /dev/null +++ b/trio2o/common/policy.py @@ -0,0 +1,186 @@ +# Copyright (c) Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Policy Engine For Trio2o.""" + +# Policy controlled API access mainly for the Trio2o Admin API. Regarding +# to Nova API-GW and Cinder API-GW, the API access control should be done at +# bottom OpenStack as far as possible if the API request will be forwarded +# to bottom OpenStack directly for further processing; only these APIs which +# only can interact with database for example flavor and volume type, because +# these APIs processing will be terminated at the Trio2o layer, so policy +# control should be done by Nova API-GW or Cinder API-GW. No work is required +# to do in the Trio2o Neutron Plugin for Neutron API server is there, +# Neutron API server will be responsible for policy control. + + +from oslo_config import cfg +import oslo_log.log as logging +from oslo_policy import policy + +from trio2o.common import exceptions as t_exec +from trio2o.common.i18n import _LE + +_ENFORCER = None +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +default_policies = [ + policy.RuleDefault('context_is_admin', 'role:admin'), + policy.RuleDefault('admin_api', 'is_admin:True', + description='cloud admin allowed'), + policy.RuleDefault('admin_or_owner', + 'is_admin:True or project_id:%(project_id)s', + description='cloud admin or project owner allowed'), + policy.RuleDefault('default', 'rule:admin_or_owner'), +] + +ADMIN_API_PODS_CREATE = 'admin_api:pods:create' +ADMIN_API_PODS_DELETE = 'admin_api:pods:delete' +ADMIN_API_PODS_SHOW = 'admin_api:pods:show' +ADMIN_API_PODS_LIST = 'admin_api:pods:list' + +ADMIN_API_BINDINGS_CREATE = 'admin_api:bindings:create' +ADMIN_API_BINDINGS_DELETE = 'admin_api:bindings:delete' +ADMIN_API_BINDINGS_SHOW = 'admin_api:bindings:show' +ADMIN_API_BINDINGS_LIST = 'admin_api:bindings:list' + +trio2o_admin_api_policies = [ + policy.RuleDefault(ADMIN_API_PODS_CREATE, + 'rule:admin_api', + description='Create pod'), + policy.RuleDefault(ADMIN_API_PODS_DELETE, + 'rule:admin_api', + description='Delete pod'), + policy.RuleDefault(ADMIN_API_PODS_SHOW, + 'rule:admin_api', + description='Show pod detail'), + policy.RuleDefault(ADMIN_API_PODS_LIST, + 'rule:admin_api', + description='List pods'), + + policy.RuleDefault(ADMIN_API_BINDINGS_CREATE, + 'rule:admin_api', + description='Create pod binding'), + policy.RuleDefault(ADMIN_API_BINDINGS_DELETE, + 'rule:admin_api', + description='Delete pod binding'), + policy.RuleDefault(ADMIN_API_BINDINGS_SHOW, + 'rule:admin_api', + description='Show pod binding detail'), + policy.RuleDefault(ADMIN_API_BINDINGS_LIST, + 'rule:admin_api', + description='List pod bindings'), +] + + +def list_policies(): + policies = (default_policies + + trio2o_admin_api_policies) + return policies + + +# we can get a policy enforcer by this init. +# oslo policy supports change policy rule dynamically. +# at present, policy.enforce will reload the policy rules when it checks +# the policy file has been touched. +def init(policy_file=None, rules=None, + default_rule=None, use_conf=True, overwrite=True): + """Init an Enforcer class. + + :param policy_file: Custom policy file to use, if none is + specified, ``conf.policy_file`` will be + used. + :param rules: Default dictionary / Rules to use. It will be + considered just in the first instantiation. If + :meth:`load_rules` with ``force_reload=True``, + :meth:`clear` or :meth:`set_rules` with + ``overwrite=True`` is called this will be overwritten. + :param default_rule: Default rule to use, conf.default_rule will + be used if none is specified. + :param use_conf: Whether to load rules from cache or config file. + :param overwrite: Whether to overwrite existing rules when reload rules + from config file. + """ + global _ENFORCER + if not _ENFORCER: + # http://docs.openstack.org/developer/oslo.policy/usage.html + _ENFORCER = policy.Enforcer(CONF, + policy_file=policy_file, + rules=rules, + default_rule=default_rule, + use_conf=use_conf, + overwrite=overwrite) + _ENFORCER.register_defaults(list_policies()) + return _ENFORCER + + +def set_rules(rules, overwrite=True, use_conf=False): + """Set rules based on the provided dict of rules. + + :param rules: New rules to use. It should be an instance of dict. + :param overwrite: Whether to overwrite current rules or update them + with the new rules. + :param use_conf: Whether to reload rules from config file. + """ + init(use_conf=False) + _ENFORCER.set_rules(rules, overwrite, use_conf) + + +def populate_default_rules(): + reset() + init(use_conf=False) + dict_rules = {} + for default in list_policies(): + dict_rules[default.name] = default.check_str + rules = policy.Rules.from_dict(dict_rules) + set_rules(rules) + + +def reset(): + global _ENFORCER + if _ENFORCER: + _ENFORCER.clear() + _ENFORCER = None + + +def enforce(context, rule=None, target=None, *args, **kwargs): + """Check authorization of a rule against the target and credentials. + + :param dict context: As much information about the user performing the + action as possible. + :param rule: The rule to evaluate. + :param dict target: As much information about the object being operated + on as possible. + :return: ``True`` if the policy allows the action. + ``False`` if the policy does not allow the action. + """ + enforcer = init() + credentials = context.to_dict() + if target is None: + target = {'project_id': context.project_id, + 'user_id': context.user_id} + + exc = t_exec.PolicyNotAuthorized + + try: + result = enforcer.enforce(rule, target, credentials, + do_raise=True, exc=exc, *args, **kwargs) + + except t_exec.PolicyNotAuthorized as e: + result = False + LOG.exception(_LE("%(msg)s, %(rule)s, %(target)s"), + {'msg': str(e), 'rule': rule, 'target': target}) + return result diff --git a/trio2o/tests/functional/api/controllers/test_pod.py b/trio2o/tests/functional/api/controllers/test_pod.py index 344fc00..d97bf83 100644 --- a/trio2o/tests/functional/api/controllers/test_pod.py +++ b/trio2o/tests/functional/api/controllers/test_pod.py @@ -24,6 +24,7 @@ import oslo_db.exception as db_exc from trio2o.api import app from trio2o.common import az_ag from trio2o.common import context +from trio2o.common import policy from trio2o.common import utils from trio2o.db import core from trio2o.tests import base @@ -33,8 +34,14 @@ OPT_GROUP_NAME = 'keystone_authtoken' cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token") -def fake_is_admin(ctx): - return True +def fake_admin_context(): + context_paras = {'is_admin': True} + return context.Context(**context_paras) + + +def fake_non_admin_context(): + context_paras = {} + return context.Context(**context_paras) class API_FunctionalTest(base.TestCase): @@ -44,6 +51,7 @@ class API_FunctionalTest(base.TestCase): self.addCleanup(set_config, {}, overwrite=True) + cfg.CONF.clear() cfg.CONF.register_opts(app.common_opts) self.CONF = self.useFixture(fixture_config.Config()).conf @@ -56,6 +64,8 @@ class API_FunctionalTest(base.TestCase): self.context = context.get_admin_context() + policy.populate_default_rules() + self.app = self._make_app() def _make_app(self, enable_acl=False): @@ -78,13 +88,14 @@ class API_FunctionalTest(base.TestCase): cfg.CONF.unregister_opts(app.common_opts) pecan.set_config({}, overwrite=True) core.ModelBase.metadata.drop_all(core.get_engine()) + policy.reset() class TestPodController(API_FunctionalTest): """Test version listing on root URI.""" - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_no_input(self): pods = [ # missing pod @@ -109,8 +120,8 @@ class TestPodController(API_FunctionalTest): def fake_create_ag_az(context, ag_name, az_name): raise db_exc.DBDuplicateEntry - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) @patch.object(az_ag, 'create_ag_az', new=fake_create_ag_az) def test_post_dup_db_exception(self): @@ -132,8 +143,8 @@ class TestPodController(API_FunctionalTest): def fake_create_ag_az_exp(context, ag_name, az_name): raise Exception - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) @patch.object(core, 'create_resource', new=fake_create_ag_az_exp) def test_post_exception(self): @@ -152,8 +163,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_invalid_input(self): pods = [ @@ -230,8 +241,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_duplicate_top_region(self): pods = [ @@ -261,8 +272,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_duplicate_pod(self): pods = [ @@ -293,8 +304,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_pod_duplicate_top_region(self): pods = [ @@ -336,8 +347,8 @@ class TestPodController(API_FunctionalTest): self.assertEqual(response.status_int, test_pod['expected_error']) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_get_all(self): pods = [ @@ -387,12 +398,9 @@ class TestPodController(API_FunctionalTest): self.assertIn('Pod1', response) self.assertIn('Pod2', response) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) - @patch.object(context, 'extract_context_from_environ') - def test_get_delete_one(self, mock_context): - - mock_context.return_value = self.context + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + def test_get_delete_one(self): pods = [ @@ -480,12 +488,42 @@ class TestPodController(API_FunctionalTest): ag = az_ag.get_ag_by_name(self.context, ag_name) self.assertIsNone(ag) + @patch.object(context, 'extract_context_from_environ', + new=fake_non_admin_context) + def test_non_admin_action(self): + + pods = [ + { + "pod": + { + "pod_name": "Pod1", + "pod_az_name": "az1", + "dc_name": "dc2", + "az_name": "AZ1" + }, + "expected_error": 401, + }, + ] + self._test_and_check(pods) + + response = self.app.get('/v1.0/pods/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.get('/v1.0/pods', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.delete('/v1.0/pods/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) + class TestBindingController(API_FunctionalTest): """Test version listing on root URI.""" - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_no_input(self): pod_bindings = [ # missing pod_binding @@ -507,8 +545,8 @@ class TestBindingController(API_FunctionalTest): self.assertEqual(response.status_int, test_pod['expected_error']) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_invalid_input(self): pod_bindings = [ @@ -552,8 +590,8 @@ class TestBindingController(API_FunctionalTest): self._test_and_check(pod_bindings) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_bindings(self): pods = [ @@ -665,3 +703,31 @@ class TestBindingController(API_FunctionalTest): self.assertEqual(response.status_int, test_pod['expected_error']) + + @patch.object(context, 'extract_context_from_environ', + new=fake_non_admin_context) + def test_non_admin_action(self): + pod_bindings = [ + { + "pod_binding": + { + "tenant_id": "dddddd", + "pod_id": "0ace0db2-ef33-43a6-a150-42703ffda643" + }, + "expected_error": 401 + }, + ] + + self._test_and_check(pod_bindings) + + response = self.app.get('/v1.0/bindings/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.get('/v1.0/bindings', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.delete('/v1.0/bindings/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) diff --git a/trio2o/tests/unit/api/controllers/test_pod.py b/trio2o/tests/unit/api/controllers/test_pod.py index 2c7daed..8c74b49 100644 --- a/trio2o/tests/unit/api/controllers/test_pod.py +++ b/trio2o/tests/unit/api/controllers/test_pod.py @@ -21,6 +21,7 @@ import pecan from trio2o.api.controllers import pod from trio2o.common import context +from trio2o.common import policy from trio2o.common import utils from trio2o.db import core from trio2o.db import models @@ -32,6 +33,7 @@ class PodsControllerTest(unittest.TestCase): core.ModelBase.metadata.create_all(core.get_engine()) self.controller = pod.PodsController() self.context = context.get_admin_context() + policy.populate_default_rules() @patch.object(context, 'extract_context_from_environ') def test_post_top_pod(self, mock_context): @@ -133,3 +135,4 @@ class PodsControllerTest(unittest.TestCase): def tearDown(self): core.ModelBase.metadata.drop_all(core.get_engine()) + policy.reset() diff --git a/trio2o/tests/unit/common/test_policy.py b/trio2o/tests/unit/common/test_policy.py new file mode 100644 index 0000000..6e34ccf --- /dev/null +++ b/trio2o/tests/unit/common/test_policy.py @@ -0,0 +1,123 @@ +# Copyright 2016 Huawei Technologies Co., Ltd. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +from oslo_policy import policy as oslo_policy + +from trio2o.common import context +from trio2o.common import policy + + +class PolicyTestCase(unittest.TestCase): + def setUp(self): + super(PolicyTestCase, self).setUp() + rules = oslo_policy.Rules.from_dict({ + "true": '@', + "example:allowed": '@', + "example:denied": "!", + "example:my_file": "role:admin or " + "project_id:%(project_id)s", + "example:early_and_fail": "! and @", + "example:early_or_success": "@ or !", + "example:lowercase_admin": "role:admin or role:sysadmin", + "example:uppercase_admin": "role:ADMIN or role:sysadmin", + }) + policy.reset() + policy.init() + policy.set_rules(rules) + self.context = context.Context(user_id='fake', + tenant_id='fake', + roles=['member']) + self.target = None + + def test_enforce_nonexistent_action_throws(self): + action = "example:non_exist" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, False) + + def test_enforce_bad_action_throws(self): + action = "example:denied" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, False) + + def test_enforce_good_action(self): + action = "example:allowed" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, True) + + def test_templatized_enforcement(self): + target_mine = {'project_id': 'fake'} + target_not_mine = {'project_id': 'another'} + action = "example:my_file" + result = policy.enforce(self.context, action, target_mine) + self.assertEqual(result, True) + result = policy.enforce(self.context, action, target_not_mine) + self.assertEqual(result, False) + + def test_early_AND_enforcement(self): + action = "example:early_and_fail" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, False) + + def test_early_OR_enforcement(self): + action = "example:early_or_success" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, True) + + def test_ignore_case_role_check(self): + lowercase_action = "example:lowercase_admin" + uppercase_action = "example:uppercase_admin" + admin_context = context.Context(user_id='fake', + tenant_id='fake', + roles=['AdMiN']) + result = policy.enforce(admin_context, lowercase_action, self.target) + self.assertEqual(result, True) + result = policy.enforce(admin_context, uppercase_action, self.target) + self.assertEqual(result, True) + + +class DefaultPolicyTestCase(unittest.TestCase): + + def setUp(self): + super(DefaultPolicyTestCase, self).setUp() + + self.rules = oslo_policy.Rules.from_dict({ + "default": '', + "example:exist": "!", + }) + + self._set_rules('default') + + self.context = context.Context(user_id='fake', + tenant_id='fake') + + def _set_rules(self, default_rule): + policy.reset() + policy.init(rules=self.rules, default_rule=default_rule, + use_conf=False) + + def test_policy_called(self): + result = policy.enforce(self.context, "example:exist", {}) + self.assertEqual(result, False) + + def test_not_found_policy_calls_default(self): + result = policy.enforce(self.context, "example:noexist", {}) + self.assertEqual(result, True) + + def test_default_not_found(self): + self._set_rules("default_noexist") + result = policy.enforce(self.context, "example:noexist", {}) + self.assertEqual(result, False)