diff --git a/setup.cfg b/setup.cfg index 8b728f6ca..92f72421c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,6 +42,7 @@ zaqar.data.storage = sqlite = zaqar.storage.sqlalchemy.driver:DataDriver sqlalchemy = zaqar.storage.sqlalchemy.driver:DataDriver mongodb = zaqar.storage.mongodb.driver:DataDriver + mongodb.fifo = zaqar.storage.mongodb.driver:FIFODataDriver redis = zaqar.storage.redis.driver:DataDriver faulty = zaqar.tests.faulty_storage:DataDriver diff --git a/tests/etc/wsgi_fifo_mongodb.conf b/tests/etc/wsgi_fifo_mongodb.conf new file mode 100644 index 000000000..40efdb10a --- /dev/null +++ b/tests/etc/wsgi_fifo_mongodb.conf @@ -0,0 +1,21 @@ +[DEFAULT] +debug = False +verbose = False +unreliable = True + +[drivers] +transport = wsgi +storage = mongodb + +[drivers:transport:wsgi] +port = 8888 + +[drivers:message_store:mongodb] +uri = mongodb.fifo://127.0.0.1:27017 +database = message_zaqar_test_fifo +max_reconnect_attempts = 3 +reconnect_sleep = 0.001 + +# NOTE(kgriffs): Reduce from the default of 1000 to reduce the +# duration of related tests +max_attempts = 5 diff --git a/tests/unit/storage/test_impl_mongodb.py b/tests/unit/storage/test_impl_mongodb.py index 8a59164a1..7dcedf8cd 100644 --- a/tests/unit/storage/test_impl_mongodb.py +++ b/tests/unit/storage/test_impl_mongodb.py @@ -342,6 +342,17 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest): timeutils.clear_time_override() + +@testing.requires_mongodb +class MongodbFIFOMessageTests(MongodbSetupMixin, base.MessageControllerTest): + + driver_class = mongodb.DataDriver + config_file = 'wsgi_fifo_mongodb.conf' + controller_class = controllers.FIFOMessageController + + # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute + gc_interval = 60 + def test_race_condition_on_post(self): queue_name = self.queue_name @@ -459,13 +470,12 @@ class MongodbSubscriptionTests(MongodbSetupMixin, @testing.requires_mongodb class MongodbPoolsTests(base.PoolsControllerTest): + config_file = 'wsgi_mongodb.conf' driver_class = mongodb.ControlDriver controller_class = controllers.PoolsController def setUp(self): super(MongodbPoolsTests, self).setUp() - self.load_conf('wsgi_mongodb.conf') - self.flavors_controller = self.driver.flavors_controller def tearDown(self): super(MongodbPoolsTests, self).tearDown() @@ -478,6 +488,13 @@ class MongodbPoolsTests(base.PoolsControllerTest): with testing.expect(errors.PoolInUseByFlavor): self.pools_controller.delete(self.pool) + def test_mismatching_capabilities_fifo(self): + with testing.expect(errors.PoolCapabilitiesMismatch): + self.pools_controller.create(str(uuid.uuid1()), + 100, 'mongodb.fifo://localhost', + group=self.pool_group, + options={}) + @testing.requires_mongodb class MongodbCatalogueTests(base.CatalogueControllerTest): diff --git a/zaqar/storage/mongodb/controllers.py b/zaqar/storage/mongodb/controllers.py index 01b176a8e..7706f1538 100644 --- a/zaqar/storage/mongodb/controllers.py +++ b/zaqar/storage/mongodb/controllers.py @@ -35,6 +35,7 @@ CatalogueController = catalogue.CatalogueController ClaimController = claims.ClaimController FlavorsController = flavors.FlavorsController MessageController = messages.MessageController +FIFOMessageController = messages.FIFOMessageController QueueController = queues.QueueController PoolsController = pools.PoolsController SubscriptionController = subscriptions.SubscriptionController diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py index da213926c..d83b7efa1 100644 --- a/zaqar/storage/mongodb/driver.py +++ b/zaqar/storage/mongodb/driver.py @@ -32,6 +32,13 @@ LOG = logging.getLogger(__name__) def _connection(conf): + # NOTE(flaper87): remove possible zaqar specific + # schemes like: mongodb.fifo + uri = conf.uri + + if conf.uri: + uri = "mongodb://%s" % (conf.uri.split("://")[-1]) + if conf.uri and 'replicaSet' in conf.uri: MongoClient = pymongo.MongoReplicaSetClient else: @@ -58,9 +65,9 @@ def _connection(conf): if conf.ssl_ca_certs: kwargs['ssl_ca_certs'] = conf.ssl_ca_certs - return MongoClient(conf.uri, **kwargs) + return MongoClient(uri, **kwargs) - return MongoClient(conf.uri) + return MongoClient(uri) class DataDriver(storage.DataDriverBase): @@ -69,6 +76,8 @@ class DataDriver(storage.DataDriverBase): _DRIVER_OPTIONS = options._config_options() + _COL_SUFIX = "_messages_p" + def __init__(self, conf, cache): super(DataDriver, self).__init__(conf, cache) @@ -162,7 +171,7 @@ class DataDriver(storage.DataDriverBase): # # self.driver.message_databases[0] # - return [self.connection[name + '_messages_p' + str(p)] + return [self.connection[name + self._COL_SUFIX + str(p)] for p in range(partitions)] @decorators.lazy_property(write=False) @@ -193,6 +202,20 @@ class DataDriver(storage.DataDriverBase): return controllers.SubscriptionController(self) +class FIFODataDriver(DataDriver): + + BASE_CAPABILITIES = (storage.Capabilities.DURABILITY, + storage.Capabilities.CLAIMS, + storage.Capabilities.AOD, + storage.Capabilities.HIGH_THROUGHPUT) + + _COL_SUFIX = "_messages_fifo_p" + + @decorators.lazy_property(write=False) + def message_controller(self): + return controllers.FIFOMessageController(self) + + class ControlDriver(storage.ControlDriverBase): def __init__(self, conf, cache): diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index 5aef7fa4e..3dd3fb071 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -183,16 +183,8 @@ class MessageController(storage.Message): name='counting', background=True) - # NOTE(kgriffs): This index must be unique so that - # inserting a message with the same marker to the - # same queue will fail; this is used to detect a - # race condition which can cause an observer client - # to miss a message when there is more than one - # producer posting messages to the same queue, in - # parallel. collection.ensure_index(MARKER_INDEX_FIELDS, name='queue_marker', - unique=True, background=True) collection.ensure_index(TRANSACTION_INDEX_FIELDS, @@ -498,6 +490,171 @@ class MessageController(storage.Message): return utils.HookedCursor(messages, denormalizer) + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def post(self, queue_name, messages, client_uuid, project=None): + # NOTE(flaper87): This method should be safe to retry on + # autoreconnect, since we've a 2-step insert for messages. + # The worst-case scenario is that we'll increase the counter + # several times and we'd end up with some non-active messages. + + if not self._queue_ctrl.exists(queue_name, project): + raise errors.QueueDoesNotExist(queue_name, project) + + now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) + collection = self._collection(queue_name, project) + + messages = list(messages) + msgs_n = len(messages) + next_marker = self._queue_ctrl._inc_counter(queue_name, + project, + amount=msgs_n) - msgs_n + + prepared_messages = [ + { + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), + 't': message['ttl'], + 'e': now_dt + datetime.timedelta(seconds=message['ttl']), + 'u': client_uuid, + 'c': {'id': None, 'e': now}, + 'b': message['body'] if 'body' in message else {}, + 'k': next_marker + index, + 'tx': None, + } + + for index, message in enumerate(messages) + ] + + ids = collection.insert(prepared_messages) + + return [str(id_) for id_ in ids] + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def delete(self, queue_name, message_id, project=None, claim=None): + # NOTE(cpp-cabrera): return early - this is an invalid message + # id so we won't be able to find it any way + mid = utils.to_oid(message_id) + if mid is None: + return + + collection = self._collection(queue_name, project) + + query = { + '_id': mid, + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), + } + + cid = utils.to_oid(claim) + if cid is None: + raise errors.ClaimDoesNotExist(queue_name, project, claim) + + now = timeutils.utcnow_ts() + cursor = collection.find(query).hint(ID_INDEX_FIELDS) + + try: + message = next(cursor) + except StopIteration: + return + + if claim is None: + if _is_claimed(message, now): + raise errors.MessageIsClaimed(message_id) + + else: + if message['c']['id'] != cid: + # NOTE(kgriffs): Read from primary in case the message + # was just barely claimed, and claim hasn't made it to + # the secondary. + pref = pymongo.read_preferences.ReadPreference.PRIMARY + message = collection.find_one(query, read_preference=pref) + + if message['c']['id'] != cid: + if _is_claimed(message, now): + raise errors.MessageNotClaimedBy(message_id, claim) + + raise errors.MessageNotClaimed(message_id) + + collection.remove(query['_id'], w=0) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def bulk_delete(self, queue_name, message_ids, project=None): + message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] + query = { + '_id': {'$in': message_ids}, + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), + } + + collection = self._collection(queue_name, project) + collection.remove(query, w=0) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def pop(self, queue_name, limit, project=None): + query = { + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), + } + + # Only include messages that are not part of + # any claim, or are part of an expired claim. + now = timeutils.utcnow_ts() + query['c.e'] = {'$lte': now} + + collection = self._collection(queue_name, project) + fields = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1} + + messages = (collection.find_and_modify(query, + fields=fields, + remove=True) + for _ in range(limit)) + + final_messages = [_basic_message(message, now) + for message in messages + if message] + + return final_messages + + +class FIFOMessageController(MessageController): + + def _ensure_indexes(self, collection): + """Ensures that all indexes are created.""" + + collection.ensure_index(TTL_INDEX_FIELDS, + name='ttl', + expireAfterSeconds=0, + background=True) + + collection.ensure_index(ACTIVE_INDEX_FIELDS, + name='active', + background=True) + + collection.ensure_index(CLAIMED_INDEX_FIELDS, + name='claimed', + background=True) + + collection.ensure_index(COUNTING_INDEX_FIELDS, + name='counting', + background=True) + + # NOTE(kgriffs): This index must be unique so that + # inserting a message with the same marker to the + # same queue will fail; this is used to detect a + # race condition which can cause an observer client + # to miss a message when there is more than one + # producer posting messages to the same queue, in + # parallel. + collection.ensure_index(MARKER_INDEX_FIELDS, + name='queue_marker', + unique=True, + background=True) + + collection.ensure_index(TRANSACTION_INDEX_FIELDS, + name='transaction', + background=True) + @utils.raises_conn_error @utils.retries_on_autoreconnect def post(self, queue_name, messages, client_uuid, project=None): @@ -688,92 +845,6 @@ class MessageController(storage.Message): raise errors.MessageConflict(queue_name, project) - @utils.raises_conn_error - @utils.retries_on_autoreconnect - def delete(self, queue_name, message_id, project=None, claim=None): - # NOTE(cpp-cabrera): return early - this is an invalid message - # id so we won't be able to find it any way - mid = utils.to_oid(message_id) - if mid is None: - return - - collection = self._collection(queue_name, project) - - query = { - '_id': mid, - PROJ_QUEUE: utils.scope_queue_name(queue_name, project), - } - - cid = utils.to_oid(claim) - if cid is None: - raise errors.ClaimDoesNotExist(queue_name, project, claim) - - now = timeutils.utcnow_ts() - cursor = collection.find(query).hint(ID_INDEX_FIELDS) - - try: - message = next(cursor) - except StopIteration: - return - - if claim is None: - if _is_claimed(message, now): - raise errors.MessageIsClaimed(message_id) - - else: - if message['c']['id'] != cid: - # NOTE(kgriffs): Read from primary in case the message - # was just barely claimed, and claim hasn't made it to - # the secondary. - pref = pymongo.read_preferences.ReadPreference.PRIMARY - message = collection.find_one(query, read_preference=pref) - - if message['c']['id'] != cid: - if _is_claimed(message, now): - raise errors.MessageNotClaimedBy(message_id, claim) - - raise errors.MessageNotClaimed(message_id) - - collection.remove(query['_id'], w=0) - - @utils.raises_conn_error - @utils.retries_on_autoreconnect - def bulk_delete(self, queue_name, message_ids, project=None): - message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] - query = { - '_id': {'$in': message_ids}, - PROJ_QUEUE: utils.scope_queue_name(queue_name, project), - } - - collection = self._collection(queue_name, project) - collection.remove(query, w=0) - - @utils.raises_conn_error - @utils.retries_on_autoreconnect - def pop(self, queue_name, limit, project=None): - query = { - PROJ_QUEUE: utils.scope_queue_name(queue_name, project), - } - - # Only include messages that are not part of - # any claim, or are part of an expired claim. - now = timeutils.utcnow_ts() - query['c.e'] = {'$lte': now} - - collection = self._collection(queue_name, project) - fields = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1} - - messages = (collection.find_and_modify(query, - fields=fields, - remove=True) - for _ in range(limit)) - - final_messages = [_basic_message(message, now) - for message in messages - if message] - - return final_messages - def _is_claimed(msg, now): return (msg['c']['id'] is not None and diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index bc47fe43c..48b09f34e 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -1070,6 +1070,8 @@ class PoolsControllerTest(ControllerBaseTest): self.pools_controller.create(self.pool, 100, 'localhost', group=self.pool_group, options={}) + self.flavors_controller = self.driver.flavors_controller + def tearDown(self): self.pools_controller.drop_all() super(PoolsControllerTest, self).tearDown()