Split bus adapter into volume and instance bus adapter (#23)

This commit is contained in:
Frédéric Guillot 2016-06-10 15:05:43 -04:00 committed by Sebastien Delisle
parent 40554b98e3
commit 077daf622f
6 changed files with 350 additions and 249 deletions

View File

@ -18,6 +18,8 @@ import kombu
from kombu.mixins import ConsumerMixin
from almanach import config
from almanach.adapters.instance_bus_adapter import InstanceBusAdapter
from almanach.adapters.volume_bus_adapter import VolumeBusAdapter
class BusAdapter(ConsumerMixin):
@ -42,32 +44,9 @@ class BusAdapter(ConsumerMixin):
notification = json.loads(notification)
event_type = notification.get("event_type")
logging.info(event_type)
if event_type == "compute.instance.create.end":
self._instance_created(notification)
elif event_type == "compute.instance.delete.end":
self._instance_deleted(notification)
elif event_type == "compute.instance.resize.confirm.end":
self._instance_resized(notification)
elif event_type == "compute.instance.rebuild.end":
self._instance_rebuilt(notification)
elif event_type == "volume.create.end":
self._volume_created(notification)
elif event_type == "volume.delete.end":
self._volume_deleted(notification)
elif event_type == "volume.resize.end":
self._volume_resized(notification)
elif event_type == "volume.attach.end":
self._volume_attached(notification)
elif event_type == "volume.detach.end":
self._volume_detached(notification)
elif event_type == "volume.update.end":
self._volume_renamed(notification)
elif event_type == "volume.exists":
self._volume_renamed(notification)
elif event_type == "volume_type.create":
self._volume_type_create(notification)
logging.info("Received event: '{0}'".format(event_type))
InstanceBusAdapter(self.controller).handle_events(event_type, notification)
VolumeBusAdapter(self.controller).handle_events(event_type, notification)
def get_consumers(self, consumer, channel):
queue = kombu.Queue(config.rabbitmq_queue(), routing_key=config.rabbitmq_routing_key())
@ -81,108 +60,3 @@ class BusAdapter(ConsumerMixin):
super(BusAdapter, self).run(_tokens)
except KeyboardInterrupt:
pass
def _instance_created(self, notification):
payload = notification.get("payload")
project_id = payload.get("tenant_id")
date = payload.get("created_at")
instance_id = payload.get("instance_id")
flavor = payload.get("instance_type")
os_type = payload.get("image_meta").get("os_type")
distro = payload.get("image_meta").get("distro")
version = payload.get("image_meta").get("version")
name = payload.get("hostname")
metadata = payload.get("metadata")
if isinstance(metadata, list):
metadata = {}
self.controller.create_instance(
instance_id,
project_id,
date,
flavor,
os_type,
distro,
version,
name,
metadata
)
def _instance_deleted(self, notification):
payload = notification.get("payload")
date = payload.get("terminated_at")
instance_id = payload.get("instance_id")
self.controller.delete_instance(instance_id, date)
def _instance_resized(self, notification):
payload = notification.get("payload")
date = notification.get("timestamp")
flavor = payload.get("instance_type")
instance_id = payload.get("instance_id")
self.controller.resize_instance(instance_id, flavor, date)
def _volume_created(self, notification):
payload = notification.get("payload")
date = payload.get("created_at")
project_id = payload.get("tenant_id")
volume_id = payload.get("volume_id")
volume_name = payload.get("display_name")
volume_type = payload.get("volume_type")
volume_size = payload.get("size")
self.controller.create_volume(volume_id, project_id, date, volume_type, volume_size, volume_name)
def _volume_deleted(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
end_date = notification.get("timestamp")
self.controller.delete_volume(volume_id, end_date)
def _volume_renamed(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
volume_name = payload.get("display_name")
self.controller.rename_volume(volume_id, volume_name)
def _volume_resized(self, notification):
payload = notification.get("payload")
date = notification.get("timestamp")
volume_id = payload.get("volume_id")
volume_size = payload.get("size")
self.controller.resize_volume(volume_id, volume_size, date)
def _volume_attached(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
event_date = notification.get("timestamp")
self.controller.attach_volume(volume_id, event_date, self._get_attached_instances(payload))
def _volume_detached(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
event_date = notification.get("timestamp")
self.controller.detach_volume(volume_id, event_date, self._get_attached_instances(payload))
@staticmethod
def _get_attached_instances(payload):
instances_ids = []
if "volume_attachment" in payload:
for instance in payload["volume_attachment"]:
instances_ids.append(instance.get("instance_uuid"))
elif payload.get("instance_uuid") is not None:
instances_ids.append(payload.get("instance_uuid"))
return instances_ids
def _instance_rebuilt(self, notification):
payload = notification.get("payload")
date = notification.get("timestamp")
instance_id = payload.get("instance_id")
distro = payload.get("image_meta").get("distro")
version = payload.get("image_meta").get("version")
os_type = payload.get("image_meta").get("os_type")
self.controller.rebuild_instance(instance_id, distro, version, os_type, date)
def _volume_type_create(self, notification):
volume_types = notification.get("payload").get("volume_types")
volume_type_id = volume_types.get("id")
volume_type_name = volume_types.get("name")
self.controller.create_volume_type(volume_type_id, volume_type_name)

View File

@ -0,0 +1,69 @@
# Copyright 2016 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class InstanceBusAdapter(object):
def __init__(self, controller):
self.controller = controller
def handle_events(self, event_type, notification):
if event_type == "compute.instance.create.end":
self.on_instance_created(notification)
elif event_type == "compute.instance.delete.end":
self.on_instance_deleted(notification)
elif event_type == "compute.instance.resize.confirm.end":
self.on_instance_resized(notification)
elif event_type == "compute.instance.rebuild.end":
self.on_instance_rebuilt(notification)
def on_instance_created(self, notification):
payload = notification.get("payload")
metadata = payload.get("metadata")
if isinstance(metadata, list):
metadata = {}
self.controller.create_instance(
payload.get("instance_id"),
payload.get("tenant_id"),
payload.get("created_at"),
payload.get("instance_type"),
payload.get("image_meta").get("os_type"),
payload.get("image_meta").get("distro"),
payload.get("image_meta").get("version"),
payload.get("hostname"),
metadata
)
def on_instance_deleted(self, notification):
payload = notification.get("payload")
date = payload.get("terminated_at")
instance_id = payload.get("instance_id")
self.controller.delete_instance(instance_id, date)
def on_instance_resized(self, notification):
payload = notification.get("payload")
date = notification.get("timestamp")
flavor = payload.get("instance_type")
instance_id = payload.get("instance_id")
self.controller.resize_instance(instance_id, flavor, date)
def on_instance_rebuilt(self, notification):
payload = notification.get("payload")
date = notification.get("timestamp")
instance_id = payload.get("instance_id")
distro = payload.get("image_meta").get("distro")
version = payload.get("image_meta").get("version")
os_type = payload.get("image_meta").get("os_type")
self.controller.rebuild_instance(instance_id, distro, version, os_type, date)

View File

@ -0,0 +1,93 @@
# Copyright 2016 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class VolumeBusAdapter(object):
def __init__(self, controller):
self.controller = controller
def handle_events(self, event_type, notification):
if event_type == "volume.create.end":
self.on_volume_created(notification)
elif event_type == "volume.delete.end":
self.on_volume_deleted(notification)
elif event_type == "volume.resize.end":
self.on_volume_resized(notification)
elif event_type == "volume.attach.end":
self.on_volume_attached(notification)
elif event_type == "volume.detach.end":
self.on_volume_detached(notification)
elif event_type == "volume.update.end":
self.on_volume_renamed(notification)
elif event_type == "volume.exists":
self.on_volume_renamed(notification)
elif event_type == "volume_type.create":
self.on_volume_type_create(notification)
def on_volume_created(self, notification):
payload = notification.get("payload")
date = payload.get("created_at")
project_id = payload.get("tenant_id")
volume_id = payload.get("volume_id")
volume_name = payload.get("display_name")
volume_type = payload.get("volume_type")
volume_size = payload.get("size")
self.controller.create_volume(volume_id, project_id, date, volume_type, volume_size, volume_name)
def on_volume_deleted(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
end_date = notification.get("timestamp")
self.controller.delete_volume(volume_id, end_date)
def on_volume_renamed(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
volume_name = payload.get("display_name")
self.controller.rename_volume(volume_id, volume_name)
def on_volume_resized(self, notification):
payload = notification.get("payload")
date = notification.get("timestamp")
volume_id = payload.get("volume_id")
volume_size = payload.get("size")
self.controller.resize_volume(volume_id, volume_size, date)
def on_volume_attached(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
event_date = notification.get("timestamp")
self.controller.attach_volume(volume_id, event_date, self._get_attached_instances(payload))
def on_volume_detached(self, notification):
payload = notification.get("payload")
volume_id = payload.get("volume_id")
event_date = notification.get("timestamp")
self.controller.detach_volume(volume_id, event_date, self._get_attached_instances(payload))
def on_volume_type_create(self, notification):
volume_types = notification.get("payload").get("volume_types")
volume_type_id = volume_types.get("id")
volume_type_name = volume_types.get("name")
self.controller.create_volume_type(volume_type_id, volume_type_name)
@staticmethod
def _get_attached_instances(payload):
instances_ids = []
if "volume_attachment" in payload:
for instance in payload["volume_attachment"]:
instances_ids.append(instance.get("instance_uuid"))
elif payload.get("instance_uuid") is not None:
instances_ids.append(payload.get("instance_uuid"))
return instances_ids

View File

@ -170,13 +170,6 @@ class BusAdapterTest(unittest.TestCase):
self.bus_adapter.on_message(notification, message)
def test_rebuild(self):
notification = messages.get_instance_rebuild_end_sample()
self.controller \
.should_receive("rebuild_instance") \
.once()
self.bus_adapter._instance_rebuilt(notification)
def test_on_message_with_volume(self):
volume_id = "vol_id"
tenant_id = "tenant_id"
@ -237,101 +230,6 @@ class BusAdapterTest(unittest.TestCase):
self.bus_adapter.on_message(notification, message)
def test_deleted_volume(self):
notification = messages.get_volume_delete_end_sample()
self.controller.should_receive('delete_volume').once()
self.bus_adapter._volume_deleted(notification)
def test_resize_volume(self):
notification = messages.get_volume_update_end_sample()
self.controller.should_receive('resize_volume').once()
self.bus_adapter._volume_resized(notification)
def test_deleted_instance(self):
notification = messages.get_instance_delete_end_sample()
self.controller.should_receive('delete_instance').once()
self.bus_adapter._instance_deleted(notification)
def test_instance_resized(self):
notification = messages.get_instance_rebuild_end_sample()
self.controller.should_receive('resize_instance').once()
self.bus_adapter._instance_resized(notification)
def test_updated_volume(self):
notification = messages.get_volume_update_end_sample()
self.controller.should_receive('resize_volume').once()
self.bus_adapter._volume_resized(notification)
def test_attach_volume_with_icehouse_payload(self):
notification = messages.get_volume_attach_icehouse_end_sample(
volume_id="my-volume-id",
creation_timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), attached_to="my-instance-id"
)
self.controller \
.should_receive('attach_volume') \
.with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", ["my-instance-id"]) \
.once()
self.bus_adapter._volume_attached(notification)
def test_attach_volume_with_kilo_payload(self):
notification = messages.get_volume_attach_kilo_end_sample(
volume_id="my-volume-id",
timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc),
attached_to=["I1"]
)
self.controller \
.should_receive('attach_volume') \
.with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", ["I1"]) \
.once()
self.bus_adapter._volume_attached(notification)
def test_attach_volume_with_kilo_payload_and_empty_attachments(self):
notification = messages.get_volume_attach_kilo_end_sample(
volume_id="my-volume-id",
timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc),
attached_to=[]
)
self.controller \
.should_receive('attach_volume') \
.with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", []) \
.once()
self.bus_adapter._volume_attached(notification)
def test_detached_volume(self):
notification = messages.get_volume_detach_end_sample()
(self.controller
.should_receive('detach_volume')
.once())
self.bus_adapter._volume_detached(notification)
def test_renamed_volume_with_volume_update_end(self):
notification = messages.get_volume_update_end_sample()
self.controller \
.should_receive('rename_volume') \
.once()
self.bus_adapter._volume_renamed(notification)
def test_renamed_volume_with_volume_exists(self):
notification = messages.get_volume_exists_sample()
self.controller.should_receive('rename_volume').once()
self.bus_adapter._volume_renamed(notification)
def test_failing_notification_get_retry(self):
notification = messages.get_instance_rebuild_end_sample()
message = flexmock()
@ -351,19 +249,3 @@ class BusAdapterTest(unittest.TestCase):
self.retry.should_receive('publish_to_dead_letter').with_args(message).once()
self.bus_adapter.on_message(notification, message)
def test_get_attached_instances(self):
self.assertEqual(["truc"], self.bus_adapter._get_attached_instances({"instance_uuid": "truc"}))
self.assertEqual([], self.bus_adapter._get_attached_instances({"instance_uuid": None}))
self.assertEqual([], self.bus_adapter._get_attached_instances({}))
self.assertEqual(
["a", "b"],
self.bus_adapter._get_attached_instances(
{"volume_attachment": [{"instance_uuid": "a"}, {"instance_uuid": "b"}]}
)
)
self.assertEqual(
["a"],
self.bus_adapter._get_attached_instances({"volume_attachment": [{"instance_uuid": "a"}]})
)
self.assertEqual([], self.bus_adapter._get_attached_instances({"volume_attachment": []}))

View File

@ -0,0 +1,50 @@
# Copyright 2016 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from flexmock import flexmock, flexmock_teardown
from almanach.adapters.instance_bus_adapter import InstanceBusAdapter
from integration_tests.builders import messages
class InstanceBusAdapterTest(unittest.TestCase):
def setUp(self):
self.controller = flexmock()
self.retry = flexmock()
self.instance_bus_adapter = InstanceBusAdapter(self.controller)
def tearDown(self):
flexmock_teardown()
def test_deleted_instance(self):
notification = messages.get_instance_delete_end_sample()
self.controller.should_receive('delete_instance').once()
self.instance_bus_adapter.on_instance_deleted(notification)
def test_instance_resized(self):
notification = messages.get_instance_rebuild_end_sample()
self.controller.should_receive('resize_instance').once()
self.instance_bus_adapter.on_instance_resized(notification)
def test_instance_rebuild(self):
notification = messages.get_instance_rebuild_end_sample()
self.controller \
.should_receive("rebuild_instance") \
.once()
self.instance_bus_adapter.on_instance_rebuilt(notification)

View File

@ -0,0 +1,133 @@
# Copyright 2016 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from datetime import datetime
import pytz
from flexmock import flexmock, flexmock_teardown
from almanach.adapters.volume_bus_adapter import VolumeBusAdapter
from integration_tests.builders import messages
class VolumeBusAdapterTest(unittest.TestCase):
def setUp(self):
self.controller = flexmock()
self.retry = flexmock()
self.volume_bus_adapter = VolumeBusAdapter(self.controller)
def tearDown(self):
flexmock_teardown()
def test_deleted_volume(self):
notification = messages.get_volume_delete_end_sample()
self.controller.should_receive('delete_volume').once()
self.volume_bus_adapter.on_volume_deleted(notification)
def test_resize_volume(self):
notification = messages.get_volume_update_end_sample()
self.controller.should_receive('resize_volume').once()
self.volume_bus_adapter.on_volume_resized(notification)
def test_updated_volume(self):
notification = messages.get_volume_update_end_sample()
self.controller.should_receive('resize_volume').once()
self.volume_bus_adapter.on_volume_resized(notification)
def test_attach_volume_with_kilo_payload_and_empty_attachments(self):
notification = messages.get_volume_attach_kilo_end_sample(
volume_id="my-volume-id",
timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc),
attached_to=[]
)
self.controller \
.should_receive('attach_volume') \
.with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", []) \
.once()
self.volume_bus_adapter.on_volume_attached(notification)
def test_detached_volume(self):
notification = messages.get_volume_detach_end_sample()
(self.controller
.should_receive('detach_volume')
.once())
self.volume_bus_adapter.on_volume_detached(notification)
def test_renamed_volume_with_volume_update_end(self):
notification = messages.get_volume_update_end_sample()
self.controller \
.should_receive('rename_volume') \
.once()
self.volume_bus_adapter.on_volume_renamed(notification)
def test_renamed_volume_with_volume_exists(self):
notification = messages.get_volume_exists_sample()
self.controller.should_receive('rename_volume').once()
self.volume_bus_adapter.on_volume_renamed(notification)
def test_attach_volume_with_icehouse_payload(self):
notification = messages.get_volume_attach_icehouse_end_sample(
volume_id="my-volume-id",
creation_timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), attached_to="my-instance-id"
)
self.controller \
.should_receive('attach_volume') \
.with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", ["my-instance-id"]) \
.once()
self.volume_bus_adapter.on_volume_attached(notification)
def test_attach_volume_with_kilo_payload(self):
notification = messages.get_volume_attach_kilo_end_sample(
volume_id="my-volume-id",
timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc),
attached_to=["I1"]
)
self.controller \
.should_receive('attach_volume') \
.with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", ["I1"]) \
.once()
self.volume_bus_adapter.on_volume_attached(notification)
def test_get_attached_instances(self):
self.assertEqual(["truc"], self.volume_bus_adapter._get_attached_instances({"instance_uuid": "truc"}))
self.assertEqual([], self.volume_bus_adapter._get_attached_instances({"instance_uuid": None}))
self.assertEqual([], self.volume_bus_adapter._get_attached_instances({}))
self.assertEqual(
["a", "b"],
self.volume_bus_adapter._get_attached_instances(
{"volume_attachment": [{"instance_uuid": "a"}, {"instance_uuid": "b"}]}
)
)
self.assertEqual(
["a"],
self.volume_bus_adapter._get_attached_instances({"volume_attachment": [{"instance_uuid": "a"}]})
)
self.assertEqual([], self.volume_bus_adapter._get_attached_instances({"volume_attachment": []}))