Work on making worker plugins
JIRA: NCP-1172

Fixed redis-base to use a configurable min_sentinels. The environment
variable must be enabled for this to work.

SG update workflow:
- PUT to /security-groups
- Parent task is made and sent to producer RPC
- Creates the task id and then returns it to the main call
- Then starts making sub tasks for each port associated with the SG
- Each sub task will perform the required redis calls and update its
  provided subtask to completed (or error)
- Returns normal SG body + task information

Updated the workflow to use the rule_id as the resource_id for the TX.
Transactions now autocomplete after updating their subtransactions.

Conflicts:
	quark/db/migration/alembic/versions/HEAD

Change-Id: I44545f2edb410d19e3156ee934dca283857c10d9
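For readers skimming the diff, a hedged sketch of the response shape this workflow produces; the values are illustrative, not from this patch, and the real shape comes from _make_security_group_rule_dict plus the job_id appended by the resp_async_id middleware added below:

# Illustrative only: the usual rule body plus the job reference; UUIDs made up.
response = {
    "security_group_rule": {
        "id": "8a9d6f2e-...",              # new rule id
        "security_group_id": "5c1b3a7d-...",
    },
    "job_id": "3f2e1d0c-...",              # added by resp_async_id middleware
}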
parent 24d4d0a302
commit 60fb3f908a
quark/cache/redis_base.py (7 lines changed)
@@ -45,6 +45,10 @@ quark_opts = [
     cfg.StrOpt("redis_db",
                default="0",
                help=("The database number to use")),
+    cfg.IntOpt("redis_min_other_sentinels",
+               default=2,
+               help=_("The minimum number of sentinels required for quorum."
+                      " Set this to 0 if you only have one sentinel.")),
     cfg.FloatOpt("redis_socket_timeout",
                  default=0.1,
                  help=("Timeout for Redis socket operations"))]
@@ -83,7 +87,8 @@ class ClientBase(object):
         sentinel_kwargs = TwiceRedis.DEFAULT_SENTINEL_KWARGS
         pool_kwargs['socket_timeout'] = CONF.QUARK.redis_socket_timeout
         pool_kwargs['socket_keepalive'] = False
-        sentinel_kwargs['min_other_sentinels'] = 2
+        sentinel_kwargs['min_other_sentinels'] = \
+            CONF.QUARK.redis_min_other_sentinels
         return TwiceRedis(master_name=CONF.QUARK.redis_sentinel_master,
                           sentinels=sentinels,
                           password=CONF.QUARK.redis_password,
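A minimal oslo.config sketch of how the new option is consumed; the registration here is standalone for illustration, not quark's own. With a single sentinel an operator sets the option to 0, per the help text above:

from oslo_config import cfg

CONF = cfg.CONF
CONF.register_opts([cfg.IntOpt("redis_min_other_sentinels", default=2)],
                   "QUARK")
# One-sentinel deployment: quorum needs no other sentinels.
CONF.set_override("redis_min_other_sentinels", 0, "QUARK")
print(CONF.QUARK.redis_min_other_sentinels)  # 0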
@@ -95,7 +95,7 @@ def _model_query(context, model, filters, fields=None):
     model_filters = []
     eq_filters = ["address", "cidr", "deallocated", "ip_version", "service",
                   "mac_address_range_id", "transaction_id", "lock_id",
-                  "address_type", "completed"]
+                  "address_type", "completed", "resource_id"]
     in_filters = ["device_id", "device_owner", "group_id", "id", "mac_address",
                   "name", "network_id", "segment_id", "subnet_id",
                   "used_by_tenant_id", "version", "project_id"]
@@ -889,6 +889,19 @@ def dns_delete(context, dns):
         context.session.delete(dns)


+def sg_gather_associated_ports(context, group):
+    """Gather all ports associated to security group.
+
+    Returns:
+        * list, or None
+    """
+    if not group:
+        return None
+    if not hasattr(group, "ports") or len(group.ports) <= 0:
+        return []
+    return group.ports
+
+
 @scoped
 def security_group_find(context, **filters):
     query = context.session.query(models.SecurityGroup).options(
@@ -1172,25 +1185,52 @@ def segment_allocation_range_delete(context, sa_range):
 @scoped
 def async_transaction_find(context, lock_mode=False, **filters):
     query = context.session.query(models.AsyncTransactions)
+    mod = models.AsyncTransactions
     if lock_mode:
         query = query.with_lockmode("update")

-    model_filters = _model_query(
-        context, models.AsyncTransactions, filters)
+    model_filters = _model_query(context, mod, filters)
+
+    if 'transaction_id' in filters:
+        query = query.filter(or_(
+            mod.id == filters['transaction_id'],
+            and_(*model_filters)))
+    else:
+        query = query.filter(*model_filters)

-    query = query.filter(*model_filters)
     return query


+def _check_transaction_completion(context, transaction, just_made=False):
+    # TODO(roaet): this pegs the DB pretty often and there may be a better way
+    completed = 0
+    for sub in transaction.subtransactions:
+        if sub.get('completed'):
+            completed += 1
+    if len(transaction.subtransactions) == completed:
+        completed = True
+    if just_made:
+        completed = False
+    data = {'completed': completed}
+    async_transaction_update(context, transaction, **data)
+
+
 def async_transaction_create(context, **transaction_dict):
     tx = models.AsyncTransactions()
     tx.update(transaction_dict)
     context.session.add(tx)
+    if tx.transaction_id is not None:
+        parent_tx = async_transaction_find(context, id=tx.transaction_id,
+                                           scope=ONE)
+        _check_transaction_completion(context, parent_tx, just_made=True)
     return tx


 def async_transaction_update(context, transaction, **kwargs):
     transaction.update(kwargs)
+    tx = transaction.transaction
+    if transaction.completed and tx is not None:
+        _check_transaction_completion(context, tx)
     context.session.add(transaction)
     return transaction
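The autocompletion rule above is simple enough to trace by hand; a standalone sketch with plain dicts standing in for AsyncTransactions rows:

# A parent is completed exactly when every subtransaction is completed;
# just_made=True forces False so a freshly-created parent never reads as done.
subs = [{"completed": True}, {"completed": False}]
completed = sum(1 for s in subs if s.get("completed"))
print(len(subs) == completed)  # False: one subtransaction still open

subs[1]["completed"] = True
completed = sum(1 for s in subs if s.get("completed"))
print(len(subs) == completed)  # True: the parent would now autocomplete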
quark/db/migration/alembic/versions/HEAD
@@ -1 +1 @@
-79b768afed65
+da46a8b30bd8
@@ -0,0 +1,29 @@
"""create sub async transaction relationship

Revision ID: da46a8b30bd8
Revises: 79b768afed65
Create Date: 2016-08-12 09:24:29.941684

"""

# revision identifiers, used by Alembic.
revision = 'da46a8b30bd8'
down_revision = '79b768afed65'

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.add_column('quark_async_transactions',
                  sa.Column('status', sa.String(255), nullable=True))
    op.add_column('quark_async_transactions',
                  sa.Column('resource_id', sa.String(255), nullable=True))
    op.add_column('quark_async_transactions',
                  sa.Column('transaction_id', sa.String(255), nullable=True))
    op.add_column('quark_async_transactions',
                  sa.Column('parent_task_id', sa.String(255), nullable=True))


def downgrade():
    op.drop_column('quark_async_transactions', 'resource_id')
    op.drop_column('quark_async_transactions', 'status')
@@ -595,3 +595,26 @@ class AsyncTransactions(BASEV2, HasId, HasTenant):
     __tablename__ = "quark_async_transactions"
     action = sa.Column(sa.String(255))
     completed = sa.Column(sa.Boolean(), default=False)
+    status = sa.Column(sa.String(255))
+    resource_id = sa.Column(sa.String(36))
+
+    parent_task_id = sa.Column(
+        sa.String(36),
+        sa.ForeignKey("quark_async_transactions.id", ondelete="cascade"),
+        nullable=True)
+    transaction_id = sa.Column(
+        sa.String(36),
+        sa.ForeignKey("quark_async_transactions.id", ondelete="cascade"),
+        nullable=True)
+
+    parent = orm.relationship(
+        "AsyncTransactions",
+        backref=orm.backref("children", cascade="all, delete"),
+        foreign_keys=[parent_task_id], single_parent=True,
+        remote_side="AsyncTransactions.id")
+
+    transaction = orm.relationship(
+        "AsyncTransactions",
+        backref=orm.backref("subtransactions", cascade="all, delete"),
+        foreign_keys=[transaction_id], single_parent=True,
+        remote_side="AsyncTransactions.id")
@@ -24,6 +24,7 @@ class Capabilities(object):
     EGRESS = "egress"
     TENANT_NETWORK_SG = "tenant_network_sg"
     IP_BILLING = "ip_billing"
+    SG_UPDATE_ASYNC = "security_groups_update_async"


 quark_opts = [
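has_capability is used later in this patch (@env.has_capability(env.Capabilities.SG_UPDATE_ASYNC)) but its implementation is not part of this diff. A standalone sketch of the gating idea it implies, using a plain set where quark would read CONF.QUARK.environment_capabilities:

# Illustrative only: a minimal capability gate in the spirit of
# env.has_capability; quark's real decorator may differ.
ENABLED = {"security_groups_update_async"}

def has_capability(capability):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            if capability in ENABLED:
                return fn(*args, **kwargs)
            return None  # capability disabled: silently no-op
        return wrapper
    return decorator

@has_capability("security_groups_update_async")
def do_async_update():
    return "updated"

print(do_async_update())  # "updated" while the capability is enabled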
@@ -25,6 +25,15 @@ CONF = cfg.CONF
 LOG = logging.getLogger(__name__)


+def add_job_to_context(context, job_id):
+    """Adds job to neutron context for use later."""
+    db_job = db_api.async_transaction_find(
+        context, id=job_id, scope=db_api.ONE)
+    if not db_job:
+        return
+    context.async_job = {"job": v._make_job_dict(db_job)}
+
+
 def get_jobs(context, **filters):
     LOG.info("get_jobs for tenant %s" % context.tenant_id)
     if not filters:
@@ -44,14 +53,40 @@ def get_job(context, id):


 def create_job(context, body):
+    """Creates a job with support for subjobs.
+
+    If parent_id is not in the body:
+    * the job is considered a parent job
+    * it will have a NULL transaction id
+    * its transaction id == its id
+    * all subjobs will use its transaction id as theirs
+
+    Else:
+    * the job is a sub job
+    * the parent id is the id passed in
+    * the transaction id is the root of the job tree
+    """
     LOG.info("create_job for tenant %s" % context.tenant_id)

     if not context.is_admin:
         raise n_exc.NotAuthorized()
     job = body.get('job')
+    if 'parent_id' in job:
+        parent_id = job['parent_id']
+        if not parent_id:
+            raise q_exc.JobNotFound(job_id=parent_id)
+        parent_job = db_api.async_transaction_find(
+            context, id=parent_id, scope=db_api.ONE)
+        if not parent_job:
+            raise q_exc.JobNotFound(job_id=parent_id)
+        tid = parent_id
+        if parent_job.get('transaction_id'):
+            tid = parent_job.get('transaction_id')
+        job['transaction_id'] = tid
+
     if not job:
         raise n_exc.BadRequest(resource="job", msg="Invalid request body.")
-    with context.session.begin():
+    with context.session.begin(subtransactions=True):
         new_job = db_api.async_transaction_create(context, **job)
     return v._make_job_dict(new_job)
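Hedged examples of the request bodies this function accepts, reconstructed from the functional tests further down; the field values are illustrative:

# Root job: no parent_id; create_job leaves transaction_id NULL and the
# view layer reports the job's own id as its transaction id.
parent_body = {"job": {"tenant_id": "tenant-1",
                       "action": "sg update",
                       "completed": False}}

# Subjob: parent_id points at an existing job; create_job copies the
# root of the job tree into transaction_id.
sub_body = {"job": {"tenant_id": "tenant-1",
                    "action": "sg update",
                    "completed": False,
                    "parent_id": "PARENT-UUID"}}  # hypothetical id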
@@ -478,6 +478,9 @@ def update_port(context, id, port):
     # a future patch where we have time to solve it well.
     kwargs = {}
     if new_security_groups is not None:
+        # TODO(anyone): this is kind of silly (when testing), because it will
+        #               modify the incoming dict. Probably should be a copy or
+        #               something.
         kwargs["security_groups"] = security_group_mods
     net_driver.update_port(context, port_id=port_db["backend_key"],
                            mac_address=port_db["mac_address"],
@@ -21,16 +21,21 @@ from oslo_log import log as logging
 from oslo_utils import uuidutils

 from quark.db import api as db_api
-from quark.environment import Capabilities
+from quark import environment as env
 from quark import exceptions as q_exc
+from quark.plugin_modules import jobs as job_api
 from quark import plugin_views as v
 from quark import protocols
+from quark.worker_plugins import sg_update_worker as sg_rpc_api

 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 DEFAULT_SG_UUID = "00000000-0000-0000-0000-000000000000"
 GROUP_NAME_MAX_LENGTH = 255
 GROUP_DESCRIPTION_MAX_LENGTH = 255
+RULE_CREATE = 'create'
+RULE_DELETE = 'delete'


 def _validate_security_group_rule(context, rule):
@@ -40,17 +45,20 @@ def _validate_security_group_rule(context, rule):
             error_message="Remote groups are not currently supported")

     direction = rule.get("direction")
-    if direction == Capabilities.EGRESS:
-        if Capabilities.EGRESS not in CONF.QUARK.environment_capabilities:
+    if direction == env.Capabilities.EGRESS:
+        if env.Capabilities.EGRESS not in CONF.QUARK.environment_capabilities:
             raise q_exc.EgressSecurityGroupRulesNotEnabled()

     protocol = rule.pop('protocol')
-    port_range_min = rule['port_range_min']
-    port_range_max = rule['port_range_max']
-    ethertype = protocols.translate_ethertype(rule["ethertype"])
+    # NOTE(roaet): these are not required by spec
+    port_range_min = rule.get('port_range_min')
+    port_range_max = rule.get('port_range_max')
+    # TODO(anyone): this will error as None, so defaulting to ipv4
+    et = rule.get('ethertype', 'IPv4')
+    ethertype = protocols.translate_ethertype(et)

     if protocol:
-        protocol = protocols.translate_protocol(protocol, rule["ethertype"])
+        protocol = protocols.translate_protocol(protocol, et)
         protocols.validate_protocol_with_port_ranges(ethertype,
                                                      protocol,
                                                      port_range_min,
@@ -100,26 +108,17 @@ def create_security_group(context, security_group):
         return v._make_security_group_dict(dbgroup)


-def create_security_group_rule(context, security_group_rule):
-    LOG.info("create_security_group for tenant %s" %
-             (context.tenant_id))
+def update_security_group(context, id, security_group):
+    if id == DEFAULT_SG_UUID:
+        raise sg_ext.SecurityGroupCannotUpdateDefault()
+    new_group = security_group["security_group"]
+    _validate_security_group(new_group)
+
     with context.session.begin():
-        rule = _validate_security_group_rule(
-            context, security_group_rule["security_group_rule"])
-        rule["id"] = uuidutils.generate_uuid()
+        group = db_api.security_group_find(context, id=id, scope=db_api.ONE)
+        db_group = db_api.security_group_update(context, group, **new_group)

-        group_id = rule["security_group_id"]
-        group = db_api.security_group_find(context, id=group_id,
-                                           scope=db_api.ONE)
-        if not group:
-            raise sg_ext.SecurityGroupNotFound(id=group_id)
-
-        quota.QUOTAS.limit_check(
-            context, context.tenant_id,
-            security_rules_per_group=len(group.get("rules", [])) + 1)
-
-        new_rule = db_api.security_group_rule_create(context, **rule)
-    return v._make_security_group_rule_dict(new_rule)
+    return v._make_security_group_dict(db_group)


 def delete_security_group(context, id):
@@ -139,7 +138,81 @@ def delete_security_group(context, id):
         db_api.security_group_delete(context, group)


+def get_security_group(context, id, fields=None):
+    LOG.info("get_security_group %s for tenant %s" %
+             (id, context.tenant_id))
+    group = db_api.security_group_find(context, id=id, scope=db_api.ONE)
+    if not group:
+        raise sg_ext.SecurityGroupNotFound(id=id)
+    return v._make_security_group_dict(group, fields)
+
+
+def get_security_groups(context, filters=None, fields=None,
+                        sorts=None, limit=None, marker=None,
+                        page_reverse=False):
+    LOG.info("get_security_groups for tenant %s" %
+             (context.tenant_id))
+    groups = db_api.security_group_find(context, **filters)
+    return [v._make_security_group_dict(group) for group in groups]
+
+
+@env.has_capability(env.Capabilities.SG_UPDATE_ASYNC)
+def _perform_async_update_rule(context, id, db_sg_group, rule_id, action):
+    """Updates a SG rule async and return the job information.
+
+    Only happens if the security group has associated ports. If the async
+    connection fails the update continues (legacy mode).
+    """
+    rpc_reply = None
+    sg_rpc = sg_rpc_api.QuarkSGAsyncProcessClient()
+    ports = db_api.sg_gather_associated_ports(context, db_sg_group)
+    if len(ports) > 0:
+        rpc_reply = sg_rpc.start_update(context, id, rule_id, action)
+        if rpc_reply:
+            job_id = rpc_reply['job_id']
+            job_api.add_job_to_context(context, job_id)
+        else:
+            LOG.error("Async update failed. Is the worker running?")
+
+
+def create_security_group_rule(context, security_group_rule):
+    """Creates a rule and updates the ports (async) if enabled."""
+    LOG.info("create_security_group for tenant %s" %
+             (context.tenant_id))
+    with context.session.begin():
+        rule = _validate_security_group_rule(
+            context, security_group_rule["security_group_rule"])
+        rule["id"] = uuidutils.generate_uuid()
+
+        group_id = rule["security_group_id"]
+        group = db_api.security_group_find(context, id=group_id,
+                                           scope=db_api.ONE)
+        if not group:
+            raise sg_ext.SecurityGroupNotFound(id=group_id)
+
+        quota.QUOTAS.limit_check(
+            context, context.tenant_id,
+            security_rules_per_group=len(group.get("rules", [])) + 1)
+
+        new_rule = db_api.security_group_rule_create(context, **rule)
+    if group:
+        _perform_async_update_rule(context, group_id, group, new_rule.id,
+                                   RULE_CREATE)
+    return v._make_security_group_rule_dict(new_rule)
+
+
+def get_security_group_rule(context, id, fields=None):
+    LOG.info("get_security_group_rule %s for tenant %s" %
+             (id, context.tenant_id))
+    rule = db_api.security_group_rule_find(context, id=id,
+                                           scope=db_api.ONE)
+    if not rule:
+        raise sg_ext.SecurityGroupRuleNotFound(id=id)
+    return v._make_security_group_rule_dict(rule, fields)
+
+
 def delete_security_group_rule(context, id):
+    """Deletes a rule and updates the ports (async) if enabled."""
     LOG.info("delete_security_group %s for tenant %s" %
              (id, context.tenant_id))
     with context.session.begin():
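QuarkSGAsyncProcessClient.start_update lives in sg_update_worker, which is not shown in this diff; the only contract visible here is that the reply carries a job_id (rpc_reply['job_id'] above). A hypothetical stand-in one could test against:

# Hypothetical stand-in, not quark code: mimics the one visible contract,
# a dict reply carrying the created job's id (a falsy reply means failure).
class FakeSGAsyncProcessClient(object):
    def start_update(self, context, sg_id, rule_id, action):
        return {"job_id": "JOB-UUID"}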
@@ -155,34 +228,8 @@ def delete_security_group_rule(context, id):

         rule["id"] = id
         db_api.security_group_rule_delete(context, rule)
-
-
-def get_security_group(context, id, fields=None):
-    LOG.info("get_security_group %s for tenant %s" %
-             (id, context.tenant_id))
-    group = db_api.security_group_find(context, id=id, scope=db_api.ONE)
-    if not group:
-        raise sg_ext.SecurityGroupNotFound(id=id)
-    return v._make_security_group_dict(group, fields)
-
-
-def get_security_group_rule(context, id, fields=None):
-    LOG.info("get_security_group_rule %s for tenant %s" %
-             (id, context.tenant_id))
-    rule = db_api.security_group_rule_find(context, id=id,
-                                           scope=db_api.ONE)
-    if not rule:
-        raise sg_ext.SecurityGroupRuleNotFound(id=id)
-    return v._make_security_group_rule_dict(rule, fields)
-
-
-def get_security_groups(context, filters=None, fields=None,
-                        sorts=None, limit=None, marker=None,
-                        page_reverse=False):
-    LOG.info("get_security_groups for tenant %s" %
-             (context.tenant_id))
-    groups = db_api.security_group_find(context, **filters)
-    return [v._make_security_group_dict(group) for group in groups]
+    if group:
+        _perform_async_update_rule(context, group.id, group, id, RULE_DELETE)


 def get_security_group_rules(context, filters=None, fields=None,
@@ -192,15 +239,3 @@ def get_security_group_rules(context, filters=None, fields=None,
              (context.tenant_id))
     rules = db_api.security_group_rule_find(context, **filters)
     return [v._make_security_group_rule_dict(rule) for rule in rules]
-
-
-def update_security_group(context, id, security_group):
-    if id == DEFAULT_SG_UUID:
-        raise sg_ext.SecurityGroupCannotUpdateDefault()
-    new_group = security_group["security_group"]
-    _validate_security_group(new_group)
-
-    with context.session.begin():
-        group = db_api.security_group_find(context, id=id, scope=db_api.ONE)
-        db_group = db_api.security_group_update(context, group, **new_group)
-    return v._make_security_group_dict(db_group)
@@ -362,8 +362,28 @@ def _make_scaling_ip_dict(flip):


 def _make_job_dict(job):
-    return {"id": job.get('id'),
+    """Creates the view for a job while calculating progress.
+
+    Since a root job does not have a transaction id (TID) it will return its
+    id as the TID.
+    """
+    body = {"id": job.get('id'),
             "action": job.get('action'),
             "completed": job.get('completed'),
             "tenant_id": job.get('tenant_id'),
-            "created_at": job.get('created_at')}
+            "created_at": job.get('created_at'),
+            "transaction_id": job.get('transaction_id'),
+            "parent_id": job.get('parent_id', None)}
+    if not body['transaction_id']:
+        body['transaction_id'] = job.get('id')
+    completed = 0
+    for sub in job.subtransactions:
+        if sub.get('completed'):
+            completed += 1
+    pct = 100 if job.get('completed') else 0
+    if len(job.subtransactions) > 0:
+        pct = float(completed) / len(job.subtransactions) * 100.0
+    body['transaction_percent'] = int(pct)
+    body['completed_subtransactions'] = completed
+    body['subtransactions'] = len(job.subtransactions)
+    return body
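The progress math above in one worked line: with 1 of 2 subtransactions completed the view reports 50.

completed, total = 1, 2
pct = float(completed) / total * 100.0  # 50.0
print(int(pct))  # 50; a job with no subtransactions reports 0 or 100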
@@ -39,6 +39,70 @@ class QuarkJobs(BaseFunctionalTest):
         self.assertFalse(job['completed'])
         self.assertEqual(self.tenant_id, job['tenant_id'])
         self.assertEqual(self.action, job['action'])
+        self.assertEqual(None, job['parent_id'])
+        self.assertEqual(job['id'], job['transaction_id'])
+        self.assertEqual(0, job['subtransactions'])
+        self.assertEqual(0, job['completed_subtransactions'])
+
+    def test_add_job_to_context(self):
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False)
+        job_body = dict(job=job_body)
+        job = job_api.create_job(self.admin_context, job_body)
+        self.assertIsNotNone(job)
+        self.assertFalse(hasattr(self.admin_context, 'async_job'))
+        job_api.add_job_to_context(self.admin_context, job['id'])
+        self.assertTrue(hasattr(self.admin_context, 'async_job'))
+        self.assertEqual(self.admin_context.async_job['job']['id'],
+                         job['id'])
+
+    def test_add_missing_job_to_context(self):
+        self.assertFalse(hasattr(self.admin_context, 'async_job'))
+        job_api.add_job_to_context(self.admin_context, 0)
+        self.assertFalse(hasattr(self.admin_context, 'async_job'))
+
+    def test_create_job_with_parent_job(self):
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False)
+        job_body = dict(job=job_body)
+        parent_job = job_api.create_job(self.admin_context, job_body)
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False, parent_id=parent_job['id'])
+        job_body = dict(job=job_body)
+        job = job_api.create_job(self.admin_context, job_body)
+        self.assertIsNotNone(job)
+        self.assertFalse(job['completed'])
+        self.assertEqual(self.tenant_id, job['tenant_id'])
+        self.assertEqual(self.action, job['action'])
+        self.assertEqual(parent_job['id'], job['parent_id'])
+        self.assertEqual(parent_job['id'], job['transaction_id'],
+                         "transaction id should be outer most parent id")
+        tx_job = job_api.get_job(self.admin_context, parent_job['id'])
+        self.assertEqual(1, tx_job['subtransactions'])
+        self.assertEqual(0, tx_job['completed_subtransactions'])
+
+    def test_create_deep_job_list(self):
+        parent_job = None
+        transaction = None
+        for i in xrange(4):
+            job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                            completed=False)
+            if parent_job:
+                job_body['parent_id'] = parent_job
+            job_body = dict(job=job_body)
+            job = job_api.create_job(self.admin_context, job_body)
+            self.assertIsNotNone(job)
+            if parent_job:
+                self.assertEqual(parent_job, job['parent_id'])
+            if transaction is None:
+                self.assertIsNotNone(job['transaction_id'])
+                transaction = job['id']
+            else:
+                self.assertEqual(transaction, job['transaction_id'])
+            parent_job = job['id']
+        tx_job = job_api.get_job(self.admin_context, transaction)
+        self.assertEqual(3, tx_job['subtransactions'])
+        self.assertEqual(0, job['completed_subtransactions'])

     def test_create_job_fail_non_admin(self):
         job_body = dict(tenant_id=self.tenant_id, action=self.action,
@@ -49,17 +113,24 @@ class QuarkJobs(BaseFunctionalTest):

     def test_get_jobs(self):
         job_body = dict(tenant_id=self.tenant_id, action=self.action,
-                        completed=False)
+                        completed=False, resource_id='foo')
         job_body = dict(job=job_body)
         job1 = job_api.create_job(self.admin_context, job_body)
         self.assertIsNotNone(job1)

         job_body = dict(tenant_id=self.tenant_id2, action=self.action,
-                        completed=True)
+                        completed=False, resource_id='bar')
         job_body = dict(job=job_body)
         job2 = job_api.create_job(self.admin_context, job_body)
         self.assertIsNotNone(job2)

+        job_body = dict(tenant_id=self.tenant_id2, action=self.action,
+                        completed=False, resource_id='bar',
+                        parent_id=job2['id'])
+        job_body = dict(job=job_body)
+        job3 = job_api.create_job(self.admin_context, job_body)
+        self.assertIsNotNone(job3)
+
         jobs = job_api.get_job(self.admin_context, job1['id'])
         self.assertFalse(type(jobs) in [list, tuple])
         job = jobs
@@ -68,7 +139,7 @@ class QuarkJobs(BaseFunctionalTest):
         self.assertEqual(self.action, job['action'])

         job = job_api.get_job(self.admin_context, job2['id'])
-        self.assertTrue(job['completed'])
+        self.assertFalse(job['completed'])
         self.assertEqual(self.tenant_id2, job['tenant_id'])
         self.assertEqual(self.action, job['action'])
@@ -77,15 +148,19 @@ class QuarkJobs(BaseFunctionalTest):

         jobs = job_api.get_jobs(self.admin_context)
         self.assertTrue(type(jobs) in [list, tuple])
         self.assertEqual(3, len(jobs))

+        jobs = job_api.get_jobs(self.admin_context, transaction_id=job2['id'])
+        self.assertTrue(type(jobs) in [list, tuple])
+        self.assertEqual(2, len(jobs))
+
         jobs = job_api.get_jobs(self.admin_context, completed=True)
         self.assertTrue(type(jobs) in [list, tuple])
-        self.assertEqual(1, len(jobs))
+        self.assertEqual(0, len(jobs))

         jobs = job_api.get_jobs(self.admin_context, completed=False)
         self.assertTrue(type(jobs) in [list, tuple])
-        self.assertEqual(1, len(jobs))
+        self.assertEqual(3, len(jobs))

         jobs = job_api.get_jobs(self.admin_context, completed='hello')
         self.assertTrue(type(jobs) in [list, tuple])
@@ -99,6 +174,18 @@ class QuarkJobs(BaseFunctionalTest):
         self.assertTrue(type(jobs) in [list, tuple])
         self.assertEqual(0, len(jobs))

+        jobs = job_api.get_jobs(self.admin_context, resource_id='foo')
+        self.assertTrue(type(jobs) in [list, tuple])
+        self.assertEqual(1, len(jobs))
+
+        jobs = job_api.get_jobs(self.admin_context, resource_id='bar')
+        self.assertTrue(type(jobs) in [list, tuple])
+        self.assertEqual(2, len(jobs))
+
+        jobs = job_api.get_jobs(self.admin_context, resource_id='asdf')
+        self.assertTrue(type(jobs) in [list, tuple])
+        self.assertEqual(0, len(jobs))
+
     def test_get_job_different_non_admin(self):
         job_body = dict(tenant_id=self.context.tenant_id, action=self.action,
                         completed=False)
@@ -185,6 +272,38 @@ class QuarkJobs(BaseFunctionalTest):
         with self.assertRaises(q_exc.JobNotFound):
             job_api.delete_job(self.admin_context, job1['id'])

+    def test_delete_job_with_children(self):
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False)
+        job_body = dict(job=job_body)
+        parent_job = job_api.create_job(self.admin_context, job_body)
+        parent_job = job_api.get_job(self.admin_context, parent_job['id'])
+        self.assertIsNotNone(parent_job)
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False, parent_id=parent_job['id'])
+        job_body = dict(job=job_body)
+        job = job_api.create_job(self.admin_context, job_body)
+        job = job_api.get_job(self.admin_context, job['id'])
+        self.assertIsNotNone(job)
+
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False, parent_id=job['id'])
+        job_body = dict(job=job_body)
+        subjob = job_api.create_job(self.admin_context, job_body)
+        subjob = job_api.get_job(self.admin_context, subjob['id'])
+        self.assertIsNotNone(job)
+
+        job_api.delete_job(self.admin_context, parent_job['id'])
+
+        with self.assertRaises(q_exc.JobNotFound):
+            job_api.get_job(self.admin_context, parent_job['id'])
+
+        with self.assertRaises(q_exc.JobNotFound):
+            job_api.get_job(self.admin_context, job['id'])
+
+        with self.assertRaises(q_exc.JobNotFound):
+            job_api.get_job(self.admin_context, subjob['id'])
+
     def test_delete_job_fail_non_admin(self):
         with self.assertRaises(n_exc.NotAuthorized):
             job_api.delete_job(self.context, 'derp')
@@ -205,3 +324,51 @@ class QuarkJobs(BaseFunctionalTest):

         with self.assertRaises(q_exc.JobNotFound):
             job_api.get_job(self.context, job1['id'])
+
+    def test_transaction_completion_percent(self):
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False)
+        job_body = dict(job=job_body)
+        parent_job = job_api.create_job(self.admin_context, job_body)
+        parent_job = job_api.get_job(self.admin_context, parent_job['id'])
+        self.assertIsNotNone(parent_job)
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False, parent_id=parent_job['id'])
+        job_body = dict(job=job_body)
+        job = job_api.create_job(self.admin_context, job_body)
+        job = job_api.get_job(self.admin_context, job['id'])
+        self.assertIsNotNone(job)
+
+        job_body = dict(tenant_id=self.tenant_id, action=self.action,
+                        completed=False, parent_id=job['id'])
+        job_body = dict(job=job_body)
+        subjob = job_api.create_job(self.admin_context, job_body)
+        subjob = job_api.get_job(self.admin_context, subjob['id'])
+        self.assertIsNotNone(job)
+
+        parent_job = job_api.get_job(self.admin_context, parent_job['id'])
+        self.assertTrue('transaction_percent' in parent_job)
+        self.assertEqual(0, parent_job['transaction_percent'])
+        self.assertEqual(2, parent_job['subtransactions'])
+        self.assertEqual(0, parent_job['completed_subtransactions'])
+
+        update_body = dict(completed=True)
+        update_body = dict(job=update_body)
+
+        subjob = job_api.update_job(self.admin_context, subjob['id'],
+                                    update_body)
+        self.assertTrue(subjob['completed'])
+
+        parent_job = job_api.get_job(self.admin_context, parent_job['id'])
+        self.assertEqual(50, parent_job['transaction_percent'])
+        self.assertEqual(2, parent_job['subtransactions'])
+        self.assertEqual(1, parent_job['completed_subtransactions'])
+
+        job = job_api.update_job(self.admin_context, job['id'], update_body)
+        self.assertTrue(subjob['completed'])
+
+        parent_job = job_api.get_job(self.admin_context, parent_job['id'])
+        self.assertEqual(100, parent_job['transaction_percent'])
+        self.assertEqual(2, parent_job['subtransactions'])
+        self.assertEqual(2, parent_job['completed_subtransactions'])
+        self.assertEqual(True, parent_job['completed'])
quark/tests/functional/plugin_modules/test_security_groups.py (new file, 252 lines)
@@ -0,0 +1,252 @@
# Copyright 2016 Rackspace Hosting Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import mock
# import netaddr

import contextlib

# from neutron.common import exceptions as q_exc
from oslo_config import cfg
from oslo_log import log as logging

from quark.db import api as db_api
from quark import environment as env
import quark.plugin_modules.mac_address_ranges as macrng_api
import quark.plugin_modules.networks as network_api
import quark.plugin_modules.ports as port_api
import quark.plugin_modules.security_groups as sg_api
import quark.plugin_modules.subnets as subnet_api
from quark.tests.functional.base import BaseFunctionalTest


LOG = logging.getLogger(__name__)


class SecurityGroupsAsyncUpdateTests(BaseFunctionalTest):
    def setUp(self):
        super(SecurityGroupsAsyncUpdateTests, self).setUp()
        env_set = [
            env.Capabilities.SECURITY_GROUPS,
            env.Capabilities.TENANT_NETWORK_SG,
            env.Capabilities.EGRESS,
            env.Capabilities.SG_UPDATE_ASYNC
        ]
        override = ','.join(env_set)
        self.old_override = cfg.CONF.QUARK.environment_capabilities
        cfg.CONF.set_override("environment_capabilities",
                              override,
                              "QUARK")
        self.dev_id = 0

    def _get_devid(self):
        self.dev_id += 1
        return self.dev_id

    def tearDown(self):
        super(SecurityGroupsAsyncUpdateTests, self).tearDown()
        cfg.CONF.set_override("environment_capabilities",
                              self.old_override,
                              "QUARK")

    def _make_body(self, net):
        port_info = {"port": dict(network_id=net['id'], tenant_id="fake",
                                  device_id=self._get_devid())}
        return port_info

    def _get_assoc_ports(self, sgid):
        db_sg = db_api.security_group_find(
            self.context, id=sgid, scope=db_api.ONE)
        self.assertIsNotNone(db_sg)
        return db_api.sg_gather_associated_ports(self.context, db_sg)

    @contextlib.contextmanager
    def _stubs(self, network_info, subnet_v4_info, subnet_v6_info=None):
        cls = 'QuarkSGAsyncProcessClient'
        mod_path = 'quark.worker_plugins.sg_update_worker.%s' % cls
        job_path = 'quark.plugin_modules.jobs'
        with contextlib.nested(
                mock.patch("neutron.common.rpc.get_notifier"),
                mock.patch("neutron.quota.QUOTAS.limit_check"),
                mock.patch("%s.add_job_to_context" % job_path),
                mock.patch("%s.start_update" % mod_path)) as \
                (notifier, limit_check, addjob, update):
            self.context.is_admin = True
            net = network_api.create_network(self.context, network_info)
            mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
            macrng_api.create_mac_address_range(self.context, mac)
            self.context.is_admin = False
            subnet_v4_info['subnet']['network_id'] = net['id']
            sub_v4 = subnet_api.create_subnet(self.context, subnet_v4_info)

            yield net, sub_v4, update

    def test_gather_sg_ports(self):
        """Checking if gather ports works as designed. """
        cidr = "192.168.1.0/24"
        network = dict(id='1', name="public", tenant_id="make",
                       network_plugin="BASE",
                       ipam_strategy="ANY")
        network = {"network": network}
        subnet_v4 = dict(id='1', ip_version=4, cidr=cidr,
                         tenant_id="fake")
        subnet_v4_info = {"subnet": subnet_v4}

        with self._stubs(network, subnet_v4_info) as (net, sub_v4, update):
            port1 = port_api.create_port(self.context, self._make_body(net))
            self.assertIsNotNone(port1)
            port2 = port_api.create_port(self.context, self._make_body(net))
            self.assertIsNotNone(port2)

            sg_body = dict(tenant_id="derp", name="test sg",
                           description="none")
            sg_body = dict(security_group=sg_body)

            sg = sg_api.create_security_group(self.context, sg_body)
            self.assertIsNotNone(sg)
            sgid = sg['id']
            self.assertIsNotNone(sgid)

            assoc_ports = self._get_assoc_ports(sgid)
            self.assertEqual(0, len(assoc_ports))

            port_body = {'security_groups': [sgid]}
            port_body = dict(port=port_body)

            port1 = port_api.update_port(self.context, port1['id'], port_body)
            self.assertIsNotNone(port1)

            assoc_ports = self._get_assoc_ports(sgid)
            self.assertEqual(1, len(assoc_ports))

            # NOTE: this is duplicated because update_port modifies the params
            port_body = {'security_groups': [sgid]}
            port_body = dict(port=port_body)

            port2 = port_api.update_port(self.context, port2['id'], port_body)
            self.assertIsNotNone(port2)

            assoc_ports = self._get_assoc_ports(sgid)
            self.assertEqual(2, len(assoc_ports))

    def test_env_caps_off_sg_async_update(self):
        """This test ensures that envcaps off works as designed."""
        env_set = [
            env.Capabilities.SECURITY_GROUPS,
            env.Capabilities.TENANT_NETWORK_SG,
            env.Capabilities.EGRESS,
        ]
        override = ','.join(env_set)
        old_override = cfg.CONF.QUARK.environment_capabilities
        cfg.CONF.set_override("environment_capabilities",
                              override,
                              "QUARK")
        cidr = "192.168.1.0/24"
        network = dict(id='1', name="public", tenant_id="make",
                       network_plugin="BASE",
                       ipam_strategy="ANY")
        network = {"network": network}
        subnet_v4 = dict(id='1', ip_version=4, cidr=cidr,
                         tenant_id="fake")
        subnet_v4_info = {"subnet": subnet_v4}

        try:
            with self._stubs(network, subnet_v4_info) as (net, sub_v4, update):
                port1 = port_api.create_port(
                    self.context, self._make_body(net))
                self.assertIsNotNone(port1)

                sg_body = dict(tenant_id="derp", name="test sg",
                               description="none")
                sg_body = dict(security_group=sg_body)

                sg = sg_api.create_security_group(self.context, sg_body)
                self.assertIsNotNone(sg)
                sgid = sg['id']
                self.assertIsNotNone(sgid)

                port_body = {'security_groups': [sgid]}
                port_body = dict(port=port_body)

                port1 = port_api.update_port(self.context, port1['id'],
                                             port_body)
                self.assertIsNotNone(port1)

                sgr_body = {'protocol': 'tcp', 'security_group_id': sgid,
                            'tenant_id': "derp",
                            'direction': 'ingress'}
                sgr_body = dict(security_group_rule=sgr_body)
                sgr = sg_api.create_security_group_rule(self.context, sgr_body)
                self.assertIsNotNone(sgr)
                self.assertFalse(update.called)
        finally:
            cfg.CONF.set_override("environment_capabilities",
                                  old_override,
                                  "QUARK")

    def test_env_caps_on_sg_async_update(self):
        """This test ensures that envcaps on works as designed."""
        env_set = [
            env.Capabilities.SECURITY_GROUPS,
            env.Capabilities.TENANT_NETWORK_SG,
            env.Capabilities.EGRESS,
            env.Capabilities.SG_UPDATE_ASYNC
        ]
        override = ','.join(env_set)
        old_override = cfg.CONF.QUARK.environment_capabilities
        cfg.CONF.set_override("environment_capabilities",
                              override,
                              "QUARK")
        cidr = "192.168.1.0/24"
        network = dict(id='1', name="public", tenant_id="make",
                       network_plugin="BASE",
                       ipam_strategy="ANY")
        network = {"network": network}
        subnet_v4 = dict(id='1', ip_version=4, cidr=cidr,
                         tenant_id="fake")
        subnet_v4_info = {"subnet": subnet_v4}

        try:
            with self._stubs(network, subnet_v4_info) as (net, sub_v4, update):
                port1 = port_api.create_port(
                    self.context, self._make_body(net))
                self.assertIsNotNone(port1)

                sg_body = dict(tenant_id="derp", name="test sg",
                               description="none")
                sg_body = dict(security_group=sg_body)

                sg = sg_api.create_security_group(self.context, sg_body)
                self.assertIsNotNone(sg)
                sgid = sg['id']
                self.assertIsNotNone(sgid)

                port_body = {'security_groups': [sgid]}
                port_body = dict(port=port_body)

                port1 = port_api.update_port(self.context, port1['id'],
                                             port_body)

                sgr_body = {'protocol': 'tcp', 'security_group_id': sgid,
                            'tenant_id': "derp",
                            'direction': 'ingress'}
                sgr_body = dict(security_group_rule=sgr_body)
                sgr = sg_api.create_security_group_rule(self.context, sgr_body)
                self.assertIsNotNone(sgr)
                self.assertTrue(update.called)
        finally:
            cfg.CONF.set_override("environment_capabilities",
                                  old_override,
                                  "QUARK")
quark/tests/tools/test_resp_async_middleware.py (new file, 126 lines)
@@ -0,0 +1,126 @@
# Copyright (c) 2016 Rackspace Hosting Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import json
import mock

from quark.tests import test_base
from quark.tools.middleware import resp_async_id as jobmw


mw_mock_path = 'quark.tools.middleware.resp_async_id.ResponseAsyncIdAdder'


class FakeResp(object):
    def __init__(self, body):
        self.body = body
        self.headers = {}


class FakeContext(object):
    def __init__(self, job):
        self.async_job = job


class TestRespAsyncIDMiddleware(test_base.TestBase):
    def setUp(self):
        self.middleware_cls = jobmw.ResponseAsyncIdAdder
        self.app = mock.Mock()
        self.conf = {}
        job = {'job': {'id': '3'}}
        self.job_ctx = FakeContext(job)
        self.no_ctx = {}
        self.none_ctx = None
        self.random_ctx = {'stuff': {'stuff': 'value'}}

        self.body = '{"something": {"attribute": "value"}}'
        self.resp_return = FakeResp(self.body)
        self.err_resp = FakeResp('asdf::')

    def test_middleware_instantiation(self):
        self.assertIsNotNone(self.middleware_cls(self.app, self.conf))

        mw = jobmw.filter_factory(self.conf)(self.app)
        self.assertIsNotNone(mw)

    def test_mw_none_context(self):
        mw = jobmw.filter_factory(self.conf)(self.app)
        with contextlib.nested(
                mock.patch('%s._get_resp' % mw_mock_path),
                mock.patch('%s._get_ctx' % mw_mock_path)) as \
                (get_resp, get_ctx):
            get_resp.return_value = self.resp_return
            get_ctx.return_value = self.none_ctx
            resp = mw.__call__.request('/', method='GET', body=self.body)
            self.assertEqual(resp, self.resp_return)
            self.assertEqual(self.body, resp.body)
            self.assertFalse('job_id' in resp.body)
            self.assertFalse('job_id' in resp.headers)

    def test_mw_empty_context(self):
        mw = jobmw.filter_factory(self.conf)(self.app)
        with contextlib.nested(
                mock.patch('%s._get_resp' % mw_mock_path),
                mock.patch('%s._get_ctx' % mw_mock_path)) as \
                (get_resp, get_ctx):
            get_resp.return_value = self.resp_return
            get_ctx.return_value = self.no_ctx
            resp = mw.__call__.request('/', method='GET', body=self.body)
            self.assertEqual(resp, self.resp_return)
            self.assertEqual(self.body, resp.body)
            self.assertFalse('job_id' in resp.body)
            self.assertFalse('job_id' in resp.headers)

    def test_mw_missing_context(self):
        mw = jobmw.filter_factory(self.conf)(self.app)
        with contextlib.nested(
                mock.patch('%s._get_resp' % mw_mock_path),
                mock.patch('%s._get_ctx' % mw_mock_path)) as \
                (get_resp, get_ctx):
            get_resp.return_value = self.resp_return
            get_ctx.return_value = self.random_ctx
            resp = mw.__call__.request('/', method='GET', body=self.body)
            self.assertEqual(resp, self.resp_return)
            self.assertEqual(self.body, resp.body)
            self.assertFalse('job_id' in resp.body)
            self.assertFalse('job_id' in resp.headers)

    def test_mw_modify_resp(self):
        mw = jobmw.filter_factory(self.conf)(self.app)
        with contextlib.nested(
                mock.patch('%s._get_resp' % mw_mock_path),
                mock.patch('%s._get_ctx' % mw_mock_path)) as \
                (get_resp, get_ctx):
            get_resp.return_value = self.resp_return
            get_ctx.return_value = self.job_ctx
            resp = mw.__call__.request('/', method='GET', body=self.body)
            self.assertEqual(resp, self.resp_return)
            self.assertNotEqual(self.body, resp.body)
            self.assertTrue('job_id' in resp.body)
            self.assertTrue('job_id' in resp.headers)

            resp_json = json.loads(resp.body)
            self.assertTrue('job_id' in resp_json)

    def test_mw_error_resp(self):
        mw = jobmw.filter_factory(self.conf)(self.app)
        with contextlib.nested(
                mock.patch('%s._get_resp' % mw_mock_path),
                mock.patch('%s._get_ctx' % mw_mock_path)) as \
                (get_resp, get_ctx):
            get_resp.return_value = self.err_resp
            get_ctx.return_value = self.job_ctx
            resp = mw.__call__.request('/', method='GET', body=self.body)
            self.assertEqual(resp, self.err_resp)
@@ -1,22 +1,38 @@
 # Copyright 2016 Rackspace Hosting Inc.
 # All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
 # a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
 import eventlet
 eventlet.monkey_patch(socket=True, select=True, time=True)

+import inspect
+import itertools
 import sys
 import time

 from oslo_config import cfg
 from oslo_log import log as logging
 import oslo_messaging as messaging
 from oslo_service import service as common_service
 from oslo_utils import excutils
+from stevedore import extension

 from neutron._i18n import _
+from neutron._i18n import _LE
 from neutron.common import config
 from neutron.common import rpc as n_rpc
 from neutron.db import api as session
 from neutron import service

+from quark.worker_plugins import base_worker
+
 service_opts = [
     cfg.StrOpt('topic',
                help=_('Topic for messaging to pub/sub to')),
@@ -37,113 +53,99 @@ service_opts = [
 CONF = cfg.CONF
 CONF.register_opts(service_opts, "QUARK_ASYNC")
 LOG = logging.getLogger(__name__)
+VERSION = "1.0"
+PLUGIN_EP = 'quark.worker_plugin'


-class QuarkRpcTestCallback(object):
-    target = messaging.Target(version='1.0', namespace=None)
-
-    def stuff(self, context, **kwargs):
-        return {"status": "okay"}
-
-
-class QuarkAsyncPlugin(object):
+class QuarkAsyncServer(object):
     def __init__(self):
-        pass
+        self.plugins = []
+        self._register_extensions(VERSION)

-    def _setup_rpc(self):
-        self.endpoints = [QuarkRpcTestCallback()]
+    def _load_worker_plugin_with_module(self, module, version):
+        """Instantiates worker plugins that have requisite properties.

-    def start_rpc_listeners(self):
-        """Configure all listeners here"""
-        self._setup_rpc()
-        self.conn = n_rpc.create_connection()
-        self.conn.create_consumer(CONF.QUARK_ASYNC.topic, self.endpoints,
-                                  fanout=False)
-        return self.conn.consume_in_threads()
+        The required properties are:
+        * must have PLUGIN_EP entrypoint registered (or it wouldn't be in the
+          list)
+        * must have class attribute versions (list) of supported RPC versions
+        * must subclass QuarkAsyncPluginBase
+        """
+        classes = inspect.getmembers(module, inspect.isclass)
+        loaded = 0
+        for cls_name, cls in classes:
+            if hasattr(cls, 'versions'):
+                if version not in cls.versions:
+                    continue
+            else:
+                continue
+            if issubclass(cls, base_worker.QuarkAsyncPluginBase):
+                LOG.debug("Loading plugin %s" % cls_name)
+                plugin = cls()
+                self.plugins.append(plugin)
+                loaded += 1
+        LOG.debug("Found %d possible plugins and loaded %d" %
+                  (len(classes), loaded))
+
+    def _discover_via_entrypoints(self):
+        """Looks for modules with matching entry points."""
+        emgr = extension.ExtensionManager(PLUGIN_EP, invoke_on_load=False)
+        return ((ext.name, ext.plugin) for ext in emgr)
+
+    def _register_extensions(self, version):
+        """Loads plugins that match the PLUGIN_EP entrypoint."""
+        for name, module in itertools.chain(self._discover_via_entrypoints()):
+            self._load_worker_plugin_with_module(module, version)
+
+    def serve_rpc(self):
+        """Launches configured # of workers per loaded plugin."""
+        if cfg.CONF.QUARK_ASYNC.rpc_workers < 1:
+            cfg.CONF.set_override('rpc_workers', 1, "QUARK_ASYNC")
+
+        try:
+            rpc = service.RpcWorker(self.plugins)
+            session.dispose()  # probably not needed, but maybe
+            launcher = common_service.ProcessLauncher(CONF, wait_interval=1.0)
+            launcher.launch_service(rpc, workers=CONF.QUARK_ASYNC.rpc_workers)
+
+            return launcher
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                LOG.exception(_LE('Unrecoverable error: please check log for '
+                                  'details.'))
+
+    def start_api_and_rpc_workers(self):
+        """Initializes eventlet and starts wait for workers to exit.
+
+        Spawns the workers returned from serve_rpc
+        """
+        pool = eventlet.GreenPool()
+
+        quark_rpc = self.serve_rpc()
+        pool.spawn(quark_rpc.wait)
+
+        pool.waitall()
+
+    def run(self):
+        """Start of async worker process."""
+        self.start_api_and_rpc_workers()


-def serve_rpc():
-
-    if cfg.CONF.QUARK_ASYNC.rpc_workers < 1:
-        cfg.CONF.set_override('rpc_workers', 1, "QUARK_ASYNC")
-
-    try:
-        plugins = [QuarkAsyncPlugin()]
-        rpc = service.RpcWorker(plugins)
-        session.dispose()  # probably not needed, but maybe
-        launcher = common_service.ProcessLauncher(CONF, wait_interval=1.0)
-        launcher.launch_service(rpc, workers=CONF.QUARK_ASYNC.rpc_workers)
-
-        return launcher
-    except Exception:
-        with excutils.save_and_reraise_exception():
-            LOG.exception(_LE('Unrecoverable error: please check log for '
-                              'details.'))
-
-
-def start_api_and_rpc_workers():
-    pool = eventlet.GreenPool()
-
-    quark_rpc = serve_rpc()
-    pool.spawn(quark_rpc.wait)
-
-    pool.waitall()
-
-
-def boot_server(server_func):
-    # the configuration will be read into the cfg.CONF global data structure
+def main():
     config.init(sys.argv[1:])
     config.setup_logging()
     config.set_config_defaults()
     if not cfg.CONF.config_file:
-        sys.exit(_("ERROR: Unable to find configuration file via the default"
-                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
-                   " the '--config-file' option!"))
+        sys.exit(_("ERROR: Unable to find configuration file via the"
+                   " default search paths (~/.neutron/, ~/, /etc/neutron/,"
+                   " /etc/) and the '--config-file' option!"))
     try:
-        server_func()
+        QuarkAsyncServer().run()
     except KeyboardInterrupt:
         pass
     except RuntimeError as e:
         sys.exit(_("ERROR: %s") % e)


-def main():
-    boot_server(start_api_and_rpc_workers)
-
-
 class QuarkRpcTestApi(object):
+    """This class is used for testing QuarkRpcTestCallback."""
     def __init__(self):
         target = messaging.Target(topic=CONF.QUARK_ASYNC.topic)
         self.client = n_rpc.get_client(target)

     def stuff(self, context):
         cctxt = self.client.prepare(version='1.0')
         return cctxt.call(context, 'stuff')


 class QuarkAsyncTestContext(object):
+    """This class is used for testing QuarkRpcTestCallback."""
     def __init__(self):
         self.time = time.ctime()

     def to_dict(self):
         return {"application": "rpc-client", "time": time.ctime()}


 def test_main():
     config.init(sys.argv[1:])
     config.setup_logging()
     config.set_config_defaults()
     if not cfg.CONF.config_file:
         sys.exit(_("ERROR: Unable to find configuration file via the default"
                    " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                    " the '--config-file' option!"))
     context = QuarkAsyncTestContext()  # typically context is neutron context
     client = QuarkRpcTestApi()
     LOG.info(client.stuff(context))
     time.sleep(0)  # necessary for preventing Timeout exceptions


 if __name__ == "__main__":
     main()
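A minimal plugin the loader above would accept, per its documented requirements (registered under the quark.worker_plugin entry point, a versions class attribute, and subclassing QuarkAsyncPluginBase); the class body here is an assumption-laden sketch, not quark code:

from quark.worker_plugins import base_worker


class MyWorkerPlugin(base_worker.QuarkAsyncPluginBase):
    versions = ["1.0"]  # must include the server VERSION to be loaded

    def __init__(self):
        # topic name is hypothetical; callbacks would hold RPC endpoint objects
        super(MyWorkerPlugin, self).__init__("my_topic")
        self.callbacks = []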
quark/tools/middleware/__init__.py (new empty file)
quark/tools/middleware/resp_async_id.py (new file, 64 lines)
@@ -0,0 +1,64 @@
# Copyright (c) 2016 Rackspace Hosting Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from neutron import wsgi
from oslo_log import log as logging
import webob.dec
import webob.exc

LOG = logging.getLogger(__name__)


class ResponseAsyncIdAdder(object):
    """Adds the async job id to the response body and headers."""
    def __init__(self, app, conf):
        self.app = app
        self.conf = conf

    def _get_resp(self, req):
        return req.get_response(self.app)

    def _get_ctx(self, req):
        return req.environ.get('neutron.context')

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        """Attempts to put the job_id into the response body and header."""
        resp = self._get_resp(req)
        context = self._get_ctx(req)
        if hasattr(context, 'async_job'):
            try:
                json_body = json.loads(resp.body)
                json_body['job_id'] = context.async_job['job']['id']
                resp.body = json.dumps(json_body)
                resp.headers['job_id'] = context.async_job['job']['id']
            except ValueError:  # bad json not abnormal
                return resp
            except Exception as e:  # Bare exception for anything random
                LOG.error("Uncaught exception: %s" % e)
        return resp


def filter_factory(global_conf, **local_conf):
    """Returns a WSGI filter app for use with paste.deploy."""
    conf = global_conf.copy()
    conf.update(local_conf)

    def wrapper(app):
        return ResponseAsyncIdAdder(app, conf)

    return wrapper
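What the middleware does to a JSON body, shown standalone with plain strings instead of webob request/response objects (plumbing omitted):

import json

body = '{"security_group_rule": {"id": "abc"}}'
job_id = "3"  # would come from context.async_job['job']['id']

json_body = json.loads(body)
json_body['job_id'] = job_id
print(json.dumps(json_body))
# {"security_group_rule": {"id": "abc"}, "job_id": "3"}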
quark/worker_plugins/__init__.py (new empty file)
quark/worker_plugins/base_worker.py (new file, 61 lines)
@ -0,0 +1,61 @@
# Copyright 2016 Rackspace Hosting Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import rpc as n_rpc
from neutron import context
from oslo_config import cfg
from oslo_log import log as logging


CONF = cfg.CONF
LOG = logging.getLogger(__name__)
TEST_TOPIC = 'quark'


class QuarkAsyncPluginBase(object):
    versions = []

    def __init__(self, topic):
        self._context = None
        self.topic = topic
        self.endpoints = []
        self.callbacks = []

    def _setup_rpc(self):
        """Registers the callbacks assigned by subclasses with RPC.

        This is a no-op if the subclass does not populate ``callbacks``.
        """
        self.endpoints.extend(self.callbacks)

    def start_rpc_listeners(self):
        """Configures and starts all RPC listeners."""
        self._setup_rpc()
        if not self.endpoints:
            return []
        self.conn = n_rpc.create_connection()
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        return self.conn.consume_in_threads()

    @property
    def context(self):
        """Provides an admin context for workers."""
        if not self._context:
            self._context = context.get_admin_context()
        return self._context


def get_test_context():
    return context.get_admin_context()
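A minimal sketch of a worker plugin built on QuarkAsyncPluginBase, following the same pattern sample_worker.py uses below; the NoopCallback/NoopPlugin names and the topic are hypothetical, not part of this change:

import oslo_messaging as messaging

from quark.worker_plugins import base_worker


class NoopCallback(object):
    # server-side endpoint; the target advertises the RPC API version
    target = messaging.Target(version='1.0', namespace=None)

    def ping(self, context, **kwargs):
        return "pong"


class NoopPlugin(base_worker.QuarkAsyncPluginBase):
    versions = ["1.0"]

    def __init__(self, topic="quark_noop"):
        super(NoopPlugin, self).__init__(topic)
        self.callbacks = [NoopCallback()]


# starting the listeners returns oslo.messaging consumer threads:
# threads = NoopPlugin().start_rpc_listeners()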
84  quark/worker_plugins/sample_worker.py   Normal file
@@ -0,0 +1,84 @@
# Copyright 2016 Rackspace Hosting Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time

from neutron._i18n import _
from neutron.common import config
from neutron.common import rpc as n_rpc
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging

from quark.plugin_modules import networks as network_api
from quark.worker_plugins import base_worker


CONF = cfg.CONF
LOG = logging.getLogger(__name__)
TEST_TOPIC = 'quark'
VERSION = "1.0"


class QuarkRpcTestCallback(object):
    """The callbacks class defines the server-side methods to be called."""
    target = messaging.Target(version='1.0', namespace=None)

    def stuff(self, context, **kwargs):
        LOG.debug(context)
        networks = network_api.get_networks(context)
        return {"networks": networks, "status": "okay"}


class QuarkAsyncPluginTest(base_worker.QuarkAsyncPluginBase):
    """The worker plugin defines the server-side subscriber to be run.

    Each of these plugins may be executed on multiple workers (and each
    worker only runs a single plugin).
    """
    versions = [VERSION]
    TOPIC = "quark"

    def __init__(self, topic=TOPIC):
        super(QuarkAsyncPluginTest, self).__init__(topic)
        self.callbacks = [QuarkRpcTestCallback()]


class QuarkRpcTestApi(object):
    """The API classes define the client-side methods to be called.

    These methods do not need to match the names of the remote methods as
    long as the RPC calls within use the correct remote names.
    """
    def __init__(self, topic):
        target = messaging.Target(topic=topic)
        self.client = n_rpc.get_client(target)

    def stuff(self, context):
        cctxt = self.client.prepare(version='1.0')
        return cctxt.call(context, 'stuff')


def main():
    config.init(sys.argv[1:])
    config.setup_logging()
    config.set_config_defaults()
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    client = QuarkRpcTestApi(TEST_TOPIC)
    LOG.info(client.stuff(base_worker.get_test_context()))
    time.sleep(0)  # necessary for preventing Timeout exceptions
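Note: setup.cfg below installs this module's main() as the quark-async-tester console script, so the RPC round trip can be exercised from the command line once a worker hosting QuarkAsyncPluginTest is listening on the 'quark' topic.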
232  quark/worker_plugins/sg_update_worker.py   Normal file
@@ -0,0 +1,232 @@
# Copyright 2016 Rackspace Hosting Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time

from neutron.common import rpc as n_rpc
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging import exceptions as om_exc

from quark.db import api as db_api
from quark.plugin_modules import jobs as job_api
from quark.plugin_modules import ports as port_api
from quark.worker_plugins import base_worker


CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CONSUMER_TOPIC = 'quark_sg_update_consumer'
PRODUCER_TOPIC = 'quark_sg_update_producer'
SG_UPDATE_TOPIC = 'quark_sg_update'
VERSION = "1.0"


"""
Quark SG update has three components:
1. AsyncProcess - begins the update process
2. Producer - produces the 'update port' jobs
3. Consumer - updates the ports in redis

The Quark SG update workflow is triggered on:
* rule create
* rule delete

Quark SG update workflow:
1. the triggering event occurs and completes successfully
2. RPC call to create the parent job and start the async update process
   a. RPC cast to start the async update process (see: Async update workflow)
3. job information is stored in the context
4. the response is modified to include the job information

Async update workflow:
1. process started from the SG update trigger
2. for each port associated with the SG:
   a. create a subtask job
   b. RPC cast to perform the update on that port
      i. update redis
      ii. complete the job and check the root transaction status

============================================================
Component 1. QuarkSGAsyncProcess
============================================================
"""


class QuarkSGAsyncProcessCallback(object):
    target = messaging.Target(version='1.0', namespace=None)

    def update_sg(self, context, sg, rule_id, action):
        """Begins the async update process."""
        db_sg = db_api.security_group_find(context, id=sg, scope=db_api.ONE)
        if not db_sg:
            return None
        with context.session.begin():
            job_body = dict(action="%s sg rule %s" % (action, rule_id),
                            resource_id=rule_id,
                            tenant_id=db_sg['tenant_id'])
            job_body = dict(job=job_body)
            job = job_api.create_job(context, job_body)
            rpc_client = QuarkSGAsyncProducerClient()
            try:
                rpc_client.populate_subtasks(context, sg, job['id'])
            except om_exc.MessagingTimeout:
                LOG.error("Failed to create subtasks. Is Rabbit running?")
                return None
        return {"job_id": job['id']}


class QuarkSGAsyncProcess(base_worker.QuarkAsyncPluginBase):
    versions = [VERSION]

    def __init__(self, topic=SG_UPDATE_TOPIC):
        super(QuarkSGAsyncProcess, self).__init__(topic)
        self.callbacks = [QuarkSGAsyncProcessCallback()]


class QuarkSGAsyncProcessClient(object):
    def __init__(self):
        topic = SG_UPDATE_TOPIC
        target = messaging.Target(topic=topic)
        self.client = n_rpc.get_client(target)

    def start_update(self, context, sg, rule_id, action):
        cctxt = self.client.prepare(version='1.0')
        try:
            return cctxt.call(context, 'update_sg', sg=sg, rule_id=rule_id,
                              action=action)
        except om_exc.MessagingTimeout:
            return None


"""
============================================================
Component 2. QuarkSGAsyncProducer
============================================================
"""


class QuarkSGProducerCallback(object):
    target = messaging.Target(version='1.0', namespace=None)

    def populate_subtasks(self, context, sg, parent_job_id):
        """Produces a list of ports to be updated asynchronously."""
        db_sg = db_api.security_group_find(context, id=sg, scope=db_api.ONE)
        if not db_sg:
            return None
        ports = db_api.sg_gather_associated_ports(context, db_sg)
        if len(ports) == 0:
            return {"ports": 0}
        for port in ports:
            job_body = dict(action="update port %s" % port['id'],
                            tenant_id=db_sg['tenant_id'],
                            resource_id=port['id'],
                            parent_id=parent_job_id)
            job_body = dict(job=job_body)
            job = job_api.create_job(context, job_body)
            rpc_consumer = QuarkSGAsyncConsumerClient()
            try:
                rpc_consumer.update_port(context, port['id'], job['id'])
            except om_exc.MessagingTimeout:
                # TODO(roaet): Not too sure what can be done here other than
                # updating the job as a failure?
                LOG.error("Failed to update port. Is Rabbit running?")
                return None


class QuarkSGAsyncProducer(base_worker.QuarkAsyncPluginBase):
    versions = [VERSION]

    def __init__(self, topic=PRODUCER_TOPIC):
        super(QuarkSGAsyncProducer, self).__init__(topic)
        self.callbacks = [QuarkSGProducerCallback()]


class QuarkSGAsyncProducerClient(object):
    def __init__(self):
        topic = PRODUCER_TOPIC
        target = messaging.Target(topic=topic)
        self.client = n_rpc.get_client(target)

    def populate_subtasks(self, context, sg, parent_job_id):
        cctxt = self.client.prepare(version='1.0')
        return cctxt.cast(context, 'populate_subtasks', sg=sg,
                          parent_job_id=parent_job_id)


"""
============================================================
Component 3. QuarkSGAsyncConsumer
============================================================
"""


class QuarkSGConsumerCallback(object):
    target = messaging.Target(version='1.0', namespace=None)

    def update_ports_for_sg(self, context, portid, jobid):
        """Updates the port's security group rules through redis."""
        port = db_api.port_find(context, id=portid, scope=db_api.ONE)
        if not port:
            LOG.warning("Port not found")
            return
        net_driver = port_api._get_net_driver(port.network, port=port)
        base_net_driver = port_api._get_net_driver(port.network)
        sg_list = [sg for sg in port.security_groups]

        success = False
        error = None
        retries = 3
        retry_delay = 2
        for retry in xrange(retries):
            try:
                net_driver.update_port(context, port_id=port["backend_key"],
                                       mac_address=port["mac_address"],
                                       device_id=port["device_id"],
                                       base_net_driver=base_net_driver,
                                       security_groups=sg_list)
                success = True
                error = None
                break
            except Exception as error:
                # error is reported after the loop if all retries fail
                LOG.warning("Could not connect to redis, but retrying soon")
                time.sleep(retry_delay)
        status_str = ""
        if not success:
            status_str = "Port %s update failed after %d tries. Error: %s" % (
                portid, retries, error)
        update_body = dict(completed=True, status=status_str)
        update_body = dict(job=update_body)
        job_api.update_job(context, jobid, update_body)


class QuarkSGAsyncConsumer(base_worker.QuarkAsyncPluginBase):
    versions = [VERSION]

    def __init__(self, topic=CONSUMER_TOPIC):
        super(QuarkSGAsyncConsumer, self).__init__(topic)
        self.callbacks = [QuarkSGConsumerCallback()]


class QuarkSGAsyncConsumerClient(object):
    def __init__(self):
        topic = CONSUMER_TOPIC
        target = messaging.Target(topic=topic)
        self.client = n_rpc.get_client(target)

    def update_port(self, context, portid, jobid):
        cctxt = self.client.prepare(version='1.0')
        return cctxt.cast(context, 'update_ports_for_sg', portid=portid,
                          jobid=jobid)
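Tying the three components together: per the commit message, the PUT to /security-groups creates the parent job and returns it alongside the normal response. A hedged sketch of what a rule create/delete code path might do with QuarkSGAsyncProcessClient; the handler name and the context.async_job wiring are assumptions inferred from the ResponseAsyncIdAdder middleware above:

from quark.worker_plugins import sg_update_worker


def _kick_off_sg_update(context, sg_id, rule_id, action="create"):
    # synchronous RPC call: creates the parent job and casts to the producer
    client = sg_update_worker.QuarkSGAsyncProcessClient()
    job_info = client.start_update(context, sg_id, rule_id, action)
    if job_info:  # shape: {"job_id": "..."}
        # ResponseAsyncIdAdder reads context.async_job['job']['id']
        context.async_job = {"job": {"id": job_info["job_id"]}}
    return job_info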
@@ -14,6 +14,7 @@ docopt==0.6.2
gunicorn==19.3.0
pymysql==0.6.6
twiceredis>=2.0.0
stevedore>=1.16.0 # Apache-2.0

# agent deps
xenapi==1.2
@@ -22,8 +22,12 @@ console_scripts =
gunicorn-neutron-server = quark.gunicorn_server:main
quark-agent = quark.agent.agent:main
quark-async-worker = quark.tools.async_worker:main
quark-async-tester = quark.tools.async_worker:test_main
quark-async-tester = quark.worker_plugins.sample_worker:main
quark-sg-tester = quark.worker_plugins.sg_update_worker:main
ip_availability = quark.ip_availability:main
redis_sg_tool = quark.tools.redis_sg_tool:main
null_routes = quark.tools.null_routes:main
insert_provider_subnets = quark.tools.insert_provider_subnets:main
quark.worker_plugin =
    test_plugin = quark.worker_plugins.sample_worker
    sg_update = quark.worker_plugins.sg_update_worker
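The new quark.worker_plugin entry-point namespace, together with the stevedore requirement added above, suggests the async worker discovers its plugins dynamically. A hypothetical loader sketch (the consuming code is not part of this diff):

from stevedore import extension


def load_worker_plugin_modules():
    # each entry point (test_plugin, sg_update) names a module whose
    # QuarkAsyncPluginBase subclasses can then be instantiated and started
    mgr = extension.ExtensionManager(namespace='quark.worker_plugin',
                                     invoke_on_load=False)
    return dict((ext.name, ext.plugin) for ext in mgr)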