diff --git a/tricircle/common/constants.py b/tricircle/common/constants.py index 1020a0f..8eaf6c5 100644 --- a/tricircle/common/constants.py +++ b/tricircle/common/constants.py @@ -58,3 +58,11 @@ ns_bridge_port_name = 'ns_bridge_port_%s_%s_%s' MAX_INT = 0x7FFFFFFF expire_time = datetime.datetime(2000, 1, 1) + +# job status +JS_New = 'New' +JS_Running = 'Running' +JS_Success = 'Success' +JS_Fail = 'Fail' + +SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000' diff --git a/tricircle/db/api.py b/tricircle/db/api.py index fe32019..5ddd40a 100644 --- a/tricircle/db/api.py +++ b/tricircle/db/api.py @@ -21,7 +21,9 @@ from oslo_config import cfg from oslo_db import exception as db_exc from oslo_log import log as logging from oslo_utils import timeutils +from oslo_utils import uuidutils +from tricircle.common import constants from tricircle.common.context import is_admin_context as _is_admin_context from tricircle.common import exceptions from tricircle.common.i18n import _ @@ -202,6 +204,72 @@ def get_pod_by_name(context, pod_name): return None +def new_job(context, _type, resource_id): + with context.session.begin(): + job_dict = {'id': uuidutils.generate_uuid(), + 'type': _type, + 'status': constants.JS_New, + 'resource_id': resource_id, + 'extra_id': uuidutils.generate_uuid()} + job = core.create_resource(context, models.Job, job_dict) + return job + + +def register_job(context, _type, resource_id): + try: + context.session.begin() + job_dict = {'id': uuidutils.generate_uuid(), + 'type': _type, + 'status': constants.JS_Running, + 'resource_id': resource_id, + 'extra_id': constants.SP_EXTRA_ID} + job = core.create_resource(context, models.Job, job_dict) + context.session.commit() + return job + except db_exc.DBDuplicateEntry: + context.session.rollback() + return None + except db_exc.DBDeadlock: + context.session.rollback() + return None + finally: + context.session.close() + + +def get_latest_timestamp(context, status, _type, resource_id): + jobs = core.query_resource( + context, models.Job, + [{'key': 'status', 'comparator': 'eq', 'value': status}, + {'key': 'type', 'comparator': 'eq', 'value': _type}, + {'key': 'resource_id', 'comparator': 'eq', 'value': resource_id}], + [('timestamp', False)]) + if jobs: + return jobs[0]['timestamp'] + else: + return None + + +def get_running_job(context, _type, resource_id): + jobs = core.query_resource( + context, models.Job, + [{'key': 'resource_id', 'comparator': 'eq', 'value': resource_id}, + {'key': 'status', 'comparator': 'eq', 'value': constants.JS_Running}, + {'key': 'type', 'comparator': 'eq', 'value': _type}], []) + if jobs: + return jobs[0] + else: + return None + + +def finish_job(context, job_id, successful, timestamp): + status = constants.JS_Success if successful else constants.JS_Fail + with context.session.begin(): + job_dict = {'status': status, + 'timestamp': timestamp, + 'extra_id': uuidutils.generate_uuid()} + core.update_resource(context, models.Job, job_id, job_dict) + + _DEFAULT_QUOTA_NAME = 'default' diff --git a/tricircle/db/migrate_repo/versions/002_resource.py b/tricircle/db/migrate_repo/versions/002_resource.py index c4305b5..8f6cecf 100644 --- a/tricircle/db/migrate_repo/versions/002_resource.py +++ b/tricircle/db/migrate_repo/versions/002_resource.py @@ -217,10 +217,25 @@ def upgrade(migrate_engine): mysql_engine='InnoDB', mysql_charset='utf8') + job = sql.Table( + 'job', meta, + sql.Column('id', sql.String(length=36), primary_key=True), + sql.Column('type', sql.String(length=36)), + sql.Column('timestamp', sql.TIMESTAMP, + 
+                   server_default=sql.text('CURRENT_TIMESTAMP')),
+        sql.Column('status', sql.String(length=36)),
+        sql.Column('resource_id', sql.String(length=36)),
+        sql.Column('extra_id', sql.String(length=36)),
+        migrate.UniqueConstraint(
+            'type', 'status', 'resource_id', 'extra_id',
+            name='job0type0status0resource_id0extra_id'),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8')
+
     tables = [aggregates, aggregate_metadata, instance_types,
               instance_type_projects, instance_type_extra_specs, key_pairs,
               quotas, quota_classes, quota_usages, reservations,
-              volume_types,
+              volume_types, job,
               quality_of_service_specs, cascaded_pods_resource_routing]
     for table in tables:
         table.create()
diff --git a/tricircle/db/models.py b/tricircle/db/models.py
index 5414589..a24bf56 100644
--- a/tricircle/db/models.py
+++ b/tricircle/db/models.py
@@ -385,3 +385,23 @@ class ResourceRouting(core.ModelBase, core.DictBase, models.TimestampMixin):
     project_id = sql.Column('project_id', sql.String(length=36))
     resource_type = sql.Column('resource_type', sql.String(length=64),
                                nullable=False)
+
+
+class Job(core.ModelBase, core.DictBase):
+    __tablename__ = 'job'
+    __table_args__ = (
+        schema.UniqueConstraint(
+            'type', 'status', 'resource_id', 'extra_id',
+            name='job0type0status0resource_id0extra_id'),
+    )
+
+    attributes = ['id', 'type', 'timestamp', 'status', 'resource_id',
+                  'extra_id']
+
+    id = sql.Column('id', sql.String(length=36), primary_key=True)
+    type = sql.Column('type', sql.String(length=36))
+    timestamp = sql.Column('timestamp', sql.TIMESTAMP,
+                           server_default=sql.text('CURRENT_TIMESTAMP'))
+    status = sql.Column('status', sql.String(length=36))
+    resource_id = sql.Column('resource_id', sql.String(length=36))
+    extra_id = sql.Column('extra_id', sql.String(length=36))
diff --git a/tricircle/tests/unit/xjob/test_xmanager.py b/tricircle/tests/unit/xjob/test_xmanager.py
index 549e1b5..a166742 100644
--- a/tricircle/tests/unit/xjob/test_xmanager.py
+++ b/tricircle/tests/unit/xjob/test_xmanager.py
@@ -13,15 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
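Review note, not part of the patch: the job0type0status0resource_id0extra_id unique constraint above is what turns register_job into a cross-worker lock. Every competing worker inserts the identical (type, 'Running', resource_id, SP_EXTRA_ID) tuple, so at most one insert can commit, while new_job and finish_job write a random extra_id and therefore never collide with the lock row or with each other. A minimal sketch of the idea against in-memory SQLite (plain SQLAlchemy; names mirror the patch but the setup is illustrative):

    import sqlalchemy as sa

    engine = sa.create_engine('sqlite://')
    meta = sa.MetaData()
    job = sa.Table(
        'job', meta,
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('type', sa.String(36)),
        sa.Column('status', sa.String(36)),
        sa.Column('resource_id', sa.String(36)),
        sa.Column('extra_id', sa.String(36)),
        sa.UniqueConstraint('type', 'status', 'resource_id', 'extra_id'))
    meta.create_all(engine)

    SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'


    def try_register(job_type, resource_id, worker_id):
        # mimics register_job: every worker inserts the identical
        # (type, Running, resource_id, SP_EXTRA_ID) tuple, so the unique
        # constraint guarantees at most one insert commits
        try:
            with engine.begin() as conn:
                conn.execute(job.insert().values(
                    id=worker_id, type=job_type, status='Running',
                    resource_id=resource_id, extra_id=SP_EXTRA_ID))
            return True
        except sa.exc.IntegrityError:
            return False


    print(try_register('router', 'router-1', 'worker-1'))  # True: lock won
    print(try_register('router', 'router-1', 'worker-2'))  # False: lost race

Releasing the lock is just finish_job flipping the status away from 'Running' with a fresh random extra_id, after which the constraint no longer blocks the next register_job.
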
+import datetime import mock from mock import patch import unittest +from oslo_config import cfg +from oslo_utils import uuidutils + +from tricircle.common import constants from tricircle.common import context import tricircle.db.api as db_api from tricircle.db import core from tricircle.db import models from tricircle.xjob import xmanager +from tricircle.xjob import xservice BOTTOM1_NETWORK = [] @@ -32,7 +38,8 @@ BOTTOM1_PORT = [] BOTTOM2_PORT = [] BOTTOM1_ROUTER = [] BOTTOM2_ROUTER = [] -RES_LIST = [BOTTOM1_SUBNET, BOTTOM2_SUBNET, BOTTOM1_PORT, BOTTOM2_PORT] +RES_LIST = [BOTTOM1_NETWORK, BOTTOM2_NETWORK, BOTTOM1_SUBNET, BOTTOM2_SUBNET, + BOTTOM1_PORT, BOTTOM2_PORT, BOTTOM1_ROUTER, BOTTOM2_ROUTER] RES_MAP = {'pod_1': {'network': BOTTOM1_NETWORK, 'subnet': BOTTOM1_SUBNET, 'port': BOTTOM1_PORT, @@ -93,6 +100,10 @@ class XManagerTest(unittest.TestCase): core.ModelBase.metadata.create_all(core.get_engine()) # enforce foreign key constraint for sqlite core.get_engine().execute('pragma foreign_keys=on') + for opt in xservice.common_opts: + if opt.name in ('worker_handle_timeout', 'job_run_expire', + 'worker_sleep_time'): + cfg.CONF.register_opt(opt) self.context = context.Context() self.xmanager = FakeXManager() @@ -160,7 +171,7 @@ class XManagerTest(unittest.TestCase): 'ip_address': '10.0.3.1'}]}) self.xmanager.configure_extra_routes(self.context, - {'router': top_router_id}) + payload={'router': top_router_id}) calls = [mock.call(self.context, 'router_1_id', {'router': { 'routes': [{'nexthop': '100.0.1.2', @@ -172,3 +183,91 @@ class XManagerTest(unittest.TestCase): {'nexthop': '100.0.1.1', 'destination': '10.0.3.0/24'}]}})] mock_update.assert_has_calls(calls) + + def test_job_handle(self): + @xmanager._job_handle('fake_resource') + def fake_handle(self, ctx, payload): + pass + + fake_id = 'fake_id' + payload = {'fake_resource': fake_id} + fake_handle(None, self.context, payload=payload) + + jobs = core.query_resource(self.context, models.Job, [], []) + expected_status = [constants.JS_New, constants.JS_Success] + job_status = [job['status'] for job in jobs] + self.assertItemsEqual(expected_status, job_status) + + self.assertEqual(fake_id, jobs[0]['resource_id']) + self.assertEqual(fake_id, jobs[1]['resource_id']) + self.assertEqual('fake_resource', jobs[0]['type']) + self.assertEqual('fake_resource', jobs[1]['type']) + + def test_job_handle_exception(self): + @xmanager._job_handle('fake_resource') + def fake_handle(self, ctx, payload): + raise Exception() + + fake_id = 'fake_id' + payload = {'fake_resource': fake_id} + fake_handle(None, self.context, payload=payload) + + jobs = core.query_resource(self.context, models.Job, [], []) + expected_status = [constants.JS_New, constants.JS_Fail] + job_status = [job['status'] for job in jobs] + self.assertItemsEqual(expected_status, job_status) + + self.assertEqual(fake_id, jobs[0]['resource_id']) + self.assertEqual(fake_id, jobs[1]['resource_id']) + self.assertEqual('fake_resource', jobs[0]['type']) + self.assertEqual('fake_resource', jobs[1]['type']) + + def test_job_run_expire(self): + @xmanager._job_handle('fake_resource') + def fake_handle(self, ctx, payload): + pass + + fake_id = uuidutils.generate_uuid() + payload = {'fake_resource': fake_id} + expired_job = { + 'id': uuidutils.generate_uuid(), + 'type': 'fake_resource', + 'timestamp': datetime.datetime.now() - datetime.timedelta(0, 120), + 'status': constants.JS_Running, + 'resource_id': fake_id, + 'extra_id': constants.SP_EXTRA_ID + } + core.create_resource(self.context, models.Job, 
+                             expired_job)
+        fake_handle(None, self.context, payload=payload)
+
+        jobs = core.query_resource(self.context, models.Job, [], [])
+        expected_status = ['New', 'Fail', 'Success']
+        job_status = [job['status'] for job in jobs]
+        self.assertItemsEqual(expected_status, job_status)
+
+        for i in xrange(3):
+            self.assertEqual(fake_id, jobs[i]['resource_id'])
+            self.assertEqual('fake_resource', jobs[i]['type'])
+
+    @patch.object(db_api, 'get_running_job')
+    @patch.object(db_api, 'register_job')
+    def test_worker_handle_timeout(self, mock_register, mock_get):
+        @xmanager._job_handle('fake_resource')
+        def fake_handle(self, ctx, payload):
+            pass
+
+        cfg.CONF.set_override('worker_handle_timeout', 1)
+        mock_register.return_value = None
+        mock_get.return_value = None
+
+        fake_id = uuidutils.generate_uuid()
+        payload = {'fake_resource': fake_id}
+        fake_handle(None, self.context, payload=payload)
+
+        # nothing to assert here; we only check that fake_handle exits once
+        # the worker handle timeout is reached
+
+    def tearDown(self):
+        core.ModelBase.metadata.drop_all(core.get_engine())
+        for res in RES_LIST:
+            del res[:]
diff --git a/tricircle/xjob/xmanager.py b/tricircle/xjob/xmanager.py
index bbd1a24..566b74e 100755
--- a/tricircle/xjob/xmanager.py
+++ b/tricircle/xjob/xmanager.py
@@ -13,7 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
+import eventlet
 import netaddr
+import six
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -23,7 +26,9 @@ from oslo_service import periodic_task
 from tricircle.common import client
 from tricircle.common import constants
 from tricircle.common.i18n import _
+from tricircle.common.i18n import _LE
 from tricircle.common.i18n import _LI
+from tricircle.common.i18n import _LW
 
 import tricircle.db.api as db_api
 
@@ -31,6 +36,78 @@ CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+def _job_handle(job_type):
+    def handle_func(func):
+        @six.wraps(func)
+        def handle_args(*args, **kwargs):
+            ctx = args[1]
+            payload = kwargs['payload']
+
+            resource_id = payload[job_type]
+            db_api.new_job(ctx, job_type, resource_id)
+            start_time = datetime.datetime.now()
+
+            while True:
+                current_time = datetime.datetime.now()
+                delta = current_time - start_time
+                if delta.total_seconds() >= CONF.worker_handle_timeout:
+                    # quit when this handler has been running for too long
+                    break
+                time_new = db_api.get_latest_timestamp(ctx, constants.JS_New,
+                                                       job_type, resource_id)
+                time_success = db_api.get_latest_timestamp(
+                    ctx, constants.JS_Success, job_type, resource_id)
+                if time_success and time_success >= time_new:
+                    break
+                job = db_api.register_job(ctx, job_type, resource_id)
+                if not job:
+                    # failed to obtain the lock; let another worker handle it
+                    running_job = db_api.get_running_job(ctx, job_type,
+                                                         resource_id)
+                    if not running_job:
+                        # running_job can be None for two reasons: either the
+                        # running job has just finished, or all workers
+                        # failed to register the job due to a deadlock
+                        # exception,
+                        # so we sleep and try again
+                        eventlet.sleep(CONF.worker_sleep_time)
+                        continue
+                    job_time = running_job['timestamp']
+                    current_time = datetime.datetime.now()
+                    delta = current_time - job_time
+                    if delta.total_seconds() > CONF.job_run_expire:
+                        # the previous running job has expired, so we set its
+                        # status to Fail and try to obtain the lock again
+                        db_api.finish_job(ctx, running_job['id'], False,
+                                          time_new)
+                        LOG.warning(_LW('Job %(job)s of type %(job_type)s for '
+                                        'resource %(resource)s has expired, '
+                                        'set its state to Fail'),
+                                    {'job': running_job['id'],
+                                     'job_type': job_type,
+                                     'resource': resource_id})
+                        eventlet.sleep(CONF.worker_sleep_time)
+                        continue
+                    else:
+                        # the previous running job is still valid, so we
+                        # leave the job to the worker holding the lock
+                        break
+                # successfully obtained the lock, start executing the handler
+                try:
+                    func(*args, **kwargs)
+                except Exception:
+                    db_api.finish_job(ctx, job['id'], False, time_new)
+                    LOG.error(_LE('Job %(job)s of type %(job_type)s for '
+                                  'resource %(resource)s failed'),
+                              {'job': job['id'],
+                               'job_type': job_type,
+                               'resource': resource_id})
+                    break
+                db_api.finish_job(ctx, job['id'], True, time_new)
+                eventlet.sleep(CONF.worker_sleep_time)
+        return handle_args
+    return handle_func
+
+
 class PeriodicTasks(periodic_task.PeriodicTasks):
     def __init__(self):
         super(PeriodicTasks, self).__init__(CONF)
@@ -128,6 +205,7 @@ class XManager(PeriodicTasks):
 
         return info_text
 
+    @_job_handle('router')
     def configure_extra_routes(self, ctx, payload):
         # TODO(zhiyuan) performance and reliability issue
         # better have a job tracking mechanism
diff --git a/tricircle/xjob/xservice.py b/tricircle/xjob/xservice.py
index e49613d..dc1a39d 100755
--- a/tricircle/xjob/xservice.py
+++ b/tricircle/xjob/xservice.py
@@ -47,7 +47,15 @@ common_opts = [
     cfg.StrOpt('host', default='tricircle.xhost',
                help=_("The host name for RPC server")),
     cfg.IntOpt('workers', default=1,
-               help=_("number of workers")),
+               help=_("Number of workers")),
+    cfg.IntOpt('worker_handle_timeout', default=1800,
+               help=_("Timeout for one round of worker processing, in"
+                      " seconds")),
+    cfg.IntOpt('job_run_expire', default=60,
+               help=_("A running job is considered expired after this time,"
+                      " in seconds")),
+    cfg.FloatOpt('worker_sleep_time', default=0.1,
+                 help=_("Seconds a worker sleeps after one run in a loop"))
 ]
 
 service_opts = [
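
Review note, not part of the patch: taken together, _job_handle gives every job a small state machine. Each invocation records a New row, then loops until one of three things happens: a Success row newer than the latest New appears (someone else finished the work), this worker wins the Running lock and executes the handler (finishing the job as Success or Fail), or worker_handle_timeout elapses. A stale Running row older than job_run_expire is marked Fail so a crashed worker cannot hold the lock forever. The sketch below is a self-contained miniature of that loop; JobStore is an illustrative stand-in for the db_api calls, not the tricircle API:

    import threading
    import time

    WORKER_HANDLE_TIMEOUT = 5.0   # cf. worker_handle_timeout
    WORKER_SLEEP_TIME = 0.1       # cf. worker_sleep_time


    class JobStore(object):
        """In-memory stand-in for the job table, guarded by a mutex."""

        def __init__(self):
            self._mutex = threading.Lock()
            self._running = set()      # (job_type, resource_id) lock holders
            self._succeeded = set()

        def already_succeeded(self, job_type, resource_id):
            with self._mutex:
                return (job_type, resource_id) in self._succeeded

        def try_lock(self, job_type, resource_id):
            # mimics register_job: at most one worker holds Running status
            with self._mutex:
                key = (job_type, resource_id)
                if key in self._running:
                    return None
                self._running.add(key)
                return key

        def finish(self, key, successful):
            # mimics finish_job: release the lock, record the outcome
            with self._mutex:
                self._running.discard(key)
                if successful:
                    self._succeeded.add(key)


    def run_job(store, job_type, resource_id, handler):
        deadline = time.time() + WORKER_HANDLE_TIMEOUT
        while time.time() < deadline:   # give up after the handle timeout
            if store.already_succeeded(job_type, resource_id):
                return                  # another worker already finished it
            key = store.try_lock(job_type, resource_id)
            if key is None:
                time.sleep(WORKER_SLEEP_TIME)   # lost the race, retry later
                continue
            try:
                handler(resource_id)
            except Exception:
                store.finish(key, successful=False)
                return
            store.finish(key, successful=True)
            time.sleep(WORKER_SLEEP_TIME)


    store = JobStore()
    run_job(store, 'router', 'router-1', lambda rid: None)
    print(store.already_succeeded('router', 'router-1'))  # True

The expiry branch is elided in this miniature; in the patch it simply flips a long-lived Running row to Fail before retrying, which is what test_job_run_expire exercises with a timestamp 120 seconds in the past.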