
With the Redis backend, if a queue is deleted, a request to claim from it still proceeds and returns an unexpected result, such as a 503 error. Ensure the queue exists before getting or updating a claim.

Change-Id: I324e342b0ec49ca2b6aa48e371fc818a53ac1ffb
Closes-Bug: #1591921
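The guard boils down to the following pattern (a minimal sketch; the real check lives in ClaimController._exists below):

    if not self._queue_ctrl._exists(queue, project):
        return False  # queue is gone, so no claim on it can exist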
# Copyright (c) 2014 Prashanth Raghu.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools

import msgpack
from oslo_utils import timeutils
from oslo_utils import uuidutils

from zaqar.common import decorators
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.redis import messages
from zaqar.storage.redis import scripting
from zaqar.storage.redis import utils

QUEUE_CLAIMS_SUFFIX = 'claims'
CLAIM_MESSAGES_SUFFIX = 'messages'

RETRY_CLAIM_TIMEOUT = 10

# NOTE(kgriffs): Number of claims to read at a time when counting
# the total number of claimed messages for a queue.
#
# TODO(kgriffs): Tune this parameter and/or make it configurable. It
# takes ~0.8 ms to retrieve 100 items from a sorted set on a 2.7 GHz
# Intel Core i7 (not including network latency).
COUNTING_BATCH_SIZE = 100


class ClaimController(storage.Claim, scripting.Mixin):
    """Implements claim resource operations using Redis.

    Redis Data Structures:

    1. Claims list (Redis set) contains claim IDs

        Key: <project_id>.<queue_name>.claims

        +-------------+---------+
        |  Name       |  Field  |
        +=============+=========+
        |  claim_ids  |  m      |
        +-------------+---------+

    2. Claimed Messages (Redis set) contains the list
       of message ids stored per claim

        Key: <claim_id>.messages

    3. Claim info (Redis hash):

        Key: <claim_id>

        +----------------+---------+
        |  Name          |  Field  |
        +================+=========+
        |  ttl           |  t      |
        +----------------+---------+
        |  id            |  id     |
        +----------------+---------+
        |  expires       |  e      |
        +----------------+---------+
        |  num_messages  |  n      |
        +----------------+---------+
    """
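    # NOTE: Illustrative example only; the project, queue name, claim ID,
    # and timestamps below are hypothetical. A claim of two messages in
    # queue "orders" of project "proj" produces keys shaped like:
    #
    #   proj.orders.claims   (sorted set)  {<claim_id>: <expires_ts>}
    #   <claim_id>.messages  (list)        [<msg_id_1>, <msg_id_2>]
    #   <claim_id>           (hash)        {'id': <claim_id>, 't': 60,
    #                                       'e': <expires_ts>, 'n': 2}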
    script_names = ['claim_messages']

    def __init__(self, *args, **kwargs):
        super(ClaimController, self).__init__(*args, **kwargs)
        self._client = self.driver.connection

        self._packer = msgpack.Packer(encoding='utf-8',
                                      use_bin_type=True).pack
        self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')

    @decorators.lazy_property(write=False)
    def _message_ctrl(self):
        return self.driver.message_controller

    @decorators.lazy_property(write=False)
    def _queue_ctrl(self):
        return self.driver.queue_controller

    def _get_claim_info(self, claim_id, fields, transform=int):
        """Get one or more fields from the claim info."""

        values = self._client.hmget(claim_id, fields)
        if values == [None]:
            return values
        else:
            return [transform(v) for v in values] if transform else values
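    # Example usage (illustrative): requesting the expiration timestamp
    # and TTL in a single round trip returns both values as ints:
    #
    #   expires, ttl = self._get_claim_info(claim_id, [b'e', b't'])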
    def _claim_messages(self, msgset_key, now, limit,
                        claim_id, claim_expires, msg_ttl, msg_expires):

        # NOTE(kgriffs): A watch on a pipe could also be used, but that
        # is less efficient and predictable, based on our experience in
        # having to do something similar in the MongoDB driver.
        func = self._scripts['claim_messages']

        args = [now, limit, claim_id, claim_expires, msg_ttl, msg_expires]
        return func(keys=[msgset_key], args=args)
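    # The script's contract, as exercised by create() below: given the
    # queue's message set key plus [now, limit, claim_id, claim_expires,
    # msg_ttl, msg_expires], it atomically claims up to `limit` unclaimed
    # messages (Lua scripts execute atomically in Redis) and returns the
    # list of their message IDs.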
    def _exists(self, queue, claim_id, project):
        client = self._client
        claims_set_key = utils.scope_claims_set(queue, project,
                                                QUEUE_CLAIMS_SUFFIX)
        # In some cases the queue may no longer exist, so check for it
        # first and return False if it is gone.

        # TODO(flwang): We should delete all related data after the queue
        # is deleted. See the blueprint for more detail:
        # https://blueprints.launchpad.net/zaqar/+spec/clear-resources-after-delete-queue
        if not self._queue_ctrl._exists(queue, project):
            return False

        # Return False if no such claim exists
        # TODO(prashanthr_): Discuss the feasibility of a bloom filter.
        if client.zscore(claims_set_key, claim_id) is None:
            return False

        expires = self._get_claim_info(claim_id, b'e')[0]
        now = timeutils.utcnow_ts()

        if expires <= now:
            # NOTE(kgriffs): Redis should automatically remove the
            # other records in the very near future. This one
            # has to be manually deleted, however.
            client.zrem(claims_set_key, claim_id)
            return False

        return True

    def _get_claimed_message_keys(self, claim_msgs_key):
        return self._client.lrange(claim_msgs_key, 0, -1)

    def _count_messages(self, queue, project):
        """Count and return the total number of claimed messages."""

        # NOTE(kgriffs): Iterate through all claims, adding up the
        # number of messages per claim. This is obviously slower
        # than keeping a side counter, but is also less error-prone.
        # Plus, it avoids having to do a lot of extra work during
        # garbage collection passes. Also, considering that most
        # workloads won't require a large number of claims, most of
        # the time we can do this in a single pass, so it is still
        # pretty fast.

        claims_set_key = utils.scope_claims_set(queue, project,
                                                QUEUE_CLAIMS_SUFFIX)
        num_claimed = 0
        offset = 0

        while True:
            claim_ids = self._client.zrange(claims_set_key, offset,
                                            offset + COUNTING_BATCH_SIZE - 1)
            if not claim_ids:
                break

            offset += len(claim_ids)

            with self._client.pipeline() as pipe:
                for cid in claim_ids:
                    pipe.hmget(cid, 'n')

                claim_infos = pipe.execute()

            for info in claim_infos:
                # NOTE(kgriffs): In case the claim was deleted out
                # from under us, sanity-check that we got a non-None
                # info list.
                if info:
                    num_claimed += int(info[0])

        return num_claimed
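    # For instance, with 250 claims and COUNTING_BATCH_SIZE = 100, the
    # loop above issues three ZRANGE calls returning data (100 + 100 + 50
    # IDs) plus one final call that returns nothing and ends the loop.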
    def _del_message(self, queue, project, claim_id, message_id, pipe):
        """Called by MessageController when messages are being deleted.

        This method removes the message from claim data structures.
        """

        claim_msgs_key = utils.scope_claim_messages(claim_id,
                                                    CLAIM_MESSAGES_SUFFIX)

        # NOTE(kgriffs): In practice, scanning will be quite fast,
        # since the usual pattern is to delete messages from oldest
        # to newest, and the list is sorted in that order. Also,
        # the length of the list will usually be ~10 messages.
        pipe.lrem(claim_msgs_key, 1, message_id)

        # NOTE(kgriffs): Decrement the message counter used for stats
        pipe.hincrby(claim_id, 'n', -1)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _gc(self, queue, project):
        """Garbage-collect expired claim data.

        Not all claim data can be automatically expired. This method
        cleans up the remainder.

        :returns: Number of claims removed
        """

        claims_set_key = utils.scope_claims_set(queue, project,
                                                QUEUE_CLAIMS_SUFFIX)
        now = timeutils.utcnow_ts()
        num_removed = self._client.zremrangebyscore(claims_set_key, 0, now)
        return num_removed

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def get(self, queue, claim_id, project=None):
        if not self._exists(queue, claim_id, project):
            raise errors.ClaimDoesNotExist(claim_id, queue, project)

        claim_msgs_key = utils.scope_claim_messages(claim_id,
                                                    CLAIM_MESSAGES_SUFFIX)

        # basic_messages
        msg_keys = self._get_claimed_message_keys(claim_msgs_key)
        claimed_msgs = messages.Message.from_redis_bulk(msg_keys,
                                                        self._client)
        now = timeutils.utcnow_ts()
        basic_messages = [msg.to_basic(now)
                          for msg in claimed_msgs if msg]

        # claim_meta
        now = timeutils.utcnow_ts()
        expires, ttl = self._get_claim_info(claim_id, [b'e', b't'])
        update_time = expires - ttl
        age = now - update_time

        claim_meta = {
            'age': age,
            'ttl': ttl,
            'id': claim_id,
        }

        return claim_meta, basic_messages

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def create(self, queue, metadata, project=None,
               limit=storage.DEFAULT_MESSAGES_PER_CLAIM):

        claim_ttl = metadata['ttl']
        grace = metadata['grace']

        now = timeutils.utcnow_ts()
        msg_ttl = claim_ttl + grace
        claim_expires = now + claim_ttl
        msg_expires = claim_expires + grace

        claim_id = uuidutils.generate_uuid()
        claimed_msgs = []

        # NOTE(kgriffs): Claim some messages
        msgset_key = utils.msgset_key(queue, project)
        claimed_ids = self._claim_messages(msgset_key, now, limit,
                                           claim_id, claim_expires,
                                           msg_ttl, msg_expires)

        if claimed_ids:
            claimed_msgs = messages.Message.from_redis_bulk(claimed_ids,
                                                            self._client)
            claimed_msgs = [msg.to_basic(now) for msg in claimed_msgs]

            # NOTE(kgriffs): Persist claim records
            with self._client.pipeline() as pipe:
                claim_msgs_key = utils.scope_claim_messages(
                    claim_id, CLAIM_MESSAGES_SUFFIX)

                for mid in claimed_ids:
                    pipe.rpush(claim_msgs_key, mid)

                pipe.expire(claim_msgs_key, claim_ttl)

                claim_info = {
                    'id': claim_id,
                    't': claim_ttl,
                    'e': claim_expires,
                    'n': len(claimed_ids),
                }

                pipe.hmset(claim_id, claim_info)
                pipe.expire(claim_id, claim_ttl)

                # NOTE(kgriffs): Add the claim ID to a set so that
                # existence checks can be performed quickly. This
                # is also used as a watch key in order to guard
                # against race conditions.
                #
                # A sorted set is used to facilitate cleaning
                # up the IDs of expired claims.
                claims_set_key = utils.scope_claims_set(queue, project,
                                                        QUEUE_CLAIMS_SUFFIX)

                pipe.zadd(claims_set_key, claim_expires, claim_id)
                pipe.execute()

        return claim_id, claimed_msgs

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def update(self, queue, claim_id, metadata, project=None):
        if not self._exists(queue, claim_id, project):
            raise errors.ClaimDoesNotExist(claim_id, queue, project)

        now = timeutils.utcnow_ts()

        claim_ttl = metadata['ttl']
        claim_expires = now + claim_ttl

        grace = metadata['grace']
        msg_ttl = claim_ttl + grace
        msg_expires = claim_expires + grace

        claim_msgs_key = utils.scope_claim_messages(claim_id,
                                                    CLAIM_MESSAGES_SUFFIX)

        msg_keys = self._get_claimed_message_keys(claim_msgs_key)
        claimed_msgs = messages.MessageEnvelope.from_redis_bulk(msg_keys,
                                                                self._client)
        claim_info = {
            't': claim_ttl,
            'e': claim_expires,
        }

        with self._client.pipeline() as pipe:
            for msg in claimed_msgs:
                if msg:
                    msg.claim_id = claim_id
                    msg.claim_expires = claim_expires

                    if _msg_would_expire(msg, claim_expires):
                        msg.ttl = msg_ttl
                        msg.expires = msg_expires

                    # TODO(kgriffs): Rather than writing back the
                    # entire message, only set the fields that
                    # have changed.
                    #
                    # When this change is made, don't forget to
                    # also call pipe.expire with the new TTL value.
                    msg.to_redis(pipe)

            # Update the claim id and claim expiration info
            # for all the messages.
            pipe.hmset(claim_id, claim_info)
            pipe.expire(claim_id, claim_ttl)

            pipe.expire(claim_msgs_key, claim_ttl)

            claims_set_key = utils.scope_claims_set(queue, project,
                                                    QUEUE_CLAIMS_SUFFIX)

            pipe.zadd(claims_set_key, claim_expires, claim_id)

            pipe.execute()

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def delete(self, queue, claim_id, project=None):
        # NOTE(prashanthr_): Return silently when the claim
        # does not exist
        if not self._exists(queue, claim_id, project):
            return

        now = timeutils.utcnow_ts()
        claim_msgs_key = utils.scope_claim_messages(claim_id,
                                                    CLAIM_MESSAGES_SUFFIX)

        msg_keys = self._get_claimed_message_keys(claim_msgs_key)
        claimed_msgs = messages.MessageEnvelope.from_redis_bulk(msg_keys,
                                                                self._client)
        # Update the claim id and claim expiration info
        # for all the messages.
        claims_set_key = utils.scope_claims_set(queue, project,
                                                QUEUE_CLAIMS_SUFFIX)

        with self._client.pipeline() as pipe:
            pipe.zrem(claims_set_key, claim_id)
            pipe.delete(claim_id)
            pipe.delete(claim_msgs_key)

            for msg in claimed_msgs:
                if msg:
                    msg.claim_id = None
                    msg.claim_expires = now

                    # TODO(kgriffs): Rather than writing back the
                    # entire message, only set the fields that
                    # have changed.
                    msg.to_redis(pipe)

            pipe.execute()


def _msg_would_expire(message, now):
    return message.expires <= now