From 3641f28cc6d52f4d27a563f47d38af862d53b97e Mon Sep 17 00:00:00 2001 From: xywang <233652566@qq.com> Date: Tue, 11 Dec 2018 11:40:20 +0800 Subject: [PATCH] Fix redis CI job Change-Id: Ia59ad6f5ce6311eda92cf033a72df9c94e1ffb18 --- lower-constraints.txt | 2 +- test-requirements.txt | 2 +- zaqar/storage/redis/catalogue.py | 2 +- zaqar/storage/redis/claims.py | 4 ++-- zaqar/storage/redis/flavors.py | 9 ++++++--- zaqar/storage/redis/messages.py | 7 +++++-- zaqar/storage/redis/pools.py | 11 +++++++---- zaqar/storage/redis/queues.py | 7 +++++-- zaqar/storage/redis/subscriptions.py | 18 ++++++++++-------- 9 files changed, 38 insertions(+), 24 deletions(-) diff --git a/lower-constraints.txt b/lower-constraints.txt index b32919856..5672547ec 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -48,7 +48,7 @@ python-subunit==1.0.0 python-swiftclient==3.2.0 pytz==2013.6 PyYAML==3.12 -redis==2.10.0 +redis==3.0.0 reno==2.5.0 requests==2.14.2 requestsexceptions==1.2.0 diff --git a/test-requirements.txt b/test-requirements.txt index 5dc64c798..ed90ce82e 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,7 +8,7 @@ hacking!=0.13.0,<0.14,>=0.12.0 # Apache-2.0 mock>=2.0.0 # BSD # Backends -redis>=2.10.0 # MIT +redis>=3.0.0 # MIT pymongo>=3.6.0 # Apache-2.0 python-swiftclient>=3.2.0 # Apache-2.0 websocket-client>=0.44.0 # LGPLv2+ diff --git a/zaqar/storage/redis/catalogue.py b/zaqar/storage/redis/catalogue.py index 389f30eaa..73514cfe9 100644 --- a/zaqar/storage/redis/catalogue.py +++ b/zaqar/storage/redis/catalogue.py @@ -92,7 +92,7 @@ class CatalogueController(base.CatalogueBase): } # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: - pipe.zadd(catalogue_project_key, 1, queue_key) + pipe.zadd(catalogue_project_key, {queue_key: 1}) pipe.hmset(catalogue_queue_key, catalogue) try: diff --git a/zaqar/storage/redis/claims.py b/zaqar/storage/redis/claims.py index 0a3940cc2..09b2934fb 100644 --- a/zaqar/storage/redis/claims.py +++ b/zaqar/storage/redis/claims.py @@ -315,7 +315,7 @@ class ClaimController(storage.Claim, scripting.Mixin): claims_set_key = utils.scope_claims_set(queue, project, QUEUE_CLAIMS_SUFFIX) - pipe.zadd(claims_set_key, claim_expires, claim_id) + pipe.zadd(claims_set_key, {claim_id: claim_expires}) pipe.execute() if ('_max_claim_count' in queue_meta and @@ -430,7 +430,7 @@ class ClaimController(storage.Claim, scripting.Mixin): claims_set_key = utils.scope_claims_set(queue, project, QUEUE_CLAIMS_SUFFIX) - pipe.zadd(claims_set_key, claim_expires, claim_id) + pipe.zadd(claims_set_key, {claim_id: claim_expires}) pipe.execute() diff --git a/zaqar/storage/redis/flavors.py b/zaqar/storage/redis/flavors.py index cad770f20..669edc4d6 100644 --- a/zaqar/storage/redis/flavors.py +++ b/zaqar/storage/redis/flavors.py @@ -78,7 +78,10 @@ class FlavorsController(base.FlavorsBase): client = self._client subset_key = utils.flavor_project_subset_key(project) marker_key = utils.flavor_name_hash_key(marker) - rank = client.zrank(subset_key, marker_key) + if marker_key: + rank = client.zrank(subset_key, marker_key) + else: + rank = None start = rank + 1 if rank is not None else 0 cursor = (f for f in client.zrange(subset_key, start, @@ -119,8 +122,8 @@ class FlavorsController(base.FlavorsBase): } # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: - pipe.zadd(set_key, 1, hash_key) - pipe.zadd(subset_key, 1, hash_key) + pipe.zadd(set_key, {hash_key: 1}) + pipe.zadd(subset_key, {hash_key: 1}) pipe.hmset(hash_key, flavors) pipe.execute() else: diff --git a/zaqar/storage/redis/messages.py b/zaqar/storage/redis/messages.py index 17cef59e5..563dd44d3 100644 --- a/zaqar/storage/redis/messages.py +++ b/zaqar/storage/redis/messages.py @@ -135,7 +135,7 @@ class MessageController(storage.Message, scripting.Mixin): return self._client.zcard(utils.msgset_key(queue, project)) def _create_msgset(self, queue, project, pipe): - pipe.zadd(MSGSET_INDEX_KEY, 1, utils.msgset_key(queue, project)) + pipe.zadd(MSGSET_INDEX_KEY, {utils.msgset_key(queue, project): 1}) def _delete_msgset(self, queue, project, pipe): pipe.zrem(MSGSET_INDEX_KEY, utils.msgset_key(queue, project)) @@ -243,7 +243,10 @@ class MessageController(storage.Message, scripting.Mixin): # of the queue; otherwise we would just filter them all # out and likely end up with an empty list to return. marker = self._find_first_unclaimed(queue, project, limit) - start = client.zrank(msgset_key, marker) or 0 + if marker: + start = client.zrank(msgset_key, marker) or 0 + else: + start = 0 else: rank = client.zrank(msgset_key, marker) start = rank + 1 if rank else 0 diff --git a/zaqar/storage/redis/pools.py b/zaqar/storage/redis/pools.py index ee9568195..b51e2b523 100644 --- a/zaqar/storage/redis/pools.py +++ b/zaqar/storage/redis/pools.py @@ -97,7 +97,10 @@ class PoolsController(base.PoolsBase): client = self._client set_key = utils.pools_set_key() marker_key = utils.pools_name_hash_key(marker) - rank = client.zrank(set_key, marker_key) + if marker_key: + rank = client.zrank(set_key, marker_key) + else: + rank = None start = rank + 1 if rank is not None else 0 cursor = (f for f in client.zrange(set_key, start, @@ -163,9 +166,9 @@ class PoolsController(base.PoolsBase): } # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: - pipe.zadd(set_key, 1, pool_key) + pipe.zadd(set_key, {pool_key: 1}) if flavor is not None: - pipe.zadd(subset_key, 1, pool_key) + pipe.zadd(subset_key, {pool_key: 1}) pipe.hmset(pool_key, pool) pipe.execute() @@ -200,7 +203,7 @@ class PoolsController(base.PoolsBase): if flavor_old != flavor_new: if flavor_new is not None: new_subset_key = utils.pools_subset_key(flavor_new) - pipe.zadd(new_subset_key, 1, pool_key) + pipe.zadd(new_subset_key, {pool_key: 1}) # (gengchc2) remove pool from flavor_old.pools subset if flavor_old is not None: old_subset_key = utils.pools_subset_key(flavor_old) diff --git a/zaqar/storage/redis/queues.py b/zaqar/storage/redis/queues.py index 4746edb35..fa756804b 100644 --- a/zaqar/storage/redis/queues.py +++ b/zaqar/storage/redis/queues.py @@ -89,7 +89,10 @@ class QueueController(storage.Queue): client = self._client qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) marker = utils.scope_queue_name(marker, project) - rank = client.zrank(qset_key, marker) + if marker: + rank = client.zrank(qset_key, marker) + else: + rank = None start = rank + 1 if rank else 0 cursor = (q for q in client.zrange(qset_key, start, @@ -133,7 +136,7 @@ class QueueController(storage.Queue): # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: - pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue) + pipe.zadd(qset_key, {queue_key: 1}).hmset(queue_key, queue) try: pipe.execute() diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index c15e976ba..6ca481a63 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -59,7 +59,10 @@ class SubscriptionController(base.Subscription): subset_key = utils.scope_subscription_ids_set(queue, project, SUBSCRIPTION_IDS_SUFFIX) - rank = client.zrank(subset_key, marker) + if marker: + rank = client.zrank(subset_key, marker) + else: + rank = None start = rank + 1 if rank is not None else 0 cursor = (q for q in client.zrange(subset_key, start, @@ -71,9 +74,9 @@ class SubscriptionController(base.Subscription): ttl = int(record[2]) expires = int(record[3]) created = expires - ttl - is_confirmed = True + is_confirmed = 1 if len(record) == 6: - is_confirmed = record[5] == str(True) + is_confirmed = record[5] ret = { 'id': sid, 'source': record[0], @@ -114,7 +117,7 @@ class SubscriptionController(base.Subscription): source = queue now = timeutils.utcnow_ts() expires = now + ttl - confirmed = False + confirmed = 0 subscription = {'id': subscription_id, 's': source, @@ -131,9 +134,8 @@ class SubscriptionController(base.Subscription): if not self._is_duplicated_subscriber(subscriber, queue, project): - pipe.zadd(subset_key, 1, - subscription_id).hmset(subscription_id, - subscription) + pipe.zadd(subset_key, {subscription_id: 1}).hmset( + subscription_id, subscription) pipe.expire(subscription_id, ttl) pipe.execute() else: @@ -262,7 +264,7 @@ class SubscriptionController(base.Subscription): # Let's get our subscription by ID. If it does not exist, # SubscriptionDoesNotExist error will be raised internally. self.get(queue, subscription_id, project=project) - + confirmed = 1 if confirmed else 0 fields = {'c': confirmed} with self._client.pipeline() as pipe: pipe.hmset(subscription_id, fields)