Merge "Support redis as mgmt storage backend"
This commit is contained in:
commit
6dc9030d61
@ -206,6 +206,11 @@ class PoolAlreadyExists(Conflict):
    msg_format = u'The database URI is in use by another pool.'


class PoolRedisNotSupportGroup(ExceptionBase):

    msg_format = (u'Redis does not support pool_group, please use flavor.')


class SubscriptionAlreadyExists(Conflict):

    msg_format = (u'Such subscription already exists. Subscriptions '
@ -34,8 +34,21 @@ Supported Features
.. [1] This depends on the backing Redis store performance. For more
   information, see `Redis' benchmarks <http://redis.io/topics/benchmarks>`_.

-Redis is only a storage driver, and can't be used as the sole backend for a
-Zaqar deployment.
+Redis can be used as both a storage driver and a management driver.
+For the management driver, you need to enable the Redis persistence
+options in redis.conf. Redis supports two persistence modes: RDB and
+AOF. The RDB mode is configured as follows:
+
+    save <seconds> <changes>
+
+E.g.:
+
+    save 900 1
+    save 300 10
+    save 60 10000
+
+NOTE: the directives above mean that Redis saves the dataset to disk every
+900 s if at least 1 key changed, every 300 s if at least 10 keys changed,
+and every 60 s if at least 10000 keys changed.
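
As a sketch, a deployment would then point Zaqar's management store at that
Redis server in zaqar.conf (option names follow the usual zaqar.conf driver
layout; the URI below is an assumed local example):

    [drivers]
    management_store = redis

    [drivers:management_store:redis]
    uri = redis://127.0.0.1:6379

For the AOF mode, `appendonly yes` in redis.conf enables the append-only
log instead of (or alongside) RDB snapshots.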

Unsupported Features
--------------------
@ -45,9 +58,11 @@ Unsupported Features
.. [2] As an in-memory store, Redis doesn't support the durability guarantees
   the MongoDB or SQLAlchemy backends do.

-Redis is not supported as the backend for the Management Store, which means
-either MongoDB or SQLAlchemy are required in addition to Redis for a working
-deployment.

"""
|
||||
|
||||
from zaqar.storage.redis import driver
|
||||
|
||||
# Hoist classes into package namespace
|
||||
ControlDriver = driver.ControlDriver
|
||||
DataDriver = driver.DataDriver
|
||||
|

zaqar/storage/redis/catalogue.py (new file, 247 lines)
@ -0,0 +1,247 @@
# Copyright (c) 2017 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Redis storage controller for the queues catalogue.

Serves to construct an association between a project + queue -> pool.

::

    {
        'p': project :: six.text_type,
        'p_q': queue :: six.text_type,
        'p_p': pool_identifier :: six.text_type
    }
"""
from oslo_log import log as logging
import redis
import six

from zaqar.i18n import _
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.redis import utils

LOG = logging.getLogger(__name__)

CATALOGUE_SUFFIX = 'catalogue'
COUNTING_BATCH_SIZE = 100


class CatalogueController(base.CatalogueBase):
    """Implements Catalogue resource operations using Redis.

    * Project Index (Redis sorted set):

        Set of all queue_ids for the given project, ordered by name.

        Key: <project_id>.catalogue

        +--------+-----------------------------+
        |  Id    |  Value                      |
        +========+=============================+
        |  name  |  <project_id>.<queue_name>  |
        +--------+-----------------------------+

    * Queue and pool Information (Redis hash):

        Key: <project_id>.<queue_name>.catalogue

        +----------------------+---------+
        |  Name                |  Field  |
        +======================+=========+
        |  Project             |  p      |
        +----------------------+---------+
        |  Queue               |  p_q    |
        +----------------------+---------+
        |  Pool                |  p_p    |
        +----------------------+---------+
    """

    def __init__(self, *args, **kwargs):
        super(CatalogueController, self).__init__(*args, **kwargs)
        self._client = self.driver.connection

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _insert(self, project, queue, pool):
        queue_key = utils.scope_queue_name(queue, project)
        catalogue_project_key = utils.scope_pool_catalogue(project,
                                                           CATALOGUE_SUFFIX)
        catalogue_queue_key = utils.scope_pool_catalogue(queue_key,
                                                         CATALOGUE_SUFFIX)
        # Check if the queue already exists.
        if self._exists(project, queue):
            return False

        catalogue = {
            'p': project,
            'p_q': queue,
            'p_p': pool
        }
        # Pipeline ensures atomic inserts.
        with self._client.pipeline() as pipe:
            pipe.zadd(catalogue_project_key, 1, queue_key)
            pipe.hmset(catalogue_queue_key, catalogue)

            try:
                pipe.execute()
            except redis.exceptions.ResponseError:
                msgtmpl = _(u'CatalogueController:insert %(prj)s:'
                            '%(queue)s %(pool)s failed')
                LOG.exception(msgtmpl,
                              {'prj': project, 'queue': queue, 'pool': pool})
                return False
        msgtmpl = _(u'CatalogueController:insert %(prj)s:%(queue)s'
                    ':%(pool)s, success')
        LOG.info(msgtmpl,
                 {'prj': project, 'queue': queue, 'pool': pool})
        return True

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def list(self, project):

        catalogue_project_key = utils.scope_pool_catalogue(project,
                                                           CATALOGUE_SUFFIX)

        ctlgs = []
        offset = 0
        while True:
            queues = self._client.zrange(catalogue_project_key, offset,
                                         offset + COUNTING_BATCH_SIZE - 1)
            if not queues:
                break

            offset += len(queues)

            for queue in queues:
                catalogue_queue_key = \
                    utils.scope_pool_catalogue(queue,
                                               CATALOGUE_SUFFIX)
                ctlg = self._client.hgetall(catalogue_queue_key)
                ctlgs.append(ctlg)
        return (_normalize(v) for v in ctlgs)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def get(self, project, queue):
        queue_key = utils.scope_queue_name(queue, project)
        catalogue_queue_key = \
            utils.scope_pool_catalogue(queue_key,
                                       CATALOGUE_SUFFIX)
        ctlg = self._client.hgetall(catalogue_queue_key)
        if ctlg is None or len(ctlg) == 0:
            raise errors.QueueNotMapped(queue, project)

        return _normalize(ctlg)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _exists(self, project, queue):
        queue_key = utils.scope_queue_name(queue, project)
        catalogue_queue_key = \
            utils.scope_pool_catalogue(queue_key,
                                       CATALOGUE_SUFFIX)
        return self._client.exists(catalogue_queue_key)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def exists(self, project, queue):
        return self._exists(project, queue)

    def insert(self, project, queue, pool):
        self._insert(project, queue, pool)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def delete(self, project, queue):
        # (gengchc): Check if the queue exists at all.
        if not self._exists(project, queue):
            return True

        queue_key = utils.scope_queue_name(queue, project)
        catalogue_project_key = utils.scope_pool_catalogue(project,
                                                           CATALOGUE_SUFFIX)
        catalogue_queue_key = utils.scope_pool_catalogue(queue_key,
                                                         CATALOGUE_SUFFIX)
        # (gengchc) Pipeline ensures atomic deletes.
        with self._client.pipeline() as pipe:
            pipe.zrem(catalogue_project_key, queue_key)
            pipe.delete(catalogue_queue_key)
            try:
                pipe.execute()
            except redis.exceptions.ResponseError:
                msgtmpl = _(u'CatalogueController:delete %(prj)s'
                            ':%(queue)s failed')
                LOG.info(msgtmpl,
                         {'prj': project, 'queue': queue})
                return False
        msgtmpl = _(u'CatalogueController:delete %(prj)s:%(queue)s success')
        LOG.info(msgtmpl,
                 {'prj': project, 'queue': queue})

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _update(self, project, queue, pool):
        # Check that the queue exists.
        if not self._exists(project, queue):
            raise errors.QueueNotMapped(queue, project)

        queue_key = utils.scope_queue_name(queue, project)
        catalogue_queue_key = utils.scope_pool_catalogue(queue_key,
                                                         CATALOGUE_SUFFIX)
        with self._client.pipeline() as pipe:
            pipe.hset(catalogue_queue_key, "p_p", pool)
            try:
                pipe.execute()
            except redis.exceptions.ResponseError:
                msgtmpl = _(u'CatalogueController:_update %(prj)s'
                            ':%(queue)s:%(pool)s failed')
                LOG.exception(msgtmpl,
                              {'prj': project, 'queue': queue, 'pool': pool})
                return False
        msgtmpl = _(u'CatalogueController:_update %(prj)s:%(queue)s'
                    ':%(pool)s')
        LOG.info(msgtmpl,
                 {'prj': project, 'queue': queue, 'pool': pool})

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def update(self, project, queue, pool=None):
        if pool is None:
            return False
        self._update(project, queue, pool)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def drop_all(self):
        # The catalogue keys are scoped with CATALOGUE_SUFFIX, so the glob
        # must match the full 'catalogue' suffix.
        allcatalogueobj_key = self._client.keys(pattern='*catalogue')
        if len(allcatalogueobj_key) == 0:
            return
        with self._client.pipeline() as pipe:
            for key in allcatalogueobj_key:
                pipe.delete(key)
            try:
                pipe.execute()
            except redis.exceptions.ResponseError:
                return False


def _normalize(entry):
    return {
        'queue': six.text_type(entry['p_q']),
        'project': six.text_type(entry['p']),
        'pool': six.text_type(entry['p_p'])
    }
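
A quick sketch of what one catalogue entry looks like at the Redis level,
querying with redis-py directly (the 'proj'/'myqueue'/'pool0' values are
invented for illustration):

    import redis

    client = redis.StrictRedis()

    # After CatalogueController.insert('proj', 'myqueue', 'pool0'):
    client.zrange('proj.catalogue', 0, -1)
    # -> [b'proj.myqueue']
    client.hgetall('proj.myqueue.catalogue')
    # -> {b'p': b'proj', b'p_q': b'myqueue', b'p_p': b'pool0'}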

@ -12,13 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from zaqar.storage.redis import catalogue
from zaqar.storage.redis import claims
from zaqar.storage.redis import flavors
from zaqar.storage.redis import messages
from zaqar.storage.redis import pools
from zaqar.storage.redis import queues
from zaqar.storage.redis import subscriptions


-QueueController = queues.QueueController
-MessageController = messages.MessageController
CatalogueController = catalogue.CatalogueController
ClaimController = claims.ClaimController
FlavorsController = flavors.FlavorsController
MessageController = messages.MessageController
QueueController = queues.QueueController
PoolsController = pools.PoolsController
SubscriptionController = subscriptions.SubscriptionController

@ -265,15 +265,31 @@ class ControlDriver(storage.ControlDriverBase):

    @property
    def pools_controller(self):
-        raise NotImplementedError()
+        controller = controllers.PoolsController(self)
+        if (self.conf.profiler.enabled and
+                self.conf.profiler.trace_management_store):
+            return profiler.trace_cls("redis_pools_controller")(controller)
+        else:
+            return controller

    @property
    def catalogue_controller(self):
-        raise NotImplementedError()
+        controller = controllers.CatalogueController(self)
+        if (self.conf.profiler.enabled and
+                self.conf.profiler.trace_management_store):
+            return profiler.trace_cls("redis_catalogue_"
+                                      "controller")(controller)
+        else:
+            return controller

    @property
    def flavors_controller(self):
-        raise NotImplementedError()
+        controller = controllers.FlavorsController(self)
+        if (self.conf.profiler.enabled and
+                self.conf.profiler.trace_management_store):
+            return profiler.trace_cls("redis_flavors_controller")(controller)
+        else:
+            return controller


def _get_redis_client(driver):

zaqar/storage/redis/flavors.py (new file, 181 lines)
@ -0,0 +1,181 @@
# Copyright (c) 2017 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import functools

import msgpack
import redis

from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.redis import utils


class FlavorsController(base.FlavorsBase):
    """Implements flavor resource operations using Redis.

    Redis Data Structures:

    1. All flavor_ids (Redis sorted set):

        Set of all flavor_ids, ordered by name. Used to delete all
        flavor records.

        Key: flavors

        +--------+-----------------------------+
        |  Id    |  Value                      |
        +========+=============================+
        |  name  |  <flavor>                   |
        +--------+-----------------------------+

    2. Project Index (Redis sorted set):

        Set of all flavors for the given project, ordered by name.

        Key: <project_id>.flavors

        +--------+-----------------------------+
        |  Id    |  Value                      |
        +========+=============================+
        |  name  |  <flavor>                   |
        +--------+-----------------------------+

    3. Flavor Information (Redis hash):

        Key: <flavor_id>.flavors

        +----------------------+---------+
        |  Name                |  Field  |
        +======================+=========+
        |  flavor              |  f      |
        +----------------------+---------+
        |  project             |  p      |
        +----------------------+---------+
        |  capabilities        |  c      |
        +----------------------+---------+
    """

    def __init__(self, *args, **kwargs):
        super(FlavorsController, self).__init__(*args, **kwargs)
        self._client = self.driver.connection
        self._packer = msgpack.Packer(encoding='utf-8',
                                      use_bin_type=True).pack
        self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')

    @utils.raises_conn_error
    def list(self, project=None, marker=None, limit=10, detailed=False):
        client = self._client
        subset_key = utils.flavor_project_subset_key(project)
        marker_key = utils.flavor_name_hash_key(marker)
        rank = client.zrank(subset_key, marker_key)
        start = rank + 1 if rank is not None else 0

        cursor = (f for f in client.zrange(subset_key, start,
                                           start + limit - 1))
        marker_next = {}

        def normalizer(flavor):
            marker_next['next'] = flavor['f']
            return self._normalize(flavor, detailed=detailed)

        yield utils.FlavorListCursor(self._client, cursor, normalizer)
        yield marker_next and marker_next['next']

    @utils.raises_conn_error
    def get(self, name, project=None, detailed=False):
        hash_key = utils.flavor_name_hash_key(name)
        flavors = self._client.hgetall(hash_key)

        if flavors is None or len(flavors) == 0:
            raise errors.FlavorDoesNotExist(name)

        return self._normalize(flavors, detailed)

    @utils.raises_conn_error
    def create(self, name, project=None, capabilities=None):

        capabilities = {} if capabilities is None else capabilities
        subset_key = utils.flavor_project_subset_key(project)
        set_key = utils.flavor_set_key()
        hash_key = utils.flavor_name_hash_key(name)

        flavors = self._client.hgetall(hash_key)
        if len(flavors) == 0:
            flavors = {
                'f': name,
                'p': project,
                'c': self._packer(capabilities or {}),
            }
            # Pipeline ensures atomic inserts.
            with self._client.pipeline() as pipe:
                pipe.zadd(set_key, 1, hash_key)
                pipe.zadd(subset_key, 1, hash_key)
                pipe.hmset(hash_key, flavors)
                pipe.execute()
        else:
            with self._client.pipeline() as pipe:
                pipe.hset(hash_key, "c", self._packer(capabilities))
                pipe.hset(hash_key, "p", project)
                pipe.execute()

    @utils.raises_conn_error
    def exists(self, name, project=None):
        set_key = utils.flavor_set_key()
        hash_key = utils.flavor_name_hash_key(name)
        return self._client.zrank(set_key, hash_key) is not None

    @utils.raises_conn_error
    def update(self, name, project=None, capabilities=None):
        hash_key = utils.flavor_name_hash_key(name)
        with self._client.pipeline() as pipe:
            pipe.hset(hash_key, "c", self._packer(capabilities))
            pipe.hset(hash_key, "p", project)
            try:
                pipe.execute()
            except redis.exceptions.ResponseError:
                raise errors.FlavorDoesNotExist(name)

    @utils.raises_conn_error
    def delete(self, name, project=None):
        subset_key = utils.flavor_project_subset_key(project)
        set_key = utils.flavor_set_key()
        hash_key = utils.flavor_name_hash_key(name)
        if self._client.zrank(subset_key, hash_key) is not None:
            with self._client.pipeline() as pipe:
                pipe.zrem(set_key, hash_key)
                pipe.zrem(subset_key, hash_key)
                pipe.delete(hash_key)
                pipe.execute()

    @utils.raises_conn_error
    def drop_all(self):
        allflavor_key = self._client.keys(pattern='*flavors')
        if len(allflavor_key) == 0:
            return
        with self._client.pipeline() as pipe:
            for key in allflavor_key:
                pipe.delete(key)
            try:
                pipe.execute()
            except redis.exceptions.ResponseError:
                return False

    def _normalize(self, flavor, detailed=False):
        ret = {
            'name': flavor['f'],
        }

        if detailed:
            ret['capabilities'] = self._unpacker(flavor['c'])
        return ret
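
A sketch of the key layout this controller produces for a single flavor
(names invented for illustration):

    # FlavorsController.create('f1', project='proj', capabilities={})
    # writes three keys:
    #
    #   'flavors'       (sorted set)  member: 'f1.flavors'
    #   'proj.flavors'  (sorted set)  member: 'f1.flavors'
    #   'f1.flavors'    (hash)        {'f': 'f1', 'p': 'proj', 'c': <msgpack>}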

zaqar/storage/redis/pools.py (new file, 265 lines)
@ -0,0 +1,265 @@
# Copyright (c) 2017 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""pools: an implementation of the pool management storage
controller for redis.

Schema:
  'pl': name :: six.text_type
  'u': uri :: six.text_type
  'w': weight :: int
  'o': options :: dict
  'f': flavor :: six.text_type
"""

import functools
import msgpack
from oslo_log import log as logging
import redis

from zaqar.common import utils as common_utils
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.redis import utils

LOG = logging.getLogger(__name__)


class PoolsController(base.PoolsBase):
    """Implements Pools resource operations using Redis.

    * All pools (Redis sorted set):

        Set of all pool_ids, ordered by name. Used to delete all
        pool records.

        Key: pools

        +--------+-----------------------------+
        |  Id    |  Value                      |
        +========+=============================+
        |  name  |  <pool>                     |
        +--------+-----------------------------+

    * Flavor Index (Redis sorted set):

        Set of all pool_ids for the given flavor, ordered by name.

        Key: <flavor>.pools

        +--------+-----------------------------+
        |  Id    |  Value                      |
        +========+=============================+
        |  name  |  <pool>                     |
        +--------+-----------------------------+

    * Pool Information (Redis hash):

        Key: <pool>.pools

        +----------------------+---------+
        |  Name                |  Field  |
        +======================+=========+
        |  pool                |  pl     |
        +----------------------+---------+
        |  uri                 |  u      |
        +----------------------+---------+
        |  weight              |  w      |
        +----------------------+---------+
        |  options             |  o      |
        +----------------------+---------+
        |  flavor              |  f      |
        +----------------------+---------+
    """

    def __init__(self, *args, **kwargs):
        super(PoolsController, self).__init__(*args, **kwargs)
        self._client = self.driver.connection
        self.flavor_ctl = self.driver.flavors_controller
        self._packer = msgpack.Packer(encoding='utf-8',
                                      use_bin_type=True).pack
        self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _list(self, marker=None, limit=10, detailed=False):
        client = self._client
        set_key = utils.pools_set_key()
        marker_key = utils.pools_name_hash_key(marker)
        rank = client.zrank(set_key, marker_key)
        start = rank + 1 if rank is not None else 0

        cursor = (f for f in client.zrange(set_key, start,
                                           start + limit - 1))
        marker_next = {}

        def normalizer(pools):
            marker_next['next'] = pools['pl']
            return self._normalize(pools, detailed=detailed)

        yield utils.PoolsListCursor(self._client, cursor, normalizer)
        yield marker_next and marker_next['next']

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _get(self, name, detailed=False):
        pool_key = utils.pools_name_hash_key(name)
        pool = self._client.hgetall(pool_key)
        if pool is None or len(pool) == 0:
            raise errors.PoolDoesNotExist(name)

        return self._normalize(pool, detailed)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _get_pools_by_flavor(self, flavor=None, detailed=False):
        cursor = None
        if flavor is None or flavor.get('name') is None:
            set_key = utils.pools_set_key()
            cursor = (pl for pl in self._client.zrange(set_key, 0, -1))
        elif flavor.get('name') is not None:
            subset_key = utils.pools_subset_key(flavor['name'])
            cursor = (pl for pl in self._client.zrange(subset_key, 0, -1))
        if cursor is None:
            return []
        normalizer = functools.partial(self._normalize, detailed=detailed)
        return utils.PoolsListCursor(self._client, cursor, normalizer)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _create(self, name, weight, uri, group=None, flavor=None,
                options=None):

        if group is not None:
            raise errors.PoolRedisNotSupportGroup

        flavor = flavor if flavor is not None else None
        options = {} if options is None else options
        pool_key = utils.pools_name_hash_key(name)
        subset_key = utils.pools_subset_key(flavor)
        set_key = utils.pools_set_key()
        if self._exists(name):
            self._update(name, weight=weight, uri=uri,
                         flavor=flavor, options=options)
            return

        pool = {
            'pl': name,
            'u': uri,
            'w': weight,
            'o': self._packer(options),
            'f': flavor
        }
        # Pipeline ensures atomic inserts.
        with self._client.pipeline() as pipe:
            pipe.zadd(set_key, 1, pool_key)
            if flavor is not None:
                pipe.zadd(subset_key, 1, pool_key)
            pipe.hmset(pool_key, pool)
            pipe.execute()

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _exists(self, name):
        pool_key = utils.pools_name_hash_key(name)
        return self._client.exists(pool_key)

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _update(self, name, **kwargs):
        names = ('uri', 'weight', 'flavor', 'options')
        fields = common_utils.fields(kwargs, names,
                                     pred=lambda x: x is not None,
                                     key_transform=lambda x: x[0])
        assert fields, ('`weight`, `uri`, `flavor`, '
                        'or `options` not found in kwargs')

        if 'o' in fields:
            new_options = fields.get('o', None)
            fields['o'] = self._packer(new_options)

        pool_key = utils.pools_name_hash_key(name)
        # (gengchc2): Pipeline ensures atomic inserts.
        with self._client.pipeline() as pipe:
            # (gengchc2): If the flavor is changed, the pool key must be
            # moved to the new flavor's pools subset.
            if 'f' in fields:
                flavor_old = self._get(name).get('flavor')
                flavor_new = fields['f']
                if flavor_old != flavor_new:
                    if flavor_new is not None:
                        new_subset_key = utils.pools_subset_key(flavor_new)
                        pipe.zadd(new_subset_key, 1, pool_key)
                    # (gengchc2) remove pool from flavor_old.pools subset
                    if flavor_old is not None:
                        old_subset_key = utils.pools_subset_key(flavor_old)
                        pipe.zrem(old_subset_key, pool_key)
            pipe.hmset(pool_key, fields)
            pipe.execute()

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _delete(self, name):
        try:
            pool = self.get(name)
            flavor = pool.get("flavor", None)
            # NOTE(gengchc2): If this is the only pool in the
            # flavor and it's being used by a flavor, don't allow
            # it to be deleted.
            if flavor is not None:
                flavor1 = {}
                flavor1['name'] = flavor
                pools_in_flavor = list(self.get_pools_by_flavor(
                    flavor=flavor1))
                if self.flavor_ctl.exists(flavor) \
                        and len(pools_in_flavor) == 1:
                    raise errors.PoolInUseByFlavor(name, flavor)

            pool_key = utils.pools_name_hash_key(name)
            subset_key = utils.pools_subset_key(flavor)
            set_key = utils.pools_set_key()
            with self._client.pipeline() as pipe:
                if flavor is not None:
                    pipe.zrem(subset_key, pool_key)
                pipe.zrem(set_key, pool_key)
                pipe.delete(pool_key)
                pipe.execute()
        except errors.PoolDoesNotExist:
            pass

    @utils.raises_conn_error
    @utils.retries_on_connection_error
    def _drop_all(self):
        poolsobj_key = self._client.keys(pattern='*pools')
        if len(poolsobj_key) == 0:
            return
        with self._client.pipeline() as pipe:
            for key in poolsobj_key:
                pipe.delete(key)
            try:
                pipe.execute()
            except redis.exceptions.ResponseError:
                return False

    def _normalize(self, pool, detailed=False):
        ret = {
            'name': pool['pl'],
            'uri': pool['u'],
            'weight': int(pool['w']),
            'flavor': pool['f']
        }
        if detailed:
            ret['options'] = self._unpacker(pool['o'])

        return ret
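
A short usage sketch (names invented): a `group` argument is rejected on
Redis, while a flavor-scoped pool mirrors the flavors key layout:

    # PoolsController.create('p0', 100, 'redis://localhost', flavor='f1')
    # writes:
    #
    #   'pools'     (sorted set)  member: 'p0.pools'
    #   'f1.pools'  (sorted set)  member: 'p0.pools'
    #   'p0.pools'  (hash)        {'pl': 'p0', 'u': 'redis://localhost',
    #                              'w': 100, 'o': <msgpack>, 'f': 'f1'}
    #
    # PoolsController.create('p1', 100, 'redis://host', group='g')
    # raises PoolRedisNotSupportGroup.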

@ -28,6 +28,8 @@ from zaqar.storage import errors
LOG = logging.getLogger(__name__)
MESSAGE_IDS_SUFFIX = 'messages'
SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
FLAVORS_IDS_SUFFIX = 'flavors'
POOLS_IDS_SUFFIX = 'pools'


def descope_queue_name(scoped_name):
@ -281,3 +283,145 @@ class SubscriptionListCursor(object):

    def __next__(self):
        return self.next()


def scope_flavors_ids_set(flavors_suffix=''):
    """Scope the flavors set.

    Returns a scoped name for the set of all flavors, in the form
    <suffix>.
    """
    return flavors_suffix


def scope_project_flavors_ids_set(project=None,
                                  flavors_suffix=''):
    """Scope the flavors set with '.'

    Returns a scoped name for the project's flavors, in the form
    <project_id>.<suffix>.
    """
    return (normalize_none_str(project) + '.' + flavors_suffix)


def scope_name_flavors_ids_set(name=None,
                               flavors_suffix=''):
    """Scope the flavors set with '.'

    Returns a scoped name for a single flavor, in the form
    <flavor_name>.<suffix>.
    """
    return (normalize_none_str(name) + '.' + flavors_suffix)


def flavor_set_key():
    return scope_flavors_ids_set(FLAVORS_IDS_SUFFIX)


def flavor_project_subset_key(project=None):
    return scope_project_flavors_ids_set(project,
                                         FLAVORS_IDS_SUFFIX)


def flavor_name_hash_key(name=None):
    return scope_name_flavors_ids_set(name,
                                      FLAVORS_IDS_SUFFIX)


class FlavorListCursor(object):

    def __init__(self, client, flavors, denormalizer):
        self.flavor_iter = flavors
        self.denormalizer = denormalizer
        self.client = client

    def __iter__(self):
        return self

    @raises_conn_error
    def next(self):
        curr = next(self.flavor_iter)
        flavor = self.client.hmget(curr, ['f', 'p', 'c'])
        flavor_dict = {}
        flavor_dict['f'] = flavor[0]
        flavor_dict['p'] = flavor[1]
        flavor_dict['c'] = flavor[2]
        return self.denormalizer(flavor_dict)

    def __next__(self):
        return self.next()


def scope_pools_ids_set(pools_suffix=''):
    """Scope the pools set.

    Returns a scoped name for the set of all pools, in the form
    <suffix>.
    """
    return pools_suffix


def scope_flavor_pools_ids_set(flavor=None,
                               pools_suffix=''):
    """Scope the pools set with '.'

    Returns a scoped name for the flavor's pools, in the form
    <flavor>.<suffix>.
    """
    return (normalize_none_str(flavor) + '.' +
            pools_suffix)


def scope_name_pools_ids_set(name=None,
                             pools_suffix=''):
    """Scope the pools set with '.'

    Returns a scoped name for a single pool, in the form
    <pool_name>.<suffix>.
    """
    return (normalize_none_str(name) + '.' +
            pools_suffix)


def pools_set_key():
    return scope_pools_ids_set(POOLS_IDS_SUFFIX)


def pools_subset_key(flavor=None):
    return scope_flavor_pools_ids_set(flavor,
                                      POOLS_IDS_SUFFIX)


def pools_name_hash_key(name=None):
    return scope_name_pools_ids_set(name,
                                    POOLS_IDS_SUFFIX)


class PoolsListCursor(object):

    def __init__(self, client, pools, denormalizer):
        self.pools_iter = pools
        self.denormalizer = denormalizer
        self.client = client

    def __iter__(self):
        return self

    @raises_conn_error
    def next(self):
        curr = next(self.pools_iter)
        pools = self.client.hmget(curr, ['pl', 'u', 'w', 'f', 'o'])
        pool_dict = {}
        pool_dict['pl'] = pools[0]
        pool_dict['u'] = pools[1]
        pool_dict['w'] = pools[2]
        pool_dict['f'] = pools[3]
        pool_dict['o'] = pools[4]
        return self.denormalizer(pool_dict)

    def __next__(self):
        return self.next()
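
For reference, a sketch of the names these helpers produce (assuming
normalize_none_str maps None to ''):

    flavor_set_key()                   # -> 'flavors'
    flavor_project_subset_key('proj')  # -> 'proj.flavors'
    flavor_name_hash_key('f1')         # -> 'f1.flavors'
    pools_set_key()                    # -> 'pools'
    pools_subset_key('f1')             # -> 'f1.pools'
    pools_name_hash_key('p0')          # -> 'p0.pools'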

@ -24,7 +24,7 @@ import redis
from zaqar.common import cache as oslo_cache
from zaqar.common import errors
from zaqar import storage
from zaqar.storage import mongodb
from zaqar.storage import pooling
from zaqar.storage.redis import controllers
from zaqar.storage.redis import driver
from zaqar.storage.redis import messages
@ -313,7 +313,7 @@ class RedisQueuesTest(base.QueueControllerTest):
    driver_class = driver.DataDriver
    config_file = 'wsgi_redis.conf'
    controller_class = controllers.QueueController
-    control_driver_class = mongodb.ControlDriver
+    control_driver_class = driver.ControlDriver

    def setUp(self):
        super(RedisQueuesTest, self).setUp()
@ -330,7 +330,7 @@ class RedisMessagesTest(base.MessageControllerTest):
    driver_class = driver.DataDriver
    config_file = 'wsgi_redis.conf'
    controller_class = controllers.MessageController
-    control_driver_class = mongodb.ControlDriver
+    control_driver_class = driver.ControlDriver
    gc_interval = 1

    def setUp(self):
@ -397,7 +397,7 @@ class RedisClaimsTest(base.ClaimControllerTest):
    driver_class = driver.DataDriver
    config_file = 'wsgi_redis.conf'
    controller_class = controllers.ClaimController
-    control_driver_class = mongodb.ControlDriver
+    control_driver_class = driver.ControlDriver

    def setUp(self):
        super(RedisClaimsTest, self).setUp()
@ -498,3 +498,104 @@ class RedisSubscriptionTests(base.SubscriptionControllerTest):
    config_file = 'wsgi_redis.conf'
    controller_class = controllers.SubscriptionController
    control_driver_class = driver.ControlDriver


@testing.requires_redis
class RedisPoolsTests(base.PoolsControllerTest):
    config_file = 'wsgi_redis.conf'
    driver_class = driver.ControlDriver
    controller_class = controllers.PoolsController
    control_driver_class = driver.ControlDriver

    def setUp(self):
        super(RedisPoolsTests, self).setUp()
        self.pools_controller = self.driver.pools_controller
        # Let's create one pool
        self.pool = str(uuid.uuid1())
        self.pools_controller.create(self.pool, 100, 'localhost', options={})
        self.pool1 = str(uuid.uuid1())
        self.flavor = str(uuid.uuid1())
        self.flavors_controller.create(self.flavor,
                                       project=self.project,
                                       capabilities={})
        self.pools_controller.create(self.pool1, 100, 'localhost1',
                                     flavor=self.flavor, options={})
        self.flavors_controller = self.driver.flavors_controller

    def tearDown(self):
        self.pools_controller.drop_all()
        super(RedisPoolsTests, self).tearDown()

    def test_delete_pool_used_by_flavor(self):
        with testing.expect(storage.errors.PoolInUseByFlavor):
            self.pools_controller.delete(self.pool1)

    def test_mismatching_capabilities_fifo(self):
        # NOTE(gengchc2): The fifo function is not implemented
        # in redis, so we skip it.
        self.skip("The fifo function is not implemented")

    def test_mismatching_capabilities1(self):
        # NOTE(gengchc2): This test checks mismatching
        # capabilities in a pool with a flavor.
        with testing.expect(storage.errors.PoolCapabilitiesMismatch):
            self.pools_controller.create(str(uuid.uuid1()),
                                         100, 'mongodb://localhost',
                                         flavor=self.flavor,
                                         options={})


@testing.requires_redis
class RedisCatalogueTests(base.CatalogueControllerTest):
    driver_class = driver.ControlDriver
    controller_class = controllers.CatalogueController
    control_driver_class = driver.ControlDriver
    config_file = 'wsgi_redis.conf'

    def setUp(self):
        super(RedisCatalogueTests, self).setUp()
        self.addCleanup(self.controller.drop_all)


@testing.requires_redis
class PooledMessageTests(base.MessageControllerTest):
    config_file = 'wsgi_redis_pooled.conf'
    controller_class = pooling.MessageController
    driver_class = pooling.DataDriver
    control_driver_class = driver.ControlDriver
    controller_base_class = storage.Message

    # NOTE(kgriffs): Redis's TTL scavenger only runs once a minute
    gc_interval = 60


@testing.requires_redis
class PooledClaimsTests(base.ClaimControllerTest):
    config_file = 'wsgi_redis_pooled.conf'
    controller_class = pooling.ClaimController
    driver_class = pooling.DataDriver
    control_driver_class = driver.ControlDriver
    controller_base_class = storage.Claim

    def setUp(self):
        super(PooledClaimsTests, self).setUp()
        self.connection = self.controller._pool_catalog.lookup(
            self.queue_name, self.project)._storage.\
            claim_controller.driver.connection

    def tearDown(self):
        super(PooledClaimsTests, self).tearDown()
        self.connection.flushdb()


# NOTE(gengchc2): Unit test for the new flavor configuration scenario.
@testing.requires_redis
class RedisFlavorsTest1(base.FlavorsControllerTest1):
    driver_class = driver.ControlDriver
    controller_class = controllers.FlavorsController
    control_driver_class = driver.ControlDriver
    config_file = 'wsgi_redis.conf'

    def setUp(self):
        super(RedisFlavorsTest1, self).setUp()
        self.addCleanup(self.controller.drop_all)