Dima Kuznetsov 56173f841e Linting
2017-04-18 16:14:06 +03:00

241 lines
9.3 KiB
Python

# Copyright 2016 Huawei corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import rpc as n_rpc
from neutron.db import agents_db
from neutron_lib import exceptions
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import importutils
from wan_qos.common import api
from wan_qos.common import constants
from wan_qos.common import topics
from wan_qos.db import wan_qos_db
from wan_qos.extensions import wantc
from wan_qos.extensions import wantcclass
from wan_qos.extensions import wantcdevice
from wan_qos.extensions import wantcfilter
LOG = logging.getLogger(__name__)
class PluginRpcCallback(object):
target = messaging.Target(version='1.0')
def __init__(self, plugin):
super(PluginRpcCallback, self).__init__()
self.plugin = plugin
LOG.debug('rpc callback started.')
def agent_up_notification(self, context, host_info):
LOG.debug('got up notification from %s', host_info['host'])
self.plugin.db.agent_up_notification(context, host_info)
def device_heartbeat(self, context, host):
self.plugin.db.device_heartbeat(context, host)
def get_configuration_from_db(self, context, host):
conf = {
'class_tree': self.plugin.db.get_class_tree(),
'filters': self.plugin.db.get_wan_tc_filters(context)
}
return conf
def get_class_by_id(self, context, id):
return self.plugin.db.get_class_by_id(context, id)
class WanQosPlugin(wantcfilter.WanTcFilterPluginBase,
wantcdevice.WanTcDevicePluginBase,
wantcclass.WanTcClassPluginBase,
wantc.WanTcPluginBase):
supported_extension_aliases = ['wan-tc-filter', 'wan-tc-device',
'wan-tc-class', 'wan-tc']
def __init__(self):
self.db = wan_qos_db.WanTcDb()
rpc_callback = importutils.import_object(
'wan_qos.services.plugin.PluginRpcCallback', self)
endpoints = (
[rpc_callback, agents_db.AgentExtRpcCallback()])
self.agent_rpc = api.TcAgentApi(cfg.CONF.host)
self.conn = n_rpc.create_connection()
self.conn.create_consumer(topics.TC_PLUGIN,
endpoints,
fanout=False)
self.conn.consume_in_threads()
def get_plugin_type(self):
"""Get type of the plugin."""
return constants.WANTC
def get_plugin_description(self):
"""Get description of the plugin."""
return 'Plugin for rate limiting on WAN links.'
@property
def _core_plugin(self):
return directory.get_plugin()
def delete_wan_tc_device(self, context, id):
self.db.delete_wan_tc_device(context, id)
def get_wan_tc_device(self, context, id, fields=None):
return self.db.get_device(context, id)
def get_wan_tc_devices(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
return self.db.get_all_devices(context, filters, fields, sorts, limit,
marker, page_reverse)
def get_wan_tc_class(self, context, id, fields=None):
return self.db.get_class_by_id(context, id)
def update_wan_tc_class(self, context, id, wan_tc_class):
pass
def create_wan_tc_class(self, context, wan_tc_class):
LOG.debug('got new class request: %s', wan_tc_class)
wtc_class_db = self.db.create_wan_tc_class(context,
wan_tc_class[
'wan_tc_class'])
self.agent_rpc.create_wtc_class(context, wtc_class_db)
return wtc_class_db
def delete_wan_tc_class(self, context, id):
LOG.debug('Got request to delete class id: %s', id)
class_tree = self.db.get_class_tree(id)
self.db.delete_wtc_class(context, id)
self.agent_rpc.delete_wtc_class(context, class_tree)
def get_wan_tc_classs(self, context, filters=None, fields=None, sorts=None,
limit=None, marker=None, page_reverse=False):
return self.db.get_all_classes(context, filters, fields, sorts, limit,
marker, page_reverse)
def delete_wan_tc_filter(self, context, id):
wtc_filter = self.get_wan_tc_filter(context, id)
if wtc_filter:
wtc_class = self.get_wan_tc_class(context, wtc_filter['class_id'])
self.db.delete_wan_tc_filter(context, id)
wtc_filter['class'] = wtc_class
self.agent_rpc.delete_wtc_filter(context, wtc_filter)
def get_wan_tc_filters(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
return self.db.get_wan_tc_filters(context, filters, fields, sorts,
limit,
marker, page_reverse)
def create_wan_tc_filter(self, context, wan_tc_filter):
wtc_filter = self.db.create_wan_tc_filter(context,
wan_tc_filter[
'wan_tc_filter'])
# wtc_class = self.get_wan_tc_class(context, wtc_filter['class_id'])
# wtc_filter['class'] = wtc_class
self.agent_rpc.create_wtc_filter(context, wtc_filter)
return wtc_filter
def update_wan_tc_filter(self, context, id, wan_tc_filter):
pass
def get_wan_tc_filter(self, context, id, fields=None):
return self.db.get_wan_tc_filter(context, id, fields)
@staticmethod
def _get_tenant_id_for_create(self, context, resource):
"""Get tenant id for creation of resources."""
if context.is_admin and 'tenant_id' in resource:
tenant_id = resource['tenant_id']
elif (
'tenant_id' in resource and
resource['tenant_id'] != context.tenant_id
):
reason = 'Cannot create resource for another tenant'
raise exceptions.AdminRequired(reason=reason)
else:
tenant_id = context.tenant_id
return tenant_id
def get_wan_tc(self, context, id, fields=None):
filter_db = self.get_wan_tc_filter(context, id, fields)
class_db = self.get_wan_tc_class(context, filter_db['class_id'])
filter_db['min'] = class_db['min']
filter_db['max'] = class_db['max']
return filter_db
def get_wan_tcs(self, context, filters=None, fields=None, sorts=None,
limit=None, marker=None, page_reverse=False):
filters = self.get_wan_tc_filters(context, filters, fields, sorts,
limit, marker, page_reverse)
for filter_db in filters:
class_db = self.get_wan_tc_class(context, filter_db['class_id'])
filter_db['min'] = class_db['min']
filter_db['max'] = class_db['max']
return filters
def create_wan_tc(self, context, wan_tc):
LOG.debug('got WAN_TC: %s', wan_tc)
wan_tc_req = wan_tc['wan_tc']
filter_db = self.get_wan_tc_filters(context, filters={
'network': [wan_tc_req['network']]})
if filter_db:
raise exceptions.InvalidInput(
error_message='Network already has limiter')
network = self._core_plugin.get_network(context, wan_tc_req['network'])
if network['provider:network_type'] != 'vxlan':
raise exceptions.InvalidInput()
vni = network['provider:segmentation_id']
tc_class = {'wan_tc_class': {
'direction': 'both',
'min': wan_tc_req['min']
}
}
if 'max' in wan_tc_req:
tc_class['wan_tc_class']['max'] = wan_tc_req['max']
tc_class_db = self.create_wan_tc_class(context, tc_class)
tc_filter_req = {'wan_tc_filter': {
'protocol': 'vxlan',
'match': 'vni=%s' % vni,
'class_id': tc_class_db['id'],
'network': network['id']
}
}
tc_filter_db = self.create_wan_tc_filter(context, tc_filter_req)
tc_filter_db['min'] = tc_class_db['min']
tc_filter_db['max'] = tc_class_db['max']
return tc_filter_db
def update_wan_tc(self, context, id, wan_tc):
raise exceptions.BadRequest(msg='Not implemented yet!')
def delete_wan_tc(self, context, id):
LOG.debug('Deleting TC: %s', id)
tc_filter = self.get_wan_tc_filter(context, id)
class_id = tc_filter['class_id']
self.delete_wan_tc_filter(context, id)
self.delete_wan_tc_class(context, class_id)