diff --git a/quark/cache/redis_base.py b/quark/cache/redis_base.py index 8b5ec5c..458e06b 100644 --- a/quark/cache/redis_base.py +++ b/quark/cache/redis_base.py @@ -45,6 +45,10 @@ quark_opts = [ cfg.StrOpt("redis_db", default="0", help=("The database number to use")), + cfg.IntOpt("redis_min_other_sentinels", + default=2, + help=_("The minimum number of sentinels required for quorum." + " Set this to 0 if you only have one sentinel.")), cfg.FloatOpt("redis_socket_timeout", default=0.1, help=("Timeout for Redis socket operations"))] @@ -83,7 +87,8 @@ class ClientBase(object): sentinel_kwargs = TwiceRedis.DEFAULT_SENTINEL_KWARGS pool_kwargs['socket_timeout'] = CONF.QUARK.redis_socket_timeout pool_kwargs['socket_keepalive'] = False - sentinel_kwargs['min_other_sentinels'] = 2 + sentinel_kwargs['min_other_sentinels'] = \ + CONF.QUARK.redis_min_other_sentinels return TwiceRedis(master_name=CONF.QUARK.redis_sentinel_master, sentinels=sentinels, password=CONF.QUARK.redis_password, diff --git a/quark/db/api.py b/quark/db/api.py index e729404..b213c79 100644 --- a/quark/db/api.py +++ b/quark/db/api.py @@ -95,7 +95,7 @@ def _model_query(context, model, filters, fields=None): model_filters = [] eq_filters = ["address", "cidr", "deallocated", "ip_version", "service", "mac_address_range_id", "transaction_id", "lock_id", - "address_type", "completed"] + "address_type", "completed", "resource_id"] in_filters = ["device_id", "device_owner", "group_id", "id", "mac_address", "name", "network_id", "segment_id", "subnet_id", "used_by_tenant_id", "version", "project_id"] @@ -889,6 +889,19 @@ def dns_delete(context, dns): context.session.delete(dns) +def sg_gather_associated_ports(context, group): + """Gather all ports associated to security group. + + Returns: + * list, or None + """ + if not group: + return None + if not hasattr(group, "ports") or len(group.ports) <= 0: + return [] + return group.ports + + @scoped def security_group_find(context, **filters): query = context.session.query(models.SecurityGroup).options( @@ -1172,25 +1185,52 @@ def segment_allocation_range_delete(context, sa_range): @scoped def async_transaction_find(context, lock_mode=False, **filters): query = context.session.query(models.AsyncTransactions) + mod = models.AsyncTransactions if lock_mode: query = query.with_lockmode("update") - model_filters = _model_query( - context, models.AsyncTransactions, filters) + model_filters = _model_query(context, mod, filters) + + if 'transaction_id' in filters: + query = query.filter(or_( + mod.id == filters['transaction_id'], + and_(*model_filters))) + else: + query = query.filter(*model_filters) - query = query.filter(*model_filters) return query +def _check_transaction_completion(context, transaction, just_made=False): + # TODO(roaet): this pegs the DB pretty often and there may be a better way + completed = 0 + for sub in transaction.subtransactions: + if sub.get('completed'): + completed += 1 + if len(transaction.subtransactions) == completed: + completed = True + if just_made: + completed = False + data = {'completed': completed} + async_transaction_update(context, transaction, **data) + + def async_transaction_create(context, **transaction_dict): tx = models.AsyncTransactions() tx.update(transaction_dict) context.session.add(tx) + if tx.transaction_id is not None: + parent_tx = async_transaction_find(context, id=tx.transaction_id, + scope=ONE) + _check_transaction_completion(context, parent_tx, just_made=True) return tx def async_transaction_update(context, transaction, **kwargs): transaction.update(kwargs) + tx = transaction.transaction + if transaction.completed and tx is not None: + _check_transaction_completion(context, tx) context.session.add(transaction) return transaction diff --git a/quark/db/migration/alembic/versions/HEAD b/quark/db/migration/alembic/versions/HEAD index 90c3934..7483287 100644 --- a/quark/db/migration/alembic/versions/HEAD +++ b/quark/db/migration/alembic/versions/HEAD @@ -1 +1 @@ -79b768afed65 +da46a8b30bd8 diff --git a/quark/db/migration/alembic/versions/da46a8b30bd8_create_sub_async_transaction_relationship.py b/quark/db/migration/alembic/versions/da46a8b30bd8_create_sub_async_transaction_relationship.py new file mode 100644 index 0000000..9160d91 --- /dev/null +++ b/quark/db/migration/alembic/versions/da46a8b30bd8_create_sub_async_transaction_relationship.py @@ -0,0 +1,29 @@ +"""create sub async transaction relationship +Revision ID: da46a8b30bd8 +Revises: 79b768afed65 +Create Date: 2016-08-12 09:24:29.941684 + +""" + +# revision identifiers, used by Alembic. +revision = 'da46a8b30bd8' +down_revision = '79b768afed65' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('quark_async_transactions', + sa.Column('status', sa.String(255), nullable=True)) + op.add_column('quark_async_transactions', + sa.Column('resource_id', sa.String(255), nullable=True)) + op.add_column('quark_async_transactions', + sa.Column('transaction_id', sa.String(255), nullable=True)) + op.add_column('quark_async_transactions', + sa.Column('parent_task_id', sa.String(255), nullable=True)) + + +def downgrade(): + op.drop_column('quark_async_transactions', 'resource_id') + op.drop_column('quark_async_transactions', 'status') diff --git a/quark/db/models.py b/quark/db/models.py index 06f1662..8475a78 100644 --- a/quark/db/models.py +++ b/quark/db/models.py @@ -595,3 +595,26 @@ class AsyncTransactions(BASEV2, HasId, HasTenant): __tablename__ = "quark_async_transactions" action = sa.Column(sa.String(255)) completed = sa.Column(sa.Boolean(), default=False) + status = sa.Column(sa.String(255)) + resource_id = sa.Column(sa.String(36)) + + parent_task_id = sa.Column( + sa.String(36), + sa.ForeignKey("quark_async_transactions.id", ondelete="cascade"), + nullable=True) + transaction_id = sa.Column( + sa.String(36), + sa.ForeignKey("quark_async_transactions.id", ondelete="cascade"), + nullable=True) + + parent = orm.relationship( + "AsyncTransactions", + backref=orm.backref("children", cascade="all, delete"), + foreign_keys=[parent_task_id], single_parent=True, + remote_side="AsyncTransactions.id") + + transaction = orm.relationship( + "AsyncTransactions", + backref=orm.backref("subtransactions", cascade="all, delete"), + foreign_keys=[transaction_id], single_parent=True, + remote_side="AsyncTransactions.id") diff --git a/quark/environment.py b/quark/environment.py index a84b85f..9f5ecf5 100644 --- a/quark/environment.py +++ b/quark/environment.py @@ -24,6 +24,7 @@ class Capabilities(object): EGRESS = "egress" TENANT_NETWORK_SG = "tenant_network_sg" IP_BILLING = "ip_billing" + SG_UPDATE_ASYNC = "security_groups_update_async" quark_opts = [ diff --git a/quark/plugin_modules/jobs.py b/quark/plugin_modules/jobs.py index 8fe6db7..6b1b815 100644 --- a/quark/plugin_modules/jobs.py +++ b/quark/plugin_modules/jobs.py @@ -25,6 +25,15 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +def add_job_to_context(context, job_id): + """Adds job to neutron context for use later.""" + db_job = db_api.async_transaction_find( + context, id=job_id, scope=db_api.ONE) + if not db_job: + return + context.async_job = {"job": v._make_job_dict(db_job)} + + def get_jobs(context, **filters): LOG.info("get_jobs for tenant %s" % context.tenant_id) if not filters: @@ -44,14 +53,40 @@ def get_job(context, id): def create_job(context, body): + """Creates a job with support for subjobs. + + If parent_id is not in the body: + * the job is considered a parent job + * it will have a NULL transaction id + * its transaction id == its id + * all subjobs will use its transaction id as theirs + + Else: + * the job is a sub job + * the parent id is the id passed in + * the transaction id is the root of the job tree + """ LOG.info("create_job for tenant %s" % context.tenant_id) if not context.is_admin: raise n_exc.NotAuthorized() job = body.get('job') + if 'parent_id' in job: + parent_id = job['parent_id'] + if not parent_id: + raise q_exc.JobNotFound(job_id=parent_id) + parent_job = db_api.async_transaction_find( + context, id=parent_id, scope=db_api.ONE) + if not parent_job: + raise q_exc.JobNotFound(job_id=parent_id) + tid = parent_id + if parent_job.get('transaction_id'): + tid = parent_job.get('transaction_id') + job['transaction_id'] = tid + if not job: raise n_exc.BadRequest(resource="job", msg="Invalid request body.") - with context.session.begin(): + with context.session.begin(subtransactions=True): new_job = db_api.async_transaction_create(context, **job) return v._make_job_dict(new_job) diff --git a/quark/plugin_modules/ports.py b/quark/plugin_modules/ports.py index 8abf286..921bb60 100644 --- a/quark/plugin_modules/ports.py +++ b/quark/plugin_modules/ports.py @@ -478,6 +478,9 @@ def update_port(context, id, port): # a future patch where we have time to solve it well. kwargs = {} if new_security_groups is not None: + # TODO(anyone): this is kind of silly (when testing), because it will + # modify the incoming dict. Probably should be a copy or + # something. kwargs["security_groups"] = security_group_mods net_driver.update_port(context, port_id=port_db["backend_key"], mac_address=port_db["mac_address"], diff --git a/quark/plugin_modules/security_groups.py b/quark/plugin_modules/security_groups.py index 91e47e3..aad12d3 100644 --- a/quark/plugin_modules/security_groups.py +++ b/quark/plugin_modules/security_groups.py @@ -21,16 +21,21 @@ from oslo_log import log as logging from oslo_utils import uuidutils from quark.db import api as db_api -from quark.environment import Capabilities +from quark import environment as env from quark import exceptions as q_exc +from quark.plugin_modules import jobs as job_api from quark import plugin_views as v from quark import protocols +from quark.worker_plugins import sg_update_worker as sg_rpc_api + CONF = cfg.CONF LOG = logging.getLogger(__name__) DEFAULT_SG_UUID = "00000000-0000-0000-0000-000000000000" GROUP_NAME_MAX_LENGTH = 255 GROUP_DESCRIPTION_MAX_LENGTH = 255 +RULE_CREATE = 'create' +RULE_DELETE = 'delete' def _validate_security_group_rule(context, rule): @@ -40,17 +45,20 @@ def _validate_security_group_rule(context, rule): error_message="Remote groups are not currently supported") direction = rule.get("direction") - if direction == Capabilities.EGRESS: - if Capabilities.EGRESS not in CONF.QUARK.environment_capabilities: + if direction == env.Capabilities.EGRESS: + if env.Capabilities.EGRESS not in CONF.QUARK.environment_capabilities: raise q_exc.EgressSecurityGroupRulesNotEnabled() protocol = rule.pop('protocol') - port_range_min = rule['port_range_min'] - port_range_max = rule['port_range_max'] - ethertype = protocols.translate_ethertype(rule["ethertype"]) + # NOTE(roaet): these are not required by spec + port_range_min = rule.get('port_range_min') + port_range_max = rule.get('port_range_max') + # TODO(anyone): this will error as None, so defaulting to ipv4 + et = rule.get('ethertype', 'IPv4') + ethertype = protocols.translate_ethertype(et) if protocol: - protocol = protocols.translate_protocol(protocol, rule["ethertype"]) + protocol = protocols.translate_protocol(protocol, et) protocols.validate_protocol_with_port_ranges(ethertype, protocol, port_range_min, @@ -100,26 +108,17 @@ def create_security_group(context, security_group): return v._make_security_group_dict(dbgroup) -def create_security_group_rule(context, security_group_rule): - LOG.info("create_security_group for tenant %s" % - (context.tenant_id)) +def update_security_group(context, id, security_group): + if id == DEFAULT_SG_UUID: + raise sg_ext.SecurityGroupCannotUpdateDefault() + new_group = security_group["security_group"] + _validate_security_group(new_group) + with context.session.begin(): - rule = _validate_security_group_rule( - context, security_group_rule["security_group_rule"]) - rule["id"] = uuidutils.generate_uuid() + group = db_api.security_group_find(context, id=id, scope=db_api.ONE) + db_group = db_api.security_group_update(context, group, **new_group) - group_id = rule["security_group_id"] - group = db_api.security_group_find(context, id=group_id, - scope=db_api.ONE) - if not group: - raise sg_ext.SecurityGroupNotFound(id=group_id) - - quota.QUOTAS.limit_check( - context, context.tenant_id, - security_rules_per_group=len(group.get("rules", [])) + 1) - - new_rule = db_api.security_group_rule_create(context, **rule) - return v._make_security_group_rule_dict(new_rule) + return v._make_security_group_dict(db_group) def delete_security_group(context, id): @@ -139,7 +138,81 @@ def delete_security_group(context, id): db_api.security_group_delete(context, group) +def get_security_group(context, id, fields=None): + LOG.info("get_security_group %s for tenant %s" % + (id, context.tenant_id)) + group = db_api.security_group_find(context, id=id, scope=db_api.ONE) + if not group: + raise sg_ext.SecurityGroupNotFound(id=id) + return v._make_security_group_dict(group, fields) + + +def get_security_groups(context, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + LOG.info("get_security_groups for tenant %s" % + (context.tenant_id)) + groups = db_api.security_group_find(context, **filters) + return [v._make_security_group_dict(group) for group in groups] + + +@env.has_capability(env.Capabilities.SG_UPDATE_ASYNC) +def _perform_async_update_rule(context, id, db_sg_group, rule_id, action): + """Updates a SG rule async and return the job information. + + Only happens if the security group has associated ports. If the async + connection fails the update continues (legacy mode). + """ + rpc_reply = None + sg_rpc = sg_rpc_api.QuarkSGAsyncProcessClient() + ports = db_api.sg_gather_associated_ports(context, db_sg_group) + if len(ports) > 0: + rpc_reply = sg_rpc.start_update(context, id, rule_id, action) + if rpc_reply: + job_id = rpc_reply['job_id'] + job_api.add_job_to_context(context, job_id) + else: + LOG.error("Async update failed. Is the worker running?") + + +def create_security_group_rule(context, security_group_rule): + """Creates a rule and updates the ports (async) if enabled.""" + LOG.info("create_security_group for tenant %s" % + (context.tenant_id)) + with context.session.begin(): + rule = _validate_security_group_rule( + context, security_group_rule["security_group_rule"]) + rule["id"] = uuidutils.generate_uuid() + + group_id = rule["security_group_id"] + group = db_api.security_group_find(context, id=group_id, + scope=db_api.ONE) + if not group: + raise sg_ext.SecurityGroupNotFound(id=group_id) + + quota.QUOTAS.limit_check( + context, context.tenant_id, + security_rules_per_group=len(group.get("rules", [])) + 1) + + new_rule = db_api.security_group_rule_create(context, **rule) + if group: + _perform_async_update_rule(context, group_id, group, new_rule.id, + RULE_CREATE) + return v._make_security_group_rule_dict(new_rule) + + +def get_security_group_rule(context, id, fields=None): + LOG.info("get_security_group_rule %s for tenant %s" % + (id, context.tenant_id)) + rule = db_api.security_group_rule_find(context, id=id, + scope=db_api.ONE) + if not rule: + raise sg_ext.SecurityGroupRuleNotFound(id=id) + return v._make_security_group_rule_dict(rule, fields) + + def delete_security_group_rule(context, id): + """Deletes a rule and updates the ports (async) if enabled.""" LOG.info("delete_security_group %s for tenant %s" % (id, context.tenant_id)) with context.session.begin(): @@ -155,34 +228,8 @@ def delete_security_group_rule(context, id): rule["id"] = id db_api.security_group_rule_delete(context, rule) - - -def get_security_group(context, id, fields=None): - LOG.info("get_security_group %s for tenant %s" % - (id, context.tenant_id)) - group = db_api.security_group_find(context, id=id, scope=db_api.ONE) - if not group: - raise sg_ext.SecurityGroupNotFound(id=id) - return v._make_security_group_dict(group, fields) - - -def get_security_group_rule(context, id, fields=None): - LOG.info("get_security_group_rule %s for tenant %s" % - (id, context.tenant_id)) - rule = db_api.security_group_rule_find(context, id=id, - scope=db_api.ONE) - if not rule: - raise sg_ext.SecurityGroupRuleNotFound(id=id) - return v._make_security_group_rule_dict(rule, fields) - - -def get_security_groups(context, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - LOG.info("get_security_groups for tenant %s" % - (context.tenant_id)) - groups = db_api.security_group_find(context, **filters) - return [v._make_security_group_dict(group) for group in groups] + if group: + _perform_async_update_rule(context, group.id, group, id, RULE_DELETE) def get_security_group_rules(context, filters=None, fields=None, @@ -192,15 +239,3 @@ def get_security_group_rules(context, filters=None, fields=None, (context.tenant_id)) rules = db_api.security_group_rule_find(context, **filters) return [v._make_security_group_rule_dict(rule) for rule in rules] - - -def update_security_group(context, id, security_group): - if id == DEFAULT_SG_UUID: - raise sg_ext.SecurityGroupCannotUpdateDefault() - new_group = security_group["security_group"] - _validate_security_group(new_group) - - with context.session.begin(): - group = db_api.security_group_find(context, id=id, scope=db_api.ONE) - db_group = db_api.security_group_update(context, group, **new_group) - return v._make_security_group_dict(db_group) diff --git a/quark/plugin_views.py b/quark/plugin_views.py index 05d4b59..06b1fb8 100644 --- a/quark/plugin_views.py +++ b/quark/plugin_views.py @@ -362,8 +362,28 @@ def _make_scaling_ip_dict(flip): def _make_job_dict(job): - return {"id": job.get('id'), + """Creates the view for a job while calculating progress. + + Since a root job does not have a transaction id (TID) it will return its + id as the TID. + """ + body = {"id": job.get('id'), "action": job.get('action'), "completed": job.get('completed'), "tenant_id": job.get('tenant_id'), - "created_at": job.get('created_at')} + "created_at": job.get('created_at'), + "transaction_id": job.get('transaction_id'), + "parent_id": job.get('parent_id', None)} + if not body['transaction_id']: + body['transaction_id'] = job.get('id') + completed = 0 + for sub in job.subtransactions: + if sub.get('completed'): + completed += 1 + pct = 100 if job.get('completed') else 0 + if len(job.subtransactions) > 0: + pct = float(completed) / len(job.subtransactions) * 100.0 + body['transaction_percent'] = int(pct) + body['completed_subtransactions'] = completed + body['subtransactions'] = len(job.subtransactions) + return body diff --git a/quark/tests/functional/plugin_modules/test_jobs.py b/quark/tests/functional/plugin_modules/test_jobs.py index 09936d8..ac13cc9 100644 --- a/quark/tests/functional/plugin_modules/test_jobs.py +++ b/quark/tests/functional/plugin_modules/test_jobs.py @@ -39,6 +39,70 @@ class QuarkJobs(BaseFunctionalTest): self.assertFalse(job['completed']) self.assertEqual(self.tenant_id, job['tenant_id']) self.assertEqual(self.action, job['action']) + self.assertEqual(None, job['parent_id']) + self.assertEqual(job['id'], job['transaction_id']) + self.assertEqual(0, job['subtransactions']) + self.assertEqual(0, job['completed_subtransactions']) + + def test_add_job_to_context(self): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job) + self.assertFalse(hasattr(self.admin_context, 'async_job')) + job_api.add_job_to_context(self.admin_context, job['id']) + self.assertTrue(hasattr(self.admin_context, 'async_job')) + self.assertEqual(self.admin_context.async_job['job']['id'], + job['id']) + + def test_add_missing_job_to_context(self): + self.assertFalse(hasattr(self.admin_context, 'async_job')) + job_api.add_job_to_context(self.admin_context, 0) + self.assertFalse(hasattr(self.admin_context, 'async_job')) + + def test_create_job_with_parent_job(self): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + parent_job = job_api.create_job(self.admin_context, job_body) + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False, parent_id=parent_job['id']) + job_body = dict(job=job_body) + job = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job) + self.assertFalse(job['completed']) + self.assertEqual(self.tenant_id, job['tenant_id']) + self.assertEqual(self.action, job['action']) + self.assertEqual(parent_job['id'], job['parent_id']) + self.assertEqual(parent_job['id'], job['transaction_id'], + "transaction id should be outer most parent id") + tx_job = job_api.get_job(self.admin_context, parent_job['id']) + self.assertEqual(1, tx_job['subtransactions']) + self.assertEqual(0, tx_job['completed_subtransactions']) + + def test_create_deep_job_list(self): + parent_job = None + transaction = None + for i in xrange(4): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + if parent_job: + job_body['parent_id'] = parent_job + job_body = dict(job=job_body) + job = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job) + if parent_job: + self.assertEqual(parent_job, job['parent_id']) + if transaction is None: + self.assertIsNotNone(job['transaction_id']) + transaction = job['id'] + else: + self.assertEqual(transaction, job['transaction_id']) + parent_job = job['id'] + tx_job = job_api.get_job(self.admin_context, transaction) + self.assertEqual(3, tx_job['subtransactions']) + self.assertEqual(0, job['completed_subtransactions']) def test_create_job_fail_non_admin(self): job_body = dict(tenant_id=self.tenant_id, action=self.action, @@ -49,17 +113,24 @@ class QuarkJobs(BaseFunctionalTest): def test_get_jobs(self): job_body = dict(tenant_id=self.tenant_id, action=self.action, - completed=False) + completed=False, resource_id='foo') job_body = dict(job=job_body) job1 = job_api.create_job(self.admin_context, job_body) self.assertIsNotNone(job1) job_body = dict(tenant_id=self.tenant_id2, action=self.action, - completed=True) + completed=False, resource_id='bar') job_body = dict(job=job_body) job2 = job_api.create_job(self.admin_context, job_body) self.assertIsNotNone(job2) + job_body = dict(tenant_id=self.tenant_id2, action=self.action, + completed=False, resource_id='bar', + parent_id=job2['id']) + job_body = dict(job=job_body) + job3 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job3) + jobs = job_api.get_job(self.admin_context, job1['id']) self.assertFalse(type(jobs) in [list, tuple]) job = jobs @@ -68,7 +139,7 @@ class QuarkJobs(BaseFunctionalTest): self.assertEqual(self.action, job['action']) job = job_api.get_job(self.admin_context, job2['id']) - self.assertTrue(job['completed']) + self.assertFalse(job['completed']) self.assertEqual(self.tenant_id2, job['tenant_id']) self.assertEqual(self.action, job['action']) @@ -77,15 +148,19 @@ class QuarkJobs(BaseFunctionalTest): jobs = job_api.get_jobs(self.admin_context) self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(3, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, transaction_id=job2['id']) + self.assertTrue(type(jobs) in [list, tuple]) self.assertEqual(2, len(jobs)) jobs = job_api.get_jobs(self.admin_context, completed=True) self.assertTrue(type(jobs) in [list, tuple]) - self.assertEqual(1, len(jobs)) + self.assertEqual(0, len(jobs)) jobs = job_api.get_jobs(self.admin_context, completed=False) self.assertTrue(type(jobs) in [list, tuple]) - self.assertEqual(1, len(jobs)) + self.assertEqual(3, len(jobs)) jobs = job_api.get_jobs(self.admin_context, completed='hello') self.assertTrue(type(jobs) in [list, tuple]) @@ -99,6 +174,18 @@ class QuarkJobs(BaseFunctionalTest): self.assertTrue(type(jobs) in [list, tuple]) self.assertEqual(0, len(jobs)) + jobs = job_api.get_jobs(self.admin_context, resource_id='foo') + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(1, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, resource_id='bar') + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(2, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, resource_id='asdf') + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(0, len(jobs)) + def test_get_job_different_non_admin(self): job_body = dict(tenant_id=self.context.tenant_id, action=self.action, completed=False) @@ -185,6 +272,38 @@ class QuarkJobs(BaseFunctionalTest): with self.assertRaises(q_exc.JobNotFound): job_api.delete_job(self.admin_context, job1['id']) + def test_delete_job_with_children(self): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + parent_job = job_api.create_job(self.admin_context, job_body) + parent_job = job_api.get_job(self.admin_context, parent_job['id']) + self.assertIsNotNone(parent_job) + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False, parent_id=parent_job['id']) + job_body = dict(job=job_body) + job = job_api.create_job(self.admin_context, job_body) + job = job_api.get_job(self.admin_context, job['id']) + self.assertIsNotNone(job) + + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False, parent_id=job['id']) + job_body = dict(job=job_body) + subjob = job_api.create_job(self.admin_context, job_body) + subjob = job_api.get_job(self.admin_context, subjob['id']) + self.assertIsNotNone(job) + + job_api.delete_job(self.admin_context, parent_job['id']) + + with self.assertRaises(q_exc.JobNotFound): + job_api.get_job(self.admin_context, parent_job['id']) + + with self.assertRaises(q_exc.JobNotFound): + job_api.get_job(self.admin_context, job['id']) + + with self.assertRaises(q_exc.JobNotFound): + job_api.get_job(self.admin_context, subjob['id']) + def test_delete_job_fail_non_admin(self): with self.assertRaises(n_exc.NotAuthorized): job_api.delete_job(self.context, 'derp') @@ -205,3 +324,51 @@ class QuarkJobs(BaseFunctionalTest): with self.assertRaises(q_exc.JobNotFound): job_api.get_job(self.context, job1['id']) + + def test_transaction_completion_percent(self): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + parent_job = job_api.create_job(self.admin_context, job_body) + parent_job = job_api.get_job(self.admin_context, parent_job['id']) + self.assertIsNotNone(parent_job) + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False, parent_id=parent_job['id']) + job_body = dict(job=job_body) + job = job_api.create_job(self.admin_context, job_body) + job = job_api.get_job(self.admin_context, job['id']) + self.assertIsNotNone(job) + + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False, parent_id=job['id']) + job_body = dict(job=job_body) + subjob = job_api.create_job(self.admin_context, job_body) + subjob = job_api.get_job(self.admin_context, subjob['id']) + self.assertIsNotNone(job) + + parent_job = job_api.get_job(self.admin_context, parent_job['id']) + self.assertTrue('transaction_percent' in parent_job) + self.assertEqual(0, parent_job['transaction_percent']) + self.assertEqual(2, parent_job['subtransactions']) + self.assertEqual(0, parent_job['completed_subtransactions']) + + update_body = dict(completed=True) + update_body = dict(job=update_body) + + subjob = job_api.update_job(self.admin_context, subjob['id'], + update_body) + self.assertTrue(subjob['completed']) + + parent_job = job_api.get_job(self.admin_context, parent_job['id']) + self.assertEqual(50, parent_job['transaction_percent']) + self.assertEqual(2, parent_job['subtransactions']) + self.assertEqual(1, parent_job['completed_subtransactions']) + + job = job_api.update_job(self.admin_context, job['id'], update_body) + self.assertTrue(subjob['completed']) + + parent_job = job_api.get_job(self.admin_context, parent_job['id']) + self.assertEqual(100, parent_job['transaction_percent']) + self.assertEqual(2, parent_job['subtransactions']) + self.assertEqual(2, parent_job['completed_subtransactions']) + self.assertEqual(True, parent_job['completed']) diff --git a/quark/tests/functional/plugin_modules/test_security_groups.py b/quark/tests/functional/plugin_modules/test_security_groups.py new file mode 100644 index 0000000..2ade9b4 --- /dev/null +++ b/quark/tests/functional/plugin_modules/test_security_groups.py @@ -0,0 +1,252 @@ +# Copyright 2016 Rackspace Hosting Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for# the specific language governing permissions and limitations +# under the License. + +import mock +# import netaddr + +import contextlib + +# from neutron.common import exceptions as q_exc +from oslo_config import cfg +from oslo_log import log as logging + +from quark.db import api as db_api +from quark import environment as env +import quark.plugin_modules.mac_address_ranges as macrng_api +import quark.plugin_modules.networks as network_api +import quark.plugin_modules.ports as port_api +import quark.plugin_modules.security_groups as sg_api +import quark.plugin_modules.subnets as subnet_api +from quark.tests.functional.base import BaseFunctionalTest + + +LOG = logging.getLogger(__name__) + + +class SecurityGroupsAsyncUpdateTests(BaseFunctionalTest): + def setUp(self): + super(SecurityGroupsAsyncUpdateTests, self).setUp() + env_set = [ + env.Capabilities.SECURITY_GROUPS, + env.Capabilities.TENANT_NETWORK_SG, + env.Capabilities.EGRESS, + env.Capabilities.SG_UPDATE_ASYNC + ] + override = ','.join(env_set) + self.old_override = cfg.CONF.QUARK.environment_capabilities + cfg.CONF.set_override("environment_capabilities", + override, + "QUARK") + self.dev_id = 0 + + def _get_devid(self): + self.dev_id += 1 + return self.dev_id + + def tearDown(self): + super(SecurityGroupsAsyncUpdateTests, self).tearDown() + cfg.CONF.set_override("environment_capabilities", + self.old_override, + "QUARK") + + def _make_body(self, net): + port_info = {"port": dict(network_id=net['id'], tenant_id="fake", + device_id=self._get_devid())} + return port_info + + def _get_assoc_ports(self, sgid): + db_sg = db_api.security_group_find( + self.context, id=sgid, scope=db_api.ONE) + self.assertIsNotNone(db_sg) + return db_api.sg_gather_associated_ports(self.context, db_sg) + + @contextlib.contextmanager + def _stubs(self, network_info, subnet_v4_info, subnet_v6_info=None): + cls = 'QuarkSGAsyncProcessClient' + mod_path = 'quark.worker_plugins.sg_update_worker.%s' % cls + job_path = 'quark.plugin_modules.jobs' + with contextlib.nested( + mock.patch("neutron.common.rpc.get_notifier"), + mock.patch("neutron.quota.QUOTAS.limit_check"), + mock.patch("%s.add_job_to_context" % job_path), + mock.patch("%s.start_update" % mod_path)) as \ + (notifier, limit_check, addjob, update): + self.context.is_admin = True + net = network_api.create_network(self.context, network_info) + mac = {'mac_address_range': dict(cidr="AA:BB:CC")} + macrng_api.create_mac_address_range(self.context, mac) + self.context.is_admin = False + subnet_v4_info['subnet']['network_id'] = net['id'] + sub_v4 = subnet_api.create_subnet(self.context, subnet_v4_info) + + yield net, sub_v4, update + + def test_gather_sg_ports(self): + """Checking if gather ports works as designed. """ + cidr = "192.168.1.0/24" + network = dict(id='1', name="public", tenant_id="make", + network_plugin="BASE", + ipam_strategy="ANY") + network = {"network": network} + subnet_v4 = dict(id='1', ip_version=4, cidr=cidr, + tenant_id="fake") + subnet_v4_info = {"subnet": subnet_v4} + + with self._stubs(network, subnet_v4_info) as (net, sub_v4, update): + port1 = port_api.create_port(self.context, self._make_body(net)) + self.assertIsNotNone(port1) + port2 = port_api.create_port(self.context, self._make_body(net)) + self.assertIsNotNone(port2) + + sg_body = dict(tenant_id="derp", name="test sg", + description="none") + sg_body = dict(security_group=sg_body) + + sg = sg_api.create_security_group(self.context, sg_body) + self.assertIsNotNone(sg) + sgid = sg['id'] + self.assertIsNotNone(sgid) + + assoc_ports = self._get_assoc_ports(sgid) + self.assertEqual(0, len(assoc_ports)) + + port_body = {'security_groups': [sgid]} + port_body = dict(port=port_body) + + port1 = port_api.update_port(self.context, port1['id'], port_body) + self.assertIsNotNone(port1) + + assoc_ports = self._get_assoc_ports(sgid) + self.assertEqual(1, len(assoc_ports)) + + # NOTE: this is duplicated because update_port modifies the params + port_body = {'security_groups': [sgid]} + port_body = dict(port=port_body) + + port2 = port_api.update_port(self.context, port2['id'], port_body) + self.assertIsNotNone(port2) + + assoc_ports = self._get_assoc_ports(sgid) + self.assertEqual(2, len(assoc_ports)) + + def test_env_caps_off_sg_async_update(self): + """This test ensures that envcaps off works as designed.""" + env_set = [ + env.Capabilities.SECURITY_GROUPS, + env.Capabilities.TENANT_NETWORK_SG, + env.Capabilities.EGRESS, + ] + override = ','.join(env_set) + old_override = cfg.CONF.QUARK.environment_capabilities + cfg.CONF.set_override("environment_capabilities", + override, + "QUARK") + cidr = "192.168.1.0/24" + network = dict(id='1', name="public", tenant_id="make", + network_plugin="BASE", + ipam_strategy="ANY") + network = {"network": network} + subnet_v4 = dict(id='1', ip_version=4, cidr=cidr, + tenant_id="fake") + subnet_v4_info = {"subnet": subnet_v4} + + try: + with self._stubs(network, subnet_v4_info) as (net, sub_v4, update): + port1 = port_api.create_port( + self.context, self._make_body(net)) + self.assertIsNotNone(port1) + + sg_body = dict(tenant_id="derp", name="test sg", + description="none") + sg_body = dict(security_group=sg_body) + + sg = sg_api.create_security_group(self.context, sg_body) + self.assertIsNotNone(sg) + sgid = sg['id'] + self.assertIsNotNone(sgid) + + port_body = {'security_groups': [sgid]} + port_body = dict(port=port_body) + + port1 = port_api.update_port(self.context, port1['id'], + port_body) + self.assertIsNotNone(port1) + + sgr_body = {'protocol': 'tcp', 'security_group_id': sgid, + 'tenant_id': "derp", + 'direction': 'ingress'} + sgr_body = dict(security_group_rule=sgr_body) + sgr = sg_api.create_security_group_rule(self.context, sgr_body) + self.assertIsNotNone(sgr) + self.assertFalse(update.called) + finally: + cfg.CONF.set_override("environment_capabilities", + old_override, + "QUARK") + + def test_env_caps_on_sg_async_update(self): + """This test ensures that envcaps on works as designed.""" + env_set = [ + env.Capabilities.SECURITY_GROUPS, + env.Capabilities.TENANT_NETWORK_SG, + env.Capabilities.EGRESS, + env.Capabilities.SG_UPDATE_ASYNC + ] + override = ','.join(env_set) + old_override = cfg.CONF.QUARK.environment_capabilities + cfg.CONF.set_override("environment_capabilities", + override, + "QUARK") + cidr = "192.168.1.0/24" + network = dict(id='1', name="public", tenant_id="make", + network_plugin="BASE", + ipam_strategy="ANY") + network = {"network": network} + subnet_v4 = dict(id='1', ip_version=4, cidr=cidr, + tenant_id="fake") + subnet_v4_info = {"subnet": subnet_v4} + + try: + with self._stubs(network, subnet_v4_info) as (net, sub_v4, update): + port1 = port_api.create_port( + self.context, self._make_body(net)) + self.assertIsNotNone(port1) + + sg_body = dict(tenant_id="derp", name="test sg", + description="none") + sg_body = dict(security_group=sg_body) + + sg = sg_api.create_security_group(self.context, sg_body) + self.assertIsNotNone(sg) + sgid = sg['id'] + self.assertIsNotNone(sgid) + + port_body = {'security_groups': [sgid]} + port_body = dict(port=port_body) + + port1 = port_api.update_port(self.context, port1['id'], + port_body) + + sgr_body = {'protocol': 'tcp', 'security_group_id': sgid, + 'tenant_id': "derp", + 'direction': 'ingress'} + sgr_body = dict(security_group_rule=sgr_body) + sgr = sg_api.create_security_group_rule(self.context, sgr_body) + self.assertIsNotNone(sgr) + self.assertTrue(update.called) + finally: + cfg.CONF.set_override("environment_capabilities", + old_override, + "QUARK") diff --git a/quark/tests/tools/test_resp_async_middleware.py b/quark/tests/tools/test_resp_async_middleware.py new file mode 100644 index 0000000..f8d7961 --- /dev/null +++ b/quark/tests/tools/test_resp_async_middleware.py @@ -0,0 +1,126 @@ +# Copyright (c) 2016 Rackspace Hosting Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import contextlib +import json +import mock + +from quark.tests import test_base +from quark.tools.middleware import resp_async_id as jobmw + + +mw_mock_path = 'quark.tools.middleware.resp_async_id.ResponseAsyncIdAdder' + + +class FakeResp(object): + def __init__(self, body): + self.body = body + self.headers = {} + + +class FakeContext(object): + def __init__(self, job): + self.async_job = job + + +class TestRespAsyncIDMiddleware(test_base.TestBase): + def setUp(self): + self.middleware_cls = jobmw.ResponseAsyncIdAdder + self.app = mock.Mock() + self.conf = {} + job = {'job': {'id': '3'}} + self.job_ctx = FakeContext(job) + self.no_ctx = {} + self.none_ctx = None + self.random_ctx = {'stuff': {'stuff': 'value'}} + + self.body = '{"something": {"attribute": "value"}}' + self.resp_return = FakeResp(self.body) + self.err_resp = FakeResp('asdf::') + + def test_middleware_instantiation(self): + self.assertIsNotNone(self.middleware_cls(self.app, self.conf)) + + mw = jobmw.filter_factory(self.conf)(self.app) + self.assertIsNotNone(mw) + + def test_mw_none_context(self): + mw = jobmw.filter_factory(self.conf)(self.app) + with contextlib.nested( + mock.patch('%s._get_resp' % mw_mock_path), + mock.patch('%s._get_ctx' % mw_mock_path)) as \ + (get_resp, get_ctx): + get_resp.return_value = self.resp_return + get_ctx.return_value = self.none_ctx + resp = mw.__call__.request('/', method='GET', body=self.body) + self.assertEqual(resp, self.resp_return) + self.assertEqual(self.body, resp.body) + self.assertFalse('job_id' in resp.body) + self.assertFalse('job_id' in resp.headers) + + def test_mw_empty_context(self): + mw = jobmw.filter_factory(self.conf)(self.app) + with contextlib.nested( + mock.patch('%s._get_resp' % mw_mock_path), + mock.patch('%s._get_ctx' % mw_mock_path)) as \ + (get_resp, get_ctx): + get_resp.return_value = self.resp_return + get_ctx.return_value = self.no_ctx + resp = mw.__call__.request('/', method='GET', body=self.body) + self.assertEqual(resp, self.resp_return) + self.assertEqual(self.body, resp.body) + self.assertFalse('job_id' in resp.body) + self.assertFalse('job_id' in resp.headers) + + def test_mw_missing_context(self): + mw = jobmw.filter_factory(self.conf)(self.app) + with contextlib.nested( + mock.patch('%s._get_resp' % mw_mock_path), + mock.patch('%s._get_ctx' % mw_mock_path)) as \ + (get_resp, get_ctx): + get_resp.return_value = self.resp_return + get_ctx.return_value = self.random_ctx + resp = mw.__call__.request('/', method='GET', body=self.body) + self.assertEqual(resp, self.resp_return) + self.assertEqual(self.body, resp.body) + self.assertFalse('job_id' in resp.body) + self.assertFalse('job_id' in resp.headers) + + def test_mw_modify_resp(self): + mw = jobmw.filter_factory(self.conf)(self.app) + with contextlib.nested( + mock.patch('%s._get_resp' % mw_mock_path), + mock.patch('%s._get_ctx' % mw_mock_path)) as \ + (get_resp, get_ctx): + get_resp.return_value = self.resp_return + get_ctx.return_value = self.job_ctx + resp = mw.__call__.request('/', method='GET', body=self.body) + self.assertEqual(resp, self.resp_return) + self.assertNotEqual(self.body, resp.body) + self.assertTrue('job_id' in resp.body) + self.assertTrue('job_id' in resp.headers) + + resp_json = json.loads(resp.body) + self.assertTrue('job_id' in resp_json) + + def test_mw_error_resp(self): + mw = jobmw.filter_factory(self.conf)(self.app) + with contextlib.nested( + mock.patch('%s._get_resp' % mw_mock_path), + mock.patch('%s._get_ctx' % mw_mock_path)) as \ + (get_resp, get_ctx): + get_resp.return_value = self.err_resp + get_ctx.return_value = self.job_ctx + resp = mw.__call__.request('/', method='GET', body=self.body) + self.assertEqual(resp, self.err_resp) diff --git a/quark/tools/async_worker.py b/quark/tools/async_worker.py index 1556be2..3ced67f 100644 --- a/quark/tools/async_worker.py +++ b/quark/tools/async_worker.py @@ -1,22 +1,38 @@ +# Copyright 2016 Rackspace Hosting Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. import eventlet eventlet.monkey_patch(socket=True, select=True, time=True) +import inspect +import itertools import sys -import time from oslo_config import cfg from oslo_log import log as logging -import oslo_messaging as messaging from oslo_service import service as common_service from oslo_utils import excutils +from stevedore import extension from neutron._i18n import _ from neutron._i18n import _LE from neutron.common import config -from neutron.common import rpc as n_rpc from neutron.db import api as session from neutron import service +from quark.worker_plugins import base_worker + service_opts = [ cfg.StrOpt('topic', help=_('Topic for messaging to pub/sub to')), @@ -37,113 +53,99 @@ service_opts = [ CONF = cfg.CONF CONF.register_opts(service_opts, "QUARK_ASYNC") LOG = logging.getLogger(__name__) +VERSION = "1.0" +PLUGIN_EP = 'quark.worker_plugin' -class QuarkRpcTestCallback(object): - target = messaging.Target(version='1.0', namespace=None) - - def stuff(self, context, **kwargs): - return {"status": "okay"} - - -class QuarkAsyncPlugin(object): +class QuarkAsyncServer(object): def __init__(self): - pass + self.plugins = [] + self._register_extensions(VERSION) - def _setup_rpc(self): - self.endpoints = [QuarkRpcTestCallback()] + def _load_worker_plugin_with_module(self, module, version): + """Instantiates worker plugins that have requsite properties. - def start_rpc_listeners(self): - """Configure all listeners here""" - self._setup_rpc() - self.conn = n_rpc.create_connection() - self.conn.create_consumer(CONF.QUARK_ASYNC.topic, self.endpoints, - fanout=False) - return self.conn.consume_in_threads() + The required properties are: + * must have PLUGIN_EP entrypoint registered (or it wouldn't be in the + list) + * must have class attribute versions (list) of supported RPC versions + * must subclass QuarkAsyncPluginBase + """ + classes = inspect.getmembers(module, inspect.isclass) + loaded = 0 + for cls_name, cls in classes: + if hasattr(cls, 'versions'): + if version not in cls.versions: + continue + else: + continue + if issubclass(cls, base_worker.QuarkAsyncPluginBase): + LOG.debug("Loading plugin %s" % cls_name) + plugin = cls() + self.plugins.append(plugin) + loaded += 1 + LOG.debug("Found %d possible plugins and loaded %d" % + (len(classes), loaded)) + + def _discover_via_entrypoints(self): + """Looks for modules with amtching entry points.""" + emgr = extension.ExtensionManager(PLUGIN_EP, invoke_on_load=False) + return ((ext.name, ext.plugin) for ext in emgr) + + def _register_extensions(self, version): + """Loads plugins that match the PLUGIN_EP entrypoint.""" + for name, module in itertools.chain(self._discover_via_entrypoints()): + self._load_worker_plugin_with_module(module, version) + + def serve_rpc(self): + """Launches configured # of workers per loaded plugin.""" + if cfg.CONF.QUARK_ASYNC.rpc_workers < 1: + cfg.CONF.set_override('rpc_workers', 1, "QUARK_ASYNC") + + try: + rpc = service.RpcWorker(self.plugins) + session.dispose() # probaby not needed, but maybe + launcher = common_service.ProcessLauncher(CONF, wait_interval=1.0) + launcher.launch_service(rpc, workers=CONF.QUARK_ASYNC.rpc_workers) + + return launcher + except Exception: + with excutils.save_and_reraise_exception(): + LOG.exception(_LE('Unrecoverable error: please check log for ' + 'details.')) + + def start_api_and_rpc_workers(self): + """Initializes eventlet and starts wait for workers to exit. + + Spawns the workers returned from serve_rpc + """ + pool = eventlet.GreenPool() + + quark_rpc = self.serve_rpc() + pool.spawn(quark_rpc.wait) + + pool.waitall() + + def run(self): + """Start of async worker process.""" + self.start_api_and_rpc_workers() -def serve_rpc(): - - if cfg.CONF.QUARK_ASYNC.rpc_workers < 1: - cfg.CONF.set_override('rpc_workers', 1, "QUARK_ASYNC") - - try: - plugins = [QuarkAsyncPlugin()] - rpc = service.RpcWorker(plugins) - session.dispose() # probaby not needed, but maybe - launcher = common_service.ProcessLauncher(CONF, wait_interval=1.0) - launcher.launch_service(rpc, workers=CONF.QUARK_ASYNC.rpc_workers) - - return launcher - except Exception: - with excutils.save_and_reraise_exception(): - LOG.exception(_LE('Unrecoverable error: please check log for ' - 'details.')) - - -def start_api_and_rpc_workers(): - pool = eventlet.GreenPool() - - quark_rpc = serve_rpc() - pool.spawn(quark_rpc.wait) - - pool.waitall() - - -def boot_server(server_func): - # the configuration will be read into the cfg.CONF global data structure +def main(): config.init(sys.argv[1:]) config.setup_logging() config.set_config_defaults() if not cfg.CONF.config_file: - sys.exit(_("ERROR: Unable to find configuration file via the default" - " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and" - " the '--config-file' option!")) + sys.exit(_("ERROR: Unable to find configuration file via the" + " default search paths (~/.neutron/, ~/, /etc/neutron/," + " /etc/) and the '--config-file' option!")) try: - server_func() + QuarkAsyncServer().run() except KeyboardInterrupt: pass except RuntimeError as e: sys.exit(_("ERROR: %s") % e) -def main(): - boot_server(start_api_and_rpc_workers) - - -class QuarkRpcTestApi(object): - """This class is used for testing QuarkRpcTestCallback.""" - def __init__(self): - target = messaging.Target(topic=CONF.QUARK_ASYNC.topic) - self.client = n_rpc.get_client(target) - - def stuff(self, context): - cctxt = self.client.prepare(version='1.0') - return cctxt.call(context, 'stuff') - - -class QuarkAsyncTestContext(object): - """This class is used for testing QuarkRpcTestCallback.""" - def __init__(self): - self.time = time.ctime() - - def to_dict(self): - return {"application": "rpc-client", "time": time.ctime()} - - -def test_main(): - config.init(sys.argv[1:]) - config.setup_logging() - config.set_config_defaults() - if not cfg.CONF.config_file: - sys.exit(_("ERROR: Unable to find configuration file via the default" - " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and" - " the '--config-file' option!")) - context = QuarkAsyncTestContext() # typically context is neutron context - client = QuarkRpcTestApi() - LOG.info(client.stuff(context)) - time.sleep(0) # necessary for preventing Timeout exceptions - - if __name__ == "__main__": main() diff --git a/quark/tools/middleware/__init__.py b/quark/tools/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/quark/tools/middleware/resp_async_id.py b/quark/tools/middleware/resp_async_id.py new file mode 100644 index 0000000..5fe1075 --- /dev/null +++ b/quark/tools/middleware/resp_async_id.py @@ -0,0 +1,64 @@ +# Copyright (c) 2016 Rackspace Hosting Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from neutron import wsgi +from oslo_log import log as logging +import webob.dec +import webob.exc + +LOG = logging.getLogger(__name__) + + +class ResponseAsyncIdAdder(object): + """Return a fake token if one isn't specified.""" + def __init__(self, app, conf): + self.app = app + self.conf = conf + + def _get_resp(self, req): + return req.get_response(self.app) + + def _get_ctx(self, req): + return req.environ.get('neutron.context') + + @webob.dec.wsgify(RequestClass=wsgi.Request) + def __call__(self, req): + """Attempts to put the job_id into the response body and header.""" + resp = self._get_resp(req) + context = self._get_ctx(req) + if hasattr(context, 'async_job'): + try: + json_body = json.loads(resp.body) + json_body['job_id'] = context.async_job['job']['id'] + resp.body = json.dumps(json_body) + resp.headers['job_id'] = context.async_job['job']['id'] + except ValueError: # bad json not abnormal + return resp + except Exception as e: # Bare exception for anything random + LOG.error("Uncaught exception: %s" % e) + return resp + + +def filter_factory(global_conf, **local_conf): + """Returns a WSGI filter app for use with paste.deploy.""" + conf = global_conf.copy() + conf.update(local_conf) + + def wrapper(app): + return ResponseAsyncIdAdder(app, conf) + + return wrapper diff --git a/quark/worker_plugins/__init__.py b/quark/worker_plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/quark/worker_plugins/base_worker.py b/quark/worker_plugins/base_worker.py new file mode 100644 index 0000000..9f3aad7 --- /dev/null +++ b/quark/worker_plugins/base_worker.py @@ -0,0 +1,61 @@ +# Copyright 2016 Rackspace Hosting Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from neutron.common import rpc as n_rpc +from neutron import context +from oslo_config import cfg +from oslo_log import log as logging + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) +TEST_TOPIC = 'quark' + + +class QuarkAsyncPluginBase(object): + versions = [] + + def __init__(self, topic): + self._context = None + self.topic = topic + self.endpoints = [] + self.callbacks = [] + + def _setup_rpc(self): + """Registers callbacks to RPC assigned by subclasses. + + This does nothing if the subclasses do not set callbacks value. + """ + self.endpoints.extend(self.callbacks) + + def start_rpc_listeners(self): + """Configure all listeners here""" + self._setup_rpc() + if not self.endpoints: + return [] + self.conn = n_rpc.create_connection() + self.conn.create_consumer(self.topic, self.endpoints, + fanout=False) + return self.conn.consume_in_threads() + + @property + def context(self): + """Provides an admin context for workers.""" + if not self._context: + self._context = context.get_admin_context() + return self._context + + +def get_test_context(): + return context.get_admin_context() diff --git a/quark/worker_plugins/sample_worker.py b/quark/worker_plugins/sample_worker.py new file mode 100644 index 0000000..793da46 --- /dev/null +++ b/quark/worker_plugins/sample_worker.py @@ -0,0 +1,84 @@ +# Copyright 2016 Rackspace Hosting Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys +import time + +from neutron._i18n import _ +from neutron.common import config +from neutron.common import rpc as n_rpc +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging + +from quark.plugin_modules import networks as network_api +from quark.worker_plugins import base_worker + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) +TEST_TOPIC = 'quark' +VERSION = "1.0" + + +class QuarkRpcTestCallback(object): + """The callbacks class defines the server-side methods to be called.""" + target = messaging.Target(version='1.0', namespace=None) + + def stuff(self, context, **kwargs): + LOG.debug(context) + networks = network_api.get_networks(context) + return {"networks": networks, "status": "okay"} + + +class QuarkAsyncPluginTest(base_worker.QuarkAsyncPluginBase): + """The worker plugin defines the server-side subscriber to be run. + + Each one of these plugins may be executed on multiple workers (and each + worker only runs a single plugin). + """ + versions = [VERSION] + TOPIC = "quark" + + def __init__(self, topic=TOPIC): + super(QuarkAsyncPluginTest, self).__init__(topic) + self.callbacks = [QuarkRpcTestCallback()] + + +class QuarkRpcTestApi(object): + """The API classes define the client-side methods to be called. + + These methods do not need to match the names of the remote calls as long + as the RPC call within match. + """ + def __init__(self, topic): + target = messaging.Target(topic=topic) + self.client = n_rpc.get_client(target) + + def stuff(self, context): + cctxt = self.client.prepare(version='1.0') + return cctxt.call(context, 'stuff') + + +def main(): + config.init(sys.argv[1:]) + config.setup_logging() + config.set_config_defaults() + if not cfg.CONF.config_file: + sys.exit(_("ERROR: Unable to find configuration file via the default" + " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and" + " the '--config-file' option!")) + client = QuarkRpcTestApi(TEST_TOPIC) + LOG.info(client.stuff(base_worker.get_test_context())) + time.sleep(0) # necessary for preventing Timeout exceptions diff --git a/quark/worker_plugins/sg_update_worker.py b/quark/worker_plugins/sg_update_worker.py new file mode 100644 index 0000000..e6a57d5 --- /dev/null +++ b/quark/worker_plugins/sg_update_worker.py @@ -0,0 +1,232 @@ +# Copyright 2016 Rackspace Hosting Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import time + +from neutron.common import rpc as n_rpc +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_messaging import exceptions as om_exc + +from quark.db import api as db_api +from quark.plugin_modules import jobs as job_api +from quark.plugin_modules import ports as port_api +from quark.worker_plugins import base_worker + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) +CONSUMER_TOPIC = 'quark_sg_update_consumer' +PRODUCER_TOPIC = 'quark_sg_update_producer' +SG_UPDATE_TOPIC = 'quark_sg_update' +VERSION = "1.0" + + +""" +Quark SG update has three components: + 1. AsyncProcess - begins the update process + 2. Producer - produces the 'update port' jobs + 3. Consumer - updates the ports on redis + +Quark SG update workflow is triggered on: + * rule create + * rule delete + +Quark SG update workflow: + 1. triggering event occurs and is completed successfully + 2. RPC call to create job and start async update process + a. RPC cast to start async update process (see: Async update workflow) + 3. Job information stored in context + 4. Response modified to include job information + +Async update workflow: + 1. process started from SG update trigger + 2. for each port associated with SG: + a. create job + b. RPC cast to perform update on port + i. update redis + ii. complete job and check root transaction status + +============================================================ +Component 1. QuarkSGAsyncProcess +============================================================ +""" + + +class QuarkSGAsyncProcessCallback(object): + target = messaging.Target(version='1.0', namespace=None) + + def update_sg(self, context, sg, rule_id, action): + """Begins the async update process.""" + db_sg = db_api.security_group_find(context, id=sg, scope=db_api.ONE) + if not db_sg: + return None + with context.session.begin(): + job_body = dict(action="%s sg rule %s" % (action, rule_id), + resource_id=rule_id, + tenant_id=db_sg['tenant_id']) + job_body = dict(job=job_body) + job = job_api.create_job(context, job_body) + rpc_client = QuarkSGAsyncProducerClient() + try: + rpc_client.populate_subtasks(context, sg, job['id']) + except om_exc.MessagingTimeout: + LOG.error("Failed to create subtasks. Rabbit running?") + return None + return {"job_id": job['id']} + + +class QuarkSGAsyncProcess(base_worker.QuarkAsyncPluginBase): + versions = [VERSION] + + def __init__(self, topic=SG_UPDATE_TOPIC): + super(QuarkSGAsyncProcess, self).__init__(topic) + self.callbacks = [QuarkSGAsyncProcessCallback()] + + +class QuarkSGAsyncProcessClient(object): + def __init__(self): + topic = SG_UPDATE_TOPIC + target = messaging.Target(topic=topic) + self.client = n_rpc.get_client(target) + + def start_update(self, context, sg, rule_id, action): + cctxt = self.client.prepare(version='1.0') + try: + return cctxt.call(context, 'update_sg', sg=sg, rule_id=rule_id, + action=action) + except om_exc.MessagingTimeout: + return None + + +""" +============================================================ +Component 2. QuarkSGAsyncProducer +============================================================ +""" + + +class QuarkSGProducerCallback(object): + target = messaging.Target(version='1.0', namespace=None) + + def populate_subtasks(self, context, sg, parent_job_id): + """Produces a list of ports to be updated async.""" + db_sg = db_api.security_group_find(context, id=sg, scope=db_api.ONE) + if not db_sg: + return None + ports = db_api.sg_gather_associated_ports(context, db_sg) + if len(ports) == 0: + return {"ports": 0} + for port in ports: + job_body = dict(action="update port %s" % port['id'], + tenant_id=db_sg['tenant_id'], + resource_id=port['id'], + parent_id=parent_job_id) + job_body = dict(job=job_body) + job = job_api.create_job(context, job_body) + rpc_consumer = QuarkSGAsyncConsumerClient() + try: + rpc_consumer.update_port(context, port['id'], job['id']) + except om_exc.MessagingTimeout: + # TODO(roaet): Not too sure what can be done here other than + # updating the job as a failure? + LOG.error("Failed to update port. Rabbit running?") + return None + + +class QuarkSGAsyncProducer(base_worker.QuarkAsyncPluginBase): + versions = [VERSION] + + def __init__(self, topic=PRODUCER_TOPIC): + super(QuarkSGAsyncProducer, self).__init__(topic) + self.callbacks = [QuarkSGProducerCallback()] + + +class QuarkSGAsyncProducerClient(object): + def __init__(self): + topic = PRODUCER_TOPIC + target = messaging.Target(topic=topic) + self.client = n_rpc.get_client(target) + + def populate_subtasks(self, context, sg, parent_job_id): + cctxt = self.client.prepare(version='1.0') + return cctxt.cast(context, 'populate_subtasks', sg=sg, + parent_job_id=parent_job_id) + + +""" +============================================================ +Component 3. QuarkSGAsyncConsumer +============================================================ +""" + + +class QuarkSGConsumerCallback(object): + target = messaging.Target(version='1.0', namespace=None) + + def update_ports_for_sg(self, context, portid, jobid): + """Updates the ports through redis.""" + port = db_api.port_find(context, id=portid, scope=db_api.ONE) + if not port: + LOG.warning("Port not found") + return + net_driver = port_api._get_net_driver(port.network, port=port) + base_net_driver = port_api._get_net_driver(port.network) + sg_list = [sg for sg in port.security_groups] + + success = False + error = None + retries = 3 + retry_delay = 2 + for retry in xrange(retries): + try: + net_driver.update_port(context, port_id=port["backend_key"], + mac_address=port["mac_address"], + device_id=port["device_id"], + base_net_driver=base_net_driver, + security_groups=sg_list) + success = True + error = None + break + except Exception as error: + LOG.warning("Could not connect to redis, but retrying soon") + time.sleep(retry_delay) + status_str = "" + if not success: + status_str = "Port %s update failed after %d tries. Error: %s" % ( + portid, retries, error) + update_body = dict(completed=True, status=status_str) + update_body = dict(job=update_body) + job_api.update_job(context, jobid, update_body) + + +class QuarkSGAsyncConsumer(base_worker.QuarkAsyncPluginBase): + versions = [VERSION] + + def __init__(self, topic=CONSUMER_TOPIC): + super(QuarkSGAsyncConsumer, self).__init__(topic) + self.callbacks = [QuarkSGConsumerCallback()] + + +class QuarkSGAsyncConsumerClient(object): + def __init__(self): + topic = CONSUMER_TOPIC + target = messaging.Target(topic=topic) + self.client = n_rpc.get_client(target) + + def update_port(self, context, portid, jobid): + cctxt = self.client.prepare(version='1.0') + return cctxt.cast(context, 'update_ports_for_sg', portid=portid, + jobid=jobid) diff --git a/requirements.txt b/requirements.txt index 5a0a0ff..094dfc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ docopt==0.6.2 gunicorn==19.3.0 pymysql==0.6.6 twiceredis>=2.0.0 +stevedore>=1.16.0 # Apache-2.0 # agent deps xenapi==1.2 diff --git a/setup.cfg b/setup.cfg index 624cc05..47f050f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,8 +22,12 @@ console_scripts = gunicorn-neutron-server = quark.gunicorn_server:main quark-agent = quark.agent.agent:main quark-async-worker = quark.tools.async_worker:main - quark-async-tester = quark.tools.async_worker:test_main + quark-async-tester = quark.worker_plugins.sample_worker:main + quark-sg-tester = quark.worker_plugins.sg_update_worker:main ip_availability = quark.ip_availability:main redis_sg_tool = quark.tools.redis_sg_tool:main null_routes = quark.tools.null_routes:main insert_provider_subnets = quark.tools.insert_provider_subnets:main +quark.worker_plugin = + test_plugin = quark.worker_plugins.sample_worker + sg_update = quark.worker_plugins.sg_update_worker