From 2bc2db5ae689d87e98a3dfd31e7038363d7dc109 Mon Sep 17 00:00:00 2001
From: zhiyuan_cai
Date: Wed, 2 Mar 2016 09:15:46 +0800
Subject: [PATCH] Asynchronous job management (part 1)

Implement asynchronous job management to ensure that jobs can be
successfully completed even if they temporarily fail for some reason.
The detailed design can be found in section 9 of the design document.

This patch focuses on defining the database schema and building a lock
mechanism that prevents two jobs of the same type from running against
the same resource at the same time. Enabling workers to rerun failed
jobs and to purge old job records will be covered in follow-up patches.
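How the lock works, in short: every "Running" record shares the special
extra_id SP_EXTRA_ID, while "New", "Success" and "Fail" records receive
a fresh UUID, so the unique constraint over (type, status, resource_id,
extra_id) admits at most one running job per (type, resource_id) pair.
A minimal sketch of the idea, with simplified session handling
(acquire_job_lock is a hypothetical name used only for illustration;
the real logic lives in db_api.register_job below):

    def acquire_job_lock(context, job_type, resource_id):
        # Inserting a "Running" row doubles as taking the lock: the
        # database rejects a second concurrent insert for the same
        # (type, resource_id) because extra_id is always SP_EXTRA_ID.
        try:
            return core.create_resource(
                context, models.Job,
                {'id': uuidutils.generate_uuid(),
                 'type': job_type,
                 'status': constants.JS_Running,
                 'resource_id': resource_id,
                 'extra_id': constants.SP_EXTRA_ID})
        except db_exc.DBDuplicateEntry:
            return None  # another worker already holds the lock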
Change-Id: I87d0056a95eb7cb963e1c3599062a60299472298
---
 tricircle/common/constants.py                 |   8 ++
 tricircle/db/api.py                           |  68 ++++++++++++
 .../db/migrate_repo/versions/002_resource.py  |  17 ++-
 tricircle/db/models.py                        |  20 ++++
 tricircle/tests/unit/xjob/test_xmanager.py    | 103 +++++++++++++++++-
 tricircle/xjob/xmanager.py                    |  78 +++++++++++++
 tricircle/xjob/xservice.py                    |  10 +-
 7 files changed, 300 insertions(+), 4 deletions(-)

diff --git a/tricircle/common/constants.py b/tricircle/common/constants.py
index 1020a0f..8eaf6c5 100644
--- a/tricircle/common/constants.py
+++ b/tricircle/common/constants.py
@@ -58,3 +58,11 @@ ns_bridge_port_name = 'ns_bridge_port_%s_%s_%s'
 
 MAX_INT = 0x7FFFFFFF
 expire_time = datetime.datetime(2000, 1, 1)
+
+# job status
+JS_New = 'New'
+JS_Running = 'Running'
+JS_Success = 'Success'
+JS_Fail = 'Fail'
+
+SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'
diff --git a/tricircle/db/api.py b/tricircle/db/api.py
index fe32019..5ddd40a 100644
--- a/tricircle/db/api.py
+++ b/tricircle/db/api.py
@@ -21,7 +21,9 @@ from oslo_config import cfg
 from oslo_db import exception as db_exc
 from oslo_log import log as logging
 from oslo_utils import timeutils
+from oslo_utils import uuidutils
 
+from tricircle.common import constants
 from tricircle.common.context import is_admin_context as _is_admin_context
 from tricircle.common import exceptions
 from tricircle.common.i18n import _
@@ -202,6 +204,72 @@ def get_pod_by_name(context, pod_name):
     return None
 
 
+def new_job(context, _type, resource_id):
+    with context.session.begin():
+        job_dict = {'id': uuidutils.generate_uuid(),
+                    'type': _type,
+                    'status': constants.JS_New,
+                    'resource_id': resource_id,
+                    'extra_id': uuidutils.generate_uuid()}
+        job = core.create_resource(context, models.Job, job_dict)
+        return job
+
+
+def register_job(context, _type, resource_id):
+    try:
+        context.session.begin()
+        job_dict = {'id': uuidutils.generate_uuid(),
+                    'type': _type,
+                    'status': constants.JS_Running,
+                    'resource_id': resource_id,
+                    'extra_id': constants.SP_EXTRA_ID}
+        job = core.create_resource(context, models.Job, job_dict)
+        context.session.commit()
+        return job
+    except db_exc.DBDuplicateEntry:
+        context.session.rollback()
+        return None
+    except db_exc.DBDeadlock:
+        context.session.rollback()
+        return None
+    finally:
+        context.session.close()
+
+
+def get_latest_timestamp(context, status, _type, resource_id):
+    jobs = core.query_resource(
+        context, models.Job,
+        [{'key': 'status', 'comparator': 'eq', 'value': status},
+         {'key': 'type', 'comparator': 'eq', 'value': _type},
+         {'key': 'resource_id', 'comparator': 'eq', 'value': resource_id}],
+        [('timestamp', False)])
+    if jobs:
+        return jobs[0]['timestamp']
+    else:
+        return None
+
+
+def get_running_job(context, _type, resource_id):
+    jobs = core.query_resource(
+        context, models.Job,
+        [{'key': 'resource_id', 'comparator': 'eq', 'value': resource_id},
+         {'key': 'status', 'comparator': 'eq',
+          'value': constants.JS_Running},
+         {'key': 'type', 'comparator': 'eq', 'value': _type}], [])
+    if jobs:
+        return jobs[0]
+    else:
+        return None
+
+
+def finish_job(context, job_id, successful, timestamp):
+    status = constants.JS_Success if successful else constants.JS_Fail
+    with context.session.begin():
+        job_dict = {'status': status,
+                    'timestamp': timestamp,
+                    'extra_id': uuidutils.generate_uuid()}
+        core.update_resource(context, models.Job, job_id, job_dict)
+
+
 _DEFAULT_QUOTA_NAME = 'default'
diff --git a/tricircle/db/migrate_repo/versions/002_resource.py b/tricircle/db/migrate_repo/versions/002_resource.py
index c4305b5..8f6cecf 100644
--- a/tricircle/db/migrate_repo/versions/002_resource.py
+++ b/tricircle/db/migrate_repo/versions/002_resource.py
@@ -217,10 +217,25 @@ def upgrade(migrate_engine):
         mysql_engine='InnoDB',
         mysql_charset='utf8')
 
+    job = sql.Table(
+        'job', meta,
+        sql.Column('id', sql.String(length=36), primary_key=True),
+        sql.Column('type', sql.String(length=36)),
+        sql.Column('timestamp', sql.TIMESTAMP,
+                   server_default=sql.text('CURRENT_TIMESTAMP')),
+        sql.Column('status', sql.String(length=36)),
+        sql.Column('resource_id', sql.String(length=36)),
+        sql.Column('extra_id', sql.String(length=36)),
+        migrate.UniqueConstraint(
+            'type', 'status', 'resource_id', 'extra_id',
+            name='job0type0status0resource_id0extra_id'),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8')
+
     tables = [aggregates, aggregate_metadata, instance_types,
               instance_type_projects, instance_type_extra_specs,
               key_pairs, quotas, quota_classes, quota_usages, reservations,
-              volume_types,
+              volume_types, job,
               quality_of_service_specs,
               cascaded_pods_resource_routing]
     for table in tables:
         table.create()
diff --git a/tricircle/db/models.py b/tricircle/db/models.py
index 5414589..a24bf56 100644
--- a/tricircle/db/models.py
+++ b/tricircle/db/models.py
@@ -385,3 +385,23 @@ class ResourceRouting(core.ModelBase, core.DictBase, models.TimestampMixin):
     project_id = sql.Column('project_id', sql.String(length=36))
     resource_type = sql.Column('resource_type', sql.String(length=64),
                                nullable=False)
+
+
+class Job(core.ModelBase, core.DictBase):
+    __tablename__ = 'job'
+    __table_args__ = (
+        schema.UniqueConstraint(
+            'type', 'status', 'resource_id', 'extra_id',
+            name='job0type0status0resource_id0extra_id'),
+    )
+
+    attributes = ['id', 'type', 'timestamp', 'status', 'resource_id',
+                  'extra_id']
+
+    id = sql.Column('id', sql.String(length=36), primary_key=True)
+    type = sql.Column('type', sql.String(length=36))
+    timestamp = sql.Column('timestamp', sql.TIMESTAMP,
+                           server_default=sql.text('CURRENT_TIMESTAMP'))
+    status = sql.Column('status', sql.String(length=36))
+    resource_id = sql.Column('resource_id', sql.String(length=36))
+    extra_id = sql.Column('extra_id', sql.String(length=36))
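A reviewer-oriented demonstration of why this schema yields a lock: two
"New" records with random extra_id values coexist, but only one
"Running" record per (type, resource_id) can exist, because every
"Running" record uses the same SP_EXTRA_ID. The snippet below is
standalone and illustrative only (plain SQLAlchemy against in-memory
SQLite, with the timestamp column omitted); it is not part of the patch:

    import uuid

    import sqlalchemy as sa
    from sqlalchemy.exc import IntegrityError

    SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'

    engine = sa.create_engine('sqlite://')
    meta = sa.MetaData()
    job = sa.Table(
        'job', meta,
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('type', sa.String(36)),
        sa.Column('status', sa.String(36)),
        sa.Column('resource_id', sa.String(36)),
        sa.Column('extra_id', sa.String(36)),
        sa.UniqueConstraint('type', 'status', 'resource_id', 'extra_id'))
    meta.create_all(engine)

    def insert(status, extra_id):
        with engine.begin() as conn:
            conn.execute(job.insert().values(
                id=str(uuid.uuid4()), type='router', status=status,
                resource_id='net-1', extra_id=extra_id))

    insert('New', str(uuid.uuid4()))   # queued jobs may pile up freely
    insert('New', str(uuid.uuid4()))
    insert('Running', SP_EXTRA_ID)     # the first worker takes the lock
    try:
        insert('Running', SP_EXTRA_ID)
    except IntegrityError:
        print('lock already held for (router, net-1)')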
diff --git a/tricircle/tests/unit/xjob/test_xmanager.py b/tricircle/tests/unit/xjob/test_xmanager.py
index 549e1b5..a166742 100644
--- a/tricircle/tests/unit/xjob/test_xmanager.py
+++ b/tricircle/tests/unit/xjob/test_xmanager.py
@@ -13,15 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
 import mock
 from mock import patch
 import unittest
 
+from oslo_config import cfg
+from oslo_utils import uuidutils
+
+from tricircle.common import constants
 from tricircle.common import context
 import tricircle.db.api as db_api
 from tricircle.db import core
 from tricircle.db import models
 from tricircle.xjob import xmanager
+from tricircle.xjob import xservice
 
 
 BOTTOM1_NETWORK = []
@@ -32,7 +38,8 @@ BOTTOM1_PORT = []
 BOTTOM2_PORT = []
 BOTTOM1_ROUTER = []
 BOTTOM2_ROUTER = []
-RES_LIST = [BOTTOM1_SUBNET, BOTTOM2_SUBNET, BOTTOM1_PORT, BOTTOM2_PORT]
+RES_LIST = [BOTTOM1_NETWORK, BOTTOM2_NETWORK, BOTTOM1_SUBNET, BOTTOM2_SUBNET,
+            BOTTOM1_PORT, BOTTOM2_PORT, BOTTOM1_ROUTER, BOTTOM2_ROUTER]
 RES_MAP = {'pod_1': {'network': BOTTOM1_NETWORK,
                      'subnet': BOTTOM1_SUBNET,
                      'port': BOTTOM1_PORT,
@@ -93,6 +100,10 @@ class XManagerTest(unittest.TestCase):
         core.ModelBase.metadata.create_all(core.get_engine())
         # enforce foreign key constraint for sqlite
         core.get_engine().execute('pragma foreign_keys=on')
+        for opt in xservice.common_opts:
+            if opt.name in ('worker_handle_timeout', 'job_run_expire',
+                            'worker_sleep_time'):
+                cfg.CONF.register_opt(opt)
         self.context = context.Context()
         self.xmanager = FakeXManager()
 
@@ -160,7 +171,7 @@ class XManagerTest(unittest.TestCase):
                                  'ip_address': '10.0.3.1'}]})
 
         self.xmanager.configure_extra_routes(self.context,
-                                             {'router': top_router_id})
+                                             payload={'router': top_router_id})
         calls = [mock.call(self.context, 'router_1_id',
                            {'router': {
                                'routes': [{'nexthop': '100.0.1.2',
@@ -172,3 +183,91 @@ class XManagerTest(unittest.TestCase):
                                           {'nexthop': '100.0.1.1',
                                            'destination': '10.0.3.0/24'}]}})]
         mock_update.assert_has_calls(calls)
+
+    def test_job_handle(self):
+        @xmanager._job_handle('fake_resource')
+        def fake_handle(self, ctx, payload):
+            pass
+
+        fake_id = 'fake_id'
+        payload = {'fake_resource': fake_id}
+        fake_handle(None, self.context, payload=payload)
+
+        jobs = core.query_resource(self.context, models.Job, [], [])
+        expected_status = [constants.JS_New, constants.JS_Success]
+        job_status = [job['status'] for job in jobs]
+        self.assertItemsEqual(expected_status, job_status)
+
+        self.assertEqual(fake_id, jobs[0]['resource_id'])
+        self.assertEqual(fake_id, jobs[1]['resource_id'])
+        self.assertEqual('fake_resource', jobs[0]['type'])
+        self.assertEqual('fake_resource', jobs[1]['type'])
+
+    def test_job_handle_exception(self):
+        @xmanager._job_handle('fake_resource')
+        def fake_handle(self, ctx, payload):
+            raise Exception()
+
+        fake_id = 'fake_id'
+        payload = {'fake_resource': fake_id}
+        fake_handle(None, self.context, payload=payload)
+
+        jobs = core.query_resource(self.context, models.Job, [], [])
+        expected_status = [constants.JS_New, constants.JS_Fail]
+        job_status = [job['status'] for job in jobs]
+        self.assertItemsEqual(expected_status, job_status)
+
+        self.assertEqual(fake_id, jobs[0]['resource_id'])
+        self.assertEqual(fake_id, jobs[1]['resource_id'])
+        self.assertEqual('fake_resource', jobs[0]['type'])
+        self.assertEqual('fake_resource', jobs[1]['type'])
+
+    def test_job_run_expire(self):
+        @xmanager._job_handle('fake_resource')
+        def fake_handle(self, ctx, payload):
+            pass
+
+        fake_id = uuidutils.generate_uuid()
+        payload = {'fake_resource': fake_id}
+        expired_job = {
+            'id': uuidutils.generate_uuid(),
+            'type': 'fake_resource',
+            'timestamp': datetime.datetime.now() - datetime.timedelta(0, 120),
+            'status': constants.JS_Running,
+            'resource_id': fake_id,
+            'extra_id': constants.SP_EXTRA_ID
+        }
+        core.create_resource(self.context, models.Job, expired_job)
+        fake_handle(None, self.context, payload=payload)
+
+        jobs = core.query_resource(self.context, models.Job, [], [])
+        expected_status = ['New', 'Fail', 'Success']
+        job_status = [job['status'] for job in jobs]
+        self.assertItemsEqual(expected_status, job_status)
+
+        for i in xrange(3):
+            self.assertEqual(fake_id, jobs[i]['resource_id'])
+            self.assertEqual('fake_resource', jobs[i]['type'])
+
+    @patch.object(db_api, 'get_running_job')
+    @patch.object(db_api, 'register_job')
+    def test_worker_handle_timeout(self, mock_register, mock_get):
+        @xmanager._job_handle('fake_resource')
+        def fake_handle(self, ctx, payload):
+            pass
+
+        cfg.CONF.set_override('worker_handle_timeout', 1)
+        mock_register.return_value = None
+        mock_get.return_value = None
+
+        fake_id = uuidutils.generate_uuid()
+        payload = {'fake_resource': fake_id}
+        fake_handle(None, self.context, payload=payload)
+
+        # nothing to assert; we only check that fake_handle exits once
+        # worker_handle_timeout is reached
+
+    def tearDown(self):
+        core.ModelBase.metadata.drop_all(core.get_engine())
+        for res in RES_LIST:
+            del res[:]
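The expiry rule that test_job_run_expire exercises can be stated in a
few lines. The helper below is hypothetical (is_expired is not part of
the patch; the real check is inlined in _job_handle in the next file),
written with total_seconds() so it stays correct even for deltas longer
than a day:

    import datetime

    def is_expired(job_timestamp, job_run_expire=60):
        # a Running record older than job_run_expire seconds is treated
        # as left behind by a dead worker; another worker may mark it
        # Fail and take over the lock
        delta = datetime.datetime.now() - job_timestamp
        return delta.total_seconds() > job_run_expire

    # the fixture above is 120 seconds old, so with the default
    # job_run_expire of 60 seconds it counts as expired
    stale = datetime.datetime.now() - datetime.timedelta(seconds=120)
    assert is_expired(stale)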
diff --git a/tricircle/xjob/xmanager.py b/tricircle/xjob/xmanager.py
index bbd1a24..566b74e 100755
--- a/tricircle/xjob/xmanager.py
+++ b/tricircle/xjob/xmanager.py
@@ -13,7 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
+import eventlet
 import netaddr
+import six
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -23,7 +26,9 @@ from oslo_service import periodic_task
 
 from tricircle.common import client
 from tricircle.common import constants
 from tricircle.common.i18n import _
+from tricircle.common.i18n import _LE
 from tricircle.common.i18n import _LI
+from tricircle.common.i18n import _LW
 import tricircle.db.api as db_api
 
@@ -31,6 +36,78 @@ CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+def _job_handle(job_type):
+    def handle_func(func):
+        @six.wraps(func)
+        def handle_args(*args, **kwargs):
+            ctx = args[1]
+            payload = kwargs['payload']
+
+            resource_id = payload[job_type]
+            db_api.new_job(ctx, job_type, resource_id)
+            start_time = datetime.datetime.now()
+
+            while True:
+                current_time = datetime.datetime.now()
+                delta = current_time - start_time
+                if delta.total_seconds() >= CONF.worker_handle_timeout:
+                    # quit if this handler has been running for too long
+                    break
+                time_new = db_api.get_latest_timestamp(ctx, constants.JS_New,
+                                                       job_type, resource_id)
+                time_success = db_api.get_latest_timestamp(
+                    ctx, constants.JS_Success, job_type, resource_id)
+                if time_success and time_success >= time_new:
+                    break
+                job = db_api.register_job(ctx, job_type, resource_id)
+                if not job:
+                    # failed to obtain the lock; let another worker handle it
+                    running_job = db_api.get_running_job(ctx, job_type,
+                                                         resource_id)
+                    if not running_job:
+                        # there are two reasons running_job can be None:
+                        # either the running job has just finished, or
+                        # all workers failed to register the job due to a
+                        # deadlock exception, so sleep and try again
+                        eventlet.sleep(CONF.worker_sleep_time)
+                        continue
+                    job_time = running_job['timestamp']
+                    current_time = datetime.datetime.now()
+                    delta = current_time - job_time
+                    if delta.total_seconds() > CONF.job_run_expire:
+                        # the previous running job has expired; mark it
+                        # Fail and try again to obtain the lock
+                        db_api.finish_job(ctx, running_job['id'], False,
+                                          time_new)
+                        LOG.warning(_LW('Job %(job)s of type %(job_type)s '
+                                        'for resource %(resource)s has '
+                                        'expired, setting its status to Fail'),
+                                    {'job': running_job['id'],
+                                     'job_type': job_type,
+                                     'resource': resource_id})
+                        eventlet.sleep(CONF.worker_sleep_time)
+                        continue
+                    else:
+                        # the previous running job is still valid, so leave
+                        # the job to the worker holding the lock
+                        break
+                # lock successfully obtained, start to execute the handler
+                try:
+                    func(*args, **kwargs)
+                except Exception:
+                    db_api.finish_job(ctx, job['id'], False, time_new)
+                    LOG.error(_LE('Job %(job)s of type %(job_type)s for '
+                                  'resource %(resource)s failed'),
+                              {'job': job['id'],
+                               'job_type': job_type,
+                               'resource': resource_id})
+                    break
+                db_api.finish_job(ctx, job['id'], True, time_new)
+                eventlet.sleep(CONF.worker_sleep_time)
+        return handle_args
+    return handle_func
+
+
 class PeriodicTasks(periodic_task.PeriodicTasks):
     def __init__(self):
         super(PeriodicTasks, self).__init__(CONF)
@@ -128,6 +205,7 @@ class XManager(PeriodicTasks):
 
         return info_text
 
+    @_job_handle('router')
     def configure_extra_routes(self, ctx, payload):
         # TODO(zhiyuan) performance and reliability issue
         # better have a job tracking mechanism
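Two usage notes on the decorator: handle_args reads kwargs['payload'],
so callers must pass the payload as a keyword argument (which is why the
test above now calls configure_extra_routes with payload=...), and the
payload is keyed by the job type with the resource id as its value. A
hypothetical example of wiring an additional job type ('port' and
configure_security_group are made-up names for illustration; this patch
only decorates configure_extra_routes):

    @_job_handle('port')
    def configure_security_group(self, ctx, payload):
        # the payload key is the job type, its value the resource id
        port_id = payload['port']
        LOG.info(_LI('Configuring security group for port %s'), port_id)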
diff --git a/tricircle/xjob/xservice.py b/tricircle/xjob/xservice.py
index e49613d..dc1a39d 100755
--- a/tricircle/xjob/xservice.py
+++ b/tricircle/xjob/xservice.py
@@ -47,7 +47,15 @@ common_opts = [
     cfg.StrOpt('host', default='tricircle.xhost',
                help=_("The host name for RPC server")),
     cfg.IntOpt('workers', default=1,
-               help=_("number of workers")),
+               help=_("Number of workers")),
+    cfg.IntOpt('worker_handle_timeout', default=1800,
+               help=_("Timeout for one turn of worker processing, in"
+                      " seconds")),
+    cfg.IntOpt('job_run_expire', default=60,
+               help=_("A running job is considered expired after this"
+                      " time, in seconds")),
+    cfg.FloatOpt('worker_sleep_time', default=0.1,
+                 help=_("Seconds a worker sleeps after one run in a loop"))
 ]
 
 service_opts = [
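The three options interact as follows: a worker that cannot obtain the
lock polls every worker_sleep_time seconds, may steal a lock whose
Running record is older than job_run_expire seconds, and abandons its
own handler loop after worker_handle_timeout seconds, so with the
defaults a waiting worker takes over a dead peer's job long before it
gives up itself. A small sketch of reading the new knobs via
oslo.config, mirroring how the tests register common_opts (and assuming
the options have not already been registered at import time):

    from oslo_config import cfg

    from tricircle.xjob import xservice

    cfg.CONF.register_opts(xservice.common_opts)
    print(cfg.CONF.worker_handle_timeout)  # 1800 by default
    print(cfg.CONF.job_run_expire)         # 60
    print(cfg.CONF.worker_sleep_time)      # 0.1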