Add base driver structure (and a zookeeper driver)
This commit is contained in:
parent
fb6a482034
commit
ad652ca149
@ -0,0 +1,37 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import netutils
|
||||
import stevedore.driver
|
||||
|
||||
ENGINE_NAMESPACE = "delimiter.engines"
|
||||
DEFAULT_KIND = "mysql"
|
||||
|
||||
|
||||
def create_engine(uri):
|
||||
"""Create a new ``Engine`` instance."""
|
||||
parsed_uri = netutils.urlsplit(uri, scheme=DEFAULT_KIND)
|
||||
kind = parsed_uri.scheme
|
||||
try:
|
||||
mgr = stevedore.driver.DriverManager(
|
||||
ENGINE_NAMESPACE, kind,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[parsed_uri])
|
||||
engine = mgr.driver
|
||||
except RuntimeError:
|
||||
raise ValueError("Could not find"
|
||||
"engine '%s' (from uri '%s')" % (kind, uri))
|
||||
else:
|
||||
return engine
|
0
delimiter/drivers/__init__.py
Normal file
0
delimiter/drivers/__init__.py
Normal file
20
delimiter/drivers/sql.py
Normal file
20
delimiter/drivers/sql.py
Normal file
@ -0,0 +1,20 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from delimiter import engine
|
||||
|
||||
|
||||
class SqlQuotaEngine(engine.QuotaEngine):
|
||||
"""Engine based on sql primitives."""
|
133
delimiter/drivers/zookeeper.py
Normal file
133
delimiter/drivers/zookeeper.py
Normal file
@ -0,0 +1,133 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
from kazoo import client
|
||||
from kazoo import exceptions
|
||||
from kazoo.protocol import paths
|
||||
|
||||
from delimiter import engine
|
||||
|
||||
|
||||
class ZookeeperQuotaEngine(engine.QuotaEngine):
|
||||
"""Engine based on zookeeper primitives.
|
||||
|
||||
This engine uses zookeeper transcations, paths and values and versions
|
||||
identifiers to ensure a consistent backend storage of user quotas
|
||||
and limits on some set of resources.
|
||||
"""
|
||||
|
||||
def __init__(self, uri):
|
||||
super(ZookeeperQuotaEngine, self).__init__(uri)
|
||||
if not self.uri.path or self.uri.path == "/":
|
||||
raise ValueError("A non-empty url path is required")
|
||||
self.client = None
|
||||
|
||||
def start(self):
|
||||
self.client = client.KazooClient(hosts=self.url.netloc)
|
||||
self.client.start()
|
||||
self.client.ensure_path(self.url.path)
|
||||
|
||||
def read_limits(self, for_who):
|
||||
who_path = paths.join(self.url.path, for_who)
|
||||
try:
|
||||
child_nodes = self.client.get_children(who_path)
|
||||
except exceptions.NoNodeError:
|
||||
return []
|
||||
else:
|
||||
limits = []
|
||||
for resource in child_nodes:
|
||||
try:
|
||||
blob, _znode = self.client.get(paths.join(who_path,
|
||||
resource))
|
||||
except exceptions.NoNodeError:
|
||||
pass
|
||||
else:
|
||||
limits.append((resource, json.loads(blob)))
|
||||
return limits
|
||||
|
||||
def create_or_update_limits(self, for_who, resources, limits):
|
||||
who_path = paths.join(self.url.path, for_who)
|
||||
self.client.ensure_path(who_path)
|
||||
for resource, limit in zip(resources, limits):
|
||||
resource_path = paths.join(who_path, resource)
|
||||
try:
|
||||
self.client.create(resource_path, json.dumps(limit))
|
||||
except exceptions.NodeExistsError:
|
||||
blob, znode = self.client.get(resource_path)
|
||||
cur_limit = json.loads(blob)
|
||||
cur_limit.update(limit)
|
||||
# Ensure we pass in the version that we read this on so
|
||||
# that if it was changed by some other actor that we can
|
||||
# avoid overwriting that value (and retry, or handle in some
|
||||
# other manner).
|
||||
self.client.set(resource_path, json.dumps(cur_limit),
|
||||
version=znode.version)
|
||||
|
||||
def consume_many(self, for_who, resources, amounts):
|
||||
who_path = paths.join(self.url.path, for_who)
|
||||
values_to_save = []
|
||||
for resource, amount in zip(resources, amounts):
|
||||
resource_path = paths.join(who_path, resource)
|
||||
blob, znode = self.client.get(resource_path)
|
||||
cur_limit = json.loads(blob)
|
||||
try:
|
||||
cur_consumed = cur_limit['consumed']
|
||||
except KeyError:
|
||||
cur_consumed = 0
|
||||
max_resource = cur_limit['max']
|
||||
if cur_consumed + amount > max_resource:
|
||||
raise ValueError("Limit reached, can not"
|
||||
" consume %s of %s" % (resource, amount))
|
||||
else:
|
||||
cur_limit['consumed'] = cur_consumed + amount
|
||||
values_to_save.append((resource_path,
|
||||
json.dumps(cur_limit),
|
||||
znode.version))
|
||||
# Commit all changes at once, so that we can ensure that all the
|
||||
# changes will happen, or none will...
|
||||
if values_to_save:
|
||||
with self.client.transaction() as txn:
|
||||
for path, value, version in values_to_save:
|
||||
txn.set_data(path, value, version=version)
|
||||
|
||||
def consume(self, for_who, resource, amount):
|
||||
who_path = paths.join(self.url.path, for_who)
|
||||
resource_path = paths.join(who_path, resource)
|
||||
blob, znode = self.client.get(resource_path)
|
||||
cur_limit = json.loads(blob)
|
||||
try:
|
||||
cur_consumed = cur_limit['consumed']
|
||||
except KeyError:
|
||||
cur_consumed = 0
|
||||
max_resource = cur_limit['max']
|
||||
if cur_consumed + amount > max_resource:
|
||||
raise ValueError("Limit reached, can not"
|
||||
" consume %s of %s" % (resource, amount))
|
||||
else:
|
||||
cur_limit['consumed'] = cur_consumed + amount
|
||||
# Ensure we pass in the version that we read this on so
|
||||
# that if it was changed by some other actor that we can
|
||||
# avoid overwriting that value (and retry, or handle in some
|
||||
# other manner).
|
||||
self.client.set(resource_path, json.dumps(cur_limit),
|
||||
version=znode.version)
|
||||
|
||||
def close(self):
|
||||
if self.client is not None:
|
||||
self.client.stop()
|
||||
self.client.close()
|
||||
self.client = None
|
57
delimiter/engine.py
Normal file
57
delimiter/engine.py
Normal file
@ -0,0 +1,57 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class QuotaEngine(object):
|
||||
"""The abstraction that all quota engines derive from."""
|
||||
|
||||
def __init__(self, uri):
|
||||
self._uri = uri
|
||||
|
||||
def start(self):
|
||||
"""Performs any engine startup (connection setup, validation...)"""
|
||||
|
||||
def close(self):
|
||||
"""Performs engine teardown (connection closing...)"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def read_limits(self, for_who):
|
||||
"""Reads the limits of some entity."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_or_update_limits(self, for_who, resources, limits):
|
||||
"""Creates or updates a set of resource limits for some entity.
|
||||
|
||||
Must operate transactionally; either all created/updated or none.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def consume_many(self, for_who, resources, amounts):
|
||||
"""Consumes a given amount of resources for some entity.
|
||||
|
||||
Must operate transactionally; either all consumed or none.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def consume(self, for_who, resource, amount):
|
||||
"""Consumes a amount of resource for some entity.
|
||||
|
||||
Must operate transactionally; either consumed or not.
|
||||
"""
|
@ -3,3 +3,7 @@
|
||||
# process, which may cause wedges in the gate later.
|
||||
|
||||
pbr>=1.6 # Apache-2.0
|
||||
Babel>=1.3
|
||||
six>=1.9.0 # MIT
|
||||
stevedore>=1.10.0 # Apache-2.0
|
||||
oslo.utils>=3.5.0 # Apache-2.0
|
||||
|
@ -26,6 +26,11 @@ packages =
|
||||
[pbr]
|
||||
warnerrors = true
|
||||
|
||||
[entry_points]
|
||||
delimiter.engines =
|
||||
zookeeper = delimiter.drivers.zookeeper:ZookeeperQuotaEngine
|
||||
sql = delimiter.drivers.sql:SqlQuotaEngine
|
||||
|
||||
[build_sphinx]
|
||||
source-dir = doc/source
|
||||
build-dir = doc/build
|
||||
|
@ -8,3 +8,10 @@ oslotest>=1.5.1
|
||||
# These are needed for docs generation
|
||||
oslosphinx>=2.5.0
|
||||
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
|
||||
|
||||
# Used for testing zookeeper & backends.
|
||||
zake>=0.1.6 # Apache-2.0
|
||||
kazoo>=2.2 # Apache-2.0
|
||||
|
||||
# Used for testing database persistence backends.
|
||||
PyMySQL>=0.6.2 # MIT License
|
||||
|
Loading…
x
Reference in New Issue
Block a user