From 2f9af244deb118931b68883331327c66d2596a97 Mon Sep 17 00:00:00 2001 From: Radoslav Gerganov Date: Fri, 1 Jul 2016 14:28:09 +0300 Subject: [PATCH] Refactor the image transfer The image transfer is unnecessary complicated and buggy. This patch replaces the complex usage of blocking queues and threads with a simple read+write loop. It has the same performance and the code is much cleaner. The NFC lease is updated with the loopingcall utility. Change-Id: I2b04173cd23c59162056360c03d419efbce77ba1 Closes-Bug: #1384840 --- oslo_vmware/image_transfer.py | 421 +++-------------------- oslo_vmware/tests/test_image_transfer.py | 304 +--------------- 2 files changed, 61 insertions(+), 664 deletions(-) diff --git a/oslo_vmware/image_transfer.py b/oslo_vmware/image_transfer.py index c8389e8..c06e0bb 100644 --- a/oslo_vmware/image_transfer.py +++ b/oslo_vmware/image_transfer.py @@ -17,16 +17,14 @@ Functions and classes for image transfer between ESX/VC & image service. """ -import errno import logging import tarfile -from eventlet import event -from eventlet import greenthread -from eventlet import queue from eventlet import timeout +from oslo_utils import units from oslo_vmware._i18n import _ +from oslo_vmware.common import loopingcall from oslo_vmware import constants from oslo_vmware import exceptions from oslo_vmware import image_util @@ -37,363 +35,40 @@ from oslo_vmware import vim_util LOG = logging.getLogger(__name__) -IMAGE_SERVICE_POLL_INTERVAL = 5 -FILE_READ_WRITE_TASK_SLEEP_TIME = 0.01 -BLOCKING_QUEUE_SIZE = 10 +NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec. +CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer -class BlockingQueue(queue.LightQueue): - """Producer-Consumer queue to share data between reader/writer threads.""" - - def __init__(self, max_size, max_transfer_size): - """Initializes the queue with the given parameters. - - :param max_size: maximum queue size; if max_size is less than zero or - None, the queue size is infinite. - :param max_transfer_size: maximum amount of data that can be - _transferred using this queue - """ - queue.LightQueue.__init__(self, max_size) - self._max_transfer_size = max_transfer_size - self._transferred = 0 - - def read(self, chunk_size): - """Read data from the queue. - - This method blocks until data is available. The input chunk size is - ignored since we have ensured that the data chunks written to the pipe - by the image reader thread is the same as the chunks asked for by the - image writer thread. - """ - if (self._max_transfer_size is 0 or - self._transferred < self._max_transfer_size): - data_item = self.get() - self._transferred += len(data_item) - return data_item - else: - LOG.debug("Completed transfer of size %s.", self._transferred) - return "" - - def write(self, data): - """Write data into the queue. - - :param data: data to be written - """ - self.put(data) - - # Below methods are provided in order to enable treating the queue - # as a file handle. - - def seek(self, offset, whence=0): - """Set the file's current position at the offset. - - This method throws IOError since seek cannot be supported for a pipe. - """ - raise IOError(errno.ESPIPE, "Illegal seek") - - def tell(self): - """Get the current file position.""" - return self._transferred - - def close(self): - pass - - def __str__(self): - return "blocking queue" - - -class ImageWriter(object): - """Class to write the image to the image service from an input file.""" - - def __init__(self, context, input_file, image_service, image_id, - image_meta=None): - """Initializes the image writer instance with given parameters. - - :param context: write context needed by the image service - :param input_file: file to read the image data from - :param image_service: handle to image service - :param image_id: ID of the image in the image service - :param image_meta: image meta-data - """ - if not image_meta: - image_meta = {} - - self._context = context - self._input_file = input_file - self._image_service = image_service - self._image_id = image_id - self._image_meta = image_meta - self._running = False - - def start(self): - """Start the image write task. - - :returns: the event indicating the status of the write task - """ - self._done = event.Event() - - def _inner(): - """Task performing the image write operation. - - This method performs image data transfer through an update call. - After the update, it waits until the image state becomes - 'active', 'killed' or unknown. If the final state is not 'active' - an instance of ImageTransferException is thrown. - - :raises: ImageTransferException - """ - LOG.debug("Calling image service update on image: %(image)s " - "with meta: %(meta)s", - {'image': self._image_id, - 'meta': self._image_meta}) - - try: - self._image_service.update(self._context, - self._image_id, - self._image_meta, - data=self._input_file) - self._running = True - while self._running: - LOG.debug("Retrieving status of image: %s.", - self._image_id) - image_meta = self._image_service.show(self._context, - self._image_id) - image_status = image_meta.get('status') - if image_status == 'active': - self.stop() - LOG.debug("Image: %s is now active.", - self._image_id) - self._done.send(True) - elif image_status == 'killed': - self.stop() - excep_msg = (_("Image: %s is in killed state.") % - self._image_id) - LOG.error(excep_msg) - excep = exceptions.ImageTransferException(excep_msg) - self._done.send_exception(excep) - elif image_status in ['saving', 'queued']: - LOG.debug("Image: %(image)s is in %(state)s state; " - "sleeping for %(sleep)d seconds.", - {'image': self._image_id, - 'state': image_status, - 'sleep': IMAGE_SERVICE_POLL_INTERVAL}) - greenthread.sleep(IMAGE_SERVICE_POLL_INTERVAL) - else: - self.stop() - excep_msg = (_("Image: %(image)s is in unknown " - "state: %(state)s.") % - {'image': self._image_id, - 'state': image_status}) - LOG.error(excep_msg) - excep = exceptions.ImageTransferException(excep_msg) - self._done.send_exception(excep) - except Exception as excep: - self.stop() - excep_msg = (_("Error occurred while writing image: %s") % - self._image_id) - LOG.exception(excep_msg) - excep = exceptions.ImageTransferException(excep_msg, excep) - self._done.send_exception(excep) - - LOG.debug("Starting image write task for image: %(image)s with" - " source: %(source)s.", - {'source': self._input_file, - 'image': self._image_id}) - greenthread.spawn(_inner) - return self._done - - def stop(self): - """Stop the image writing task.""" - LOG.debug("Stopping the writing task for image: %s.", - self._image_id) - self._running = False - - def wait(self): - """Wait for the image writer task to complete. - - This method returns True if the writer thread completes successfully. - In case of error, it raises ImageTransferException. - - :raises ImageTransferException - """ - return self._done.wait() - - def close(self): - """This is a NOP.""" - pass - - def __str__(self): - string = "Image Writer " % (self._input_file, - self._image_id) - return string - - -class FileReadWriteTask(object): - """Task which reads data from the input file and writes to the output file. - - This class defines the task which copies the given input file to the given - output file. The copy operation involves reading chunks of data from the - input file and writing the same to the output file. - """ - - def __init__(self, input_file, output_file): - """Initializes the read-write task with the given input parameters. - - :param input_file: the input file handle - :param output_file: the output file handle - """ - self._input_file = input_file - self._output_file = output_file - self._running = False - - def start(self): - """Start the file read - file write task. - - :returns: the event indicating the status of the read-write task - """ - self._done = event.Event() - - def _inner(): - """Task performing the file read-write operation.""" - self._running = True - while self._running: - try: - data = self._input_file.read(rw_handles.READ_CHUNKSIZE) - if not data: - LOG.debug("File read-write task is done.") - self.stop() - self._done.send(True) - self._output_file.write(data) - - # update lease progress if applicable - if hasattr(self._input_file, "update_progress"): - self._input_file.update_progress() - if hasattr(self._output_file, "update_progress"): - self._output_file.update_progress() - - greenthread.sleep(FILE_READ_WRITE_TASK_SLEEP_TIME) - except Exception as excep: - self.stop() - excep_msg = _("Error occurred during file read-write " - "task.") - LOG.exception(excep_msg) - excep = exceptions.ImageTransferException(excep_msg, excep) - self._done.send_exception(excep) - - LOG.debug("Starting file read-write task with source: %(source)s " - "and destination: %(dest)s.", - {'source': self._input_file, - 'dest': self._output_file}) - greenthread.spawn(_inner) - return self._done - - def stop(self): - """Stop the read-write task.""" - LOG.debug("Stopping the file read-write task.") - self._running = False - - def wait(self): - """Wait for the file read-write task to complete. - - This method returns True if the read-write thread completes - successfully. In case of error, it raises ImageTransferException. - - :raises: ImageTransferException - """ - return self._done.wait() - - def __str__(self): - string = ("File Read-Write Task " % - (self._input_file, self._output_file)) - return string - - -# Functions to perform image transfer between VMware servers and image service. - - -def _start_transfer(context, timeout_secs, read_file_handle, max_data_size, - write_file_handle=None, image_service=None, image_id=None, - image_meta=None): - """Start the image transfer. - - The image reader reads the data from the image source and writes to the - blocking queue. The image source is always a file handle (VmdkReadHandle - or ImageReadHandle); therefore, a FileReadWriteTask is created for this - transfer. The image writer reads the data from the blocking queue and - writes it to the image destination. The image destination is either a - file or VMDK in VMware datastore or an image in the image service. - - If the destination is a file or VMDK in VMware datastore, the method - creates a FileReadWriteTask which reads from the blocking queue and - writes to either FileWriteHandle or VmdkWriteHandle. In the case of - image service as the destination, an instance of ImageWriter task is - created which reads from the blocking queue and writes to the image - service. - - :param context: write context needed for the image service - :param timeout_secs: time in seconds to wait for the transfer to complete - :param read_file_handle: handle to read data from - :param max_data_size: maximum transfer size - :param write_file_handle: handle to write data to; if this is None, then - param image_service and param image_id should - be set. - :param image_service: image service handle - :param image_id: ID of the image in the image service - :param image_meta: image meta-data - :raises: ImageTransferException, ValueError - """ - - # Create the blocking queue - blocking_queue = BlockingQueue(BLOCKING_QUEUE_SIZE, max_data_size) - - # Create the image reader - reader = FileReadWriteTask(read_file_handle, blocking_queue) - - # Create the image writer - if write_file_handle: - # File or VMDK in VMware datastore is the image destination - writer = FileReadWriteTask(blocking_queue, write_file_handle) - elif image_service and image_id: - # Image service image is the destination - writer = ImageWriter(context, - blocking_queue, - image_service, - image_id, - image_meta) - else: - excep_msg = _("No image destination given.") - LOG.error(excep_msg) - raise ValueError(excep_msg) - - # Start the reader and writer - LOG.debug("Starting image transfer with reader: %(reader)s and writer: " - "%(writer)s", - {'reader': reader, - 'writer': writer}) - reader.start() - writer.start() +def _start_transfer(read_handle, write_handle, timeout_secs): + # write_handle could be an NFC lease, so we need to periodically + # update its progress + update_cb = getattr(write_handle, 'update_progress', lambda: None) + updater = loopingcall.FixedIntervalLoopingCall(update_cb) timer = timeout.Timeout(timeout_secs) try: - # Wait for the reader and writer to complete - reader.wait() - writer.wait() - except (timeout.Timeout, exceptions.ImageTransferException) as excep: - excep_msg = (_("Error occurred during image transfer with reader: " - "%(reader)s and writer: %(writer)s") % - {'reader': reader, - 'writer': writer}) - LOG.exception(excep_msg) - reader.stop() - writer.stop() - - if isinstance(excep, exceptions.ImageTransferException): - raise - raise exceptions.ImageTransferException(excep_msg, excep) + updater.start(interval=NFC_LEASE_UPDATE_PERIOD) + while True: + data = read_handle.read(CHUNK_SIZE) + if not data: + break + write_handle.write(data) + except timeout.Timeout as excep: + msg = (_('Timeout, read_handle: "%(src)s", write_handle: "%(dest)s"') % + {'src': read_handle, + 'dest': write_handle}) + LOG.exception(msg) + raise exceptions.ImageTransferException(msg, excep) + except Exception as excep: + msg = (_('Error, read_handle: "%(src)s", write_handle: "%(dest)s"') % + {'src': read_handle, + 'dest': write_handle}) + LOG.exception(msg) + raise exceptions.ImageTransferException(msg, excep) finally: timer.cancel() - read_file_handle.close() - if write_file_handle: - write_file_handle.close() + updater.stop() + read_handle.close() + write_handle.close() def download_image(image, image_meta, session, datastore, rel_path, @@ -427,8 +102,7 @@ def download_image(image, image_meta, session, datastore, rel_path, conn.write = conn.send read_handle = rw_handles.ImageReadHandle(image) - _start_transfer(None, timeout_secs, read_handle, image_size, - write_file_handle=conn) + _start_transfer(read_handle, conn, timeout_secs) def download_flat_image(context, timeout_secs, image_service, image_id, @@ -458,11 +132,7 @@ def download_flat_image(context, timeout_secs, image_service, image_id, kwargs.get('file_path'), file_size, cacerts=kwargs.get('cacerts')) - _start_transfer(context, - timeout_secs, - read_handle, - file_size, - write_file_handle=write_handle) + _start_transfer(read_handle, write_handle, timeout_secs) LOG.debug("Downloaded image: %s from image service as a flat file.", image_id) @@ -490,11 +160,7 @@ def download_stream_optimized_data(context, timeout_secs, read_handle, kwargs.get('vm_folder'), kwargs.get('vm_import_spec'), file_size) - _start_transfer(context, - timeout_secs, - read_handle, - file_size, - write_file_handle=write_handle) + _start_transfer(read_handle, write_handle, timeout_secs) return write_handle.get_imported_vm() @@ -578,8 +244,7 @@ def copy_stream_optimized_disk( kwargs.get('vm'), kwargs.get('vmdk_file_path'), file_size) - _start_transfer(context, timeout_secs, read_handle, file_size, - write_file_handle=write_handle) + _start_transfer(read_handle, write_handle, timeout_secs) LOG.debug("Downloaded virtual disk: %s.", vmdk_file_path) @@ -623,14 +288,12 @@ def upload_image(context, timeout_secs, image_service, image_id, owner_id, kwargs.get('image_version'), 'vmware_disktype': 'streamOptimized', 'owner_id': owner_id}} - - # Passing 0 as the file size since data size to be transferred cannot be - # predetermined. - _start_transfer(context, - timeout_secs, - read_handle, - 0, - image_service=image_service, - image_id=image_id, - image_meta=image_metadata) + updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress) + try: + updater.start(interval=NFC_LEASE_UPDATE_PERIOD) + image_service.update(context, image_id, image_metadata, + data=read_handle) + finally: + updater.stop() + read_handle.close() LOG.debug("Uploaded image: %s.", image_id) diff --git a/oslo_vmware/tests/test_image_transfer.py b/oslo_vmware/tests/test_image_transfer.py index 2634e15..d998543 100644 --- a/oslo_vmware/tests/test_image_transfer.py +++ b/oslo_vmware/tests/test_image_transfer.py @@ -17,280 +17,23 @@ Unit tests for functions and classes for image transfer. """ -import math - -from eventlet import greenthread -from eventlet import timeout import mock +import six from oslo_vmware import exceptions from oslo_vmware import image_transfer -from oslo_vmware import rw_handles from oslo_vmware.tests import base -class BlockingQueueTest(base.TestCase): - """Tests for BlockingQueue.""" - - def test_read(self): - max_size = 10 - chunk_size = 10 - max_transfer_size = 30 - queue = image_transfer.BlockingQueue(max_size, max_transfer_size) - - def get_side_effect(): - return [1] * chunk_size - - queue.get = mock.Mock(side_effect=get_side_effect) - while True: - data_item = queue.read(chunk_size) - if not data_item: - break - - self.assertEqual(max_transfer_size, queue._transferred) - exp_calls = [mock.call()] * int(math.ceil(float(max_transfer_size) / - chunk_size)) - self.assertEqual(exp_calls, queue.get.call_args_list) - - def test_write(self): - queue = image_transfer.BlockingQueue(10, 30) - queue.put = mock.Mock() - write_count = 10 - for _ in range(0, write_count): - queue.write([1]) - exp_calls = [mock.call([1])] * write_count - self.assertEqual(exp_calls, queue.put.call_args_list) - - def test_seek(self): - queue = image_transfer.BlockingQueue(10, 30) - self.assertRaises(IOError, queue.seek, 5) - - def test_tell(self): - queue = image_transfer.BlockingQueue(10, 30) - self.assertEqual(0, queue.tell()) - queue.get = mock.Mock(return_value=[1] * 10) - queue.read(10) - self.assertEqual(10, queue.tell()) - - -class ImageWriterTest(base.TestCase): - """Tests for ImageWriter class.""" - - def _create_image_writer(self): - self.image_service = mock.Mock() - self.context = mock.Mock() - self.input_file = mock.Mock() - self.image_id = mock.Mock() - return image_transfer.ImageWriter(self.context, self.input_file, - self.image_service, self.image_id) - - @mock.patch.object(greenthread, 'sleep') - def test_start(self, mock_sleep): - writer = self._create_image_writer() - status_list = ['queued', 'saving', 'active'] - - def image_service_show_side_effect(context, image_id): - status = status_list.pop(0) - return {'status': status} - - self.image_service.show.side_effect = image_service_show_side_effect - exp_calls = [mock.call(self.context, self.image_id)] * len(status_list) - writer.start() - self.assertTrue(writer.wait()) - self.image_service.update.assert_called_once_with(self.context, - self.image_id, {}, - data=self.input_file) - self.assertEqual(exp_calls, self.image_service.show.call_args_list) - - def test_start_with_killed_status(self): - writer = self._create_image_writer() - - def image_service_show_side_effect(_context, _image_id): - return {'status': 'killed'} - - self.image_service.show.side_effect = image_service_show_side_effect - writer.start() - self.assertRaises(exceptions.ImageTransferException, - writer.wait) - self.image_service.update.assert_called_once_with(self.context, - self.image_id, {}, - data=self.input_file) - self.image_service.show.assert_called_once_with(self.context, - self.image_id) - - def test_start_with_unknown_status(self): - writer = self._create_image_writer() - - def image_service_show_side_effect(_context, _image_id): - return {'status': 'unknown'} - - self.image_service.show.side_effect = image_service_show_side_effect - writer.start() - self.assertRaises(exceptions.ImageTransferException, - writer.wait) - self.image_service.update.assert_called_once_with(self.context, - self.image_id, {}, - data=self.input_file) - self.image_service.show.assert_called_once_with(self.context, - self.image_id) - - def test_start_with_image_service_show_exception(self): - writer = self._create_image_writer() - self.image_service.show.side_effect = RuntimeError() - writer.start() - self.assertRaises(exceptions.ImageTransferException, writer.wait) - self.image_service.update.assert_called_once_with(self.context, - self.image_id, {}, - data=self.input_file) - self.image_service.show.assert_called_once_with(self.context, - self.image_id) - - -class FileReadWriteTaskTest(base.TestCase): - """Tests for FileReadWriteTask class.""" - - def test_start(self): - data_items = [[1] * 10, [1] * 20, [1] * 5, []] - - def input_file_read_side_effect(arg): - self.assertEqual(arg, rw_handles.READ_CHUNKSIZE) - data = data_items[input_file_read_side_effect.i] - input_file_read_side_effect.i += 1 - return data - - input_file_read_side_effect.i = 0 - input_file = mock.Mock() - input_file.read.side_effect = input_file_read_side_effect - output_file = mock.Mock() - rw_task = image_transfer.FileReadWriteTask(input_file, output_file) - rw_task.start() - self.assertTrue(rw_task.wait()) - self.assertEqual(len(data_items), input_file.read.call_count) - - exp_calls = [] - for i in range(0, len(data_items)): - exp_calls.append(mock.call(data_items[i])) - self.assertEqual(exp_calls, output_file.write.call_args_list) - - self.assertEqual(len(data_items), - input_file.update_progress.call_count) - self.assertEqual(len(data_items), - output_file.update_progress.call_count) - - def test_start_with_read_exception(self): - input_file = mock.Mock() - input_file.read.side_effect = RuntimeError() - output_file = mock.Mock() - rw_task = image_transfer.FileReadWriteTask(input_file, output_file) - rw_task.start() - self.assertRaises(exceptions.ImageTransferException, rw_task.wait) - input_file.read.assert_called_once_with(rw_handles.READ_CHUNKSIZE) - - class ImageTransferUtilityTest(base.TestCase): """Tests for image_transfer utility methods.""" - @mock.patch.object(timeout, 'Timeout') - @mock.patch.object(image_transfer, 'ImageWriter') - @mock.patch.object(image_transfer, 'FileReadWriteTask') - @mock.patch.object(image_transfer, 'BlockingQueue') - def test_start_transfer(self, fake_BlockingQueue, fake_FileReadWriteTask, - fake_ImageWriter, fake_Timeout): - - context = mock.Mock() - read_file_handle = mock.Mock() - read_file_handle.close = mock.Mock() - image_service = mock.Mock() - image_id = mock.Mock() - blocking_queue = mock.Mock() - - write_file_handle1 = mock.Mock() - write_file_handle1.close = mock.Mock() - write_file_handle2 = None - write_file_handles = [write_file_handle1, write_file_handle2] - - timeout_secs = 10 - blocking_queue_size = 10 - image_meta = {} - max_data_size = 30 - - fake_BlockingQueue.return_value = blocking_queue - fake_timer = mock.Mock() - fake_timer.cancel = mock.Mock() - fake_Timeout.return_value = fake_timer - - for write_file_handle in write_file_handles: - image_transfer._start_transfer(context, - timeout_secs, - read_file_handle, - max_data_size, - write_file_handle=write_file_handle, - image_service=image_service, - image_id=image_id, - image_meta=image_meta) - - exp_calls = [mock.call(blocking_queue_size, - max_data_size)] * len(write_file_handles) - self.assertEqual(exp_calls, - fake_BlockingQueue.call_args_list) - - exp_calls2 = [mock.call(read_file_handle, blocking_queue), - mock.call(blocking_queue, write_file_handle1), - mock.call(read_file_handle, blocking_queue)] - self.assertEqual(exp_calls2, - fake_FileReadWriteTask.call_args_list) - - exp_calls3 = mock.call(context, blocking_queue, image_service, - image_id, image_meta) - self.assertEqual(exp_calls3, - fake_ImageWriter.call_args) - - exp_calls4 = [mock.call(timeout_secs)] * len(write_file_handles) - self.assertEqual(exp_calls4, - fake_Timeout.call_args_list) - - self.assertEqual(len(write_file_handles), - fake_timer.cancel.call_count) - - self.assertEqual(len(write_file_handles), - read_file_handle.close.call_count) - - write_file_handle1.close.assert_called_once_with() - - @mock.patch.object(image_transfer, 'FileReadWriteTask') - @mock.patch.object(image_transfer, 'BlockingQueue') - def test_start_transfer_with_no_image_destination(self, fake_BlockingQueue, - fake_FileReadWriteTask): - - context = mock.Mock() - read_file_handle = mock.Mock() - write_file_handle = None - image_service = None - image_id = None - timeout_secs = 10 - image_meta = {} - blocking_queue_size = 10 - max_data_size = 30 - blocking_queue = mock.Mock() - - fake_BlockingQueue.return_value = blocking_queue - - self.assertRaises(ValueError, - image_transfer._start_transfer, - context, - timeout_secs, - read_file_handle, - max_data_size, - write_file_handle=write_file_handle, - image_service=image_service, - image_id=image_id, - image_meta=image_meta) - - fake_BlockingQueue.assert_called_once_with(blocking_queue_size, - max_data_size) - - fake_FileReadWriteTask.assert_called_once_with(read_file_handle, - blocking_queue) + def test_start_transfer(self): + data = 'image-data-here' + read_handle = six.StringIO(data) + write_handle = mock.Mock() + image_transfer._start_transfer(read_handle, write_handle, None) + write_handle.write.assert_called_once_with(data) @mock.patch('oslo_vmware.rw_handles.FileWriteHandle') @mock.patch('oslo_vmware.rw_handles.ImageReadHandle') @@ -349,11 +92,9 @@ class ImageTransferUtilityTest(base.TestCase): cacerts=None) fake_transfer.assert_called_once_with( - context, - timeout_secs, fake_ImageReadHandle, - image_size, - write_file_handle=fake_FileWriteHandle) + fake_FileWriteHandle, + timeout_secs) @mock.patch('oslo_vmware.rw_handles.VmdkWriteHandle') @mock.patch.object(image_transfer, '_start_transfer') @@ -396,12 +137,9 @@ class ImageTransferUtilityTest(base.TestCase): vm_import_spec, image_size) - fake_transfer.assert_called_once_with( - context, - timeout_secs, - read_handle, - image_size, - write_file_handle=fake_VmdkWriteHandle) + fake_transfer.assert_called_once_with(read_handle, + fake_VmdkWriteHandle, + timeout_secs) fake_VmdkWriteHandle.get_imported_vm.assert_called_once_with() @@ -573,9 +311,8 @@ class ImageTransferUtilityTest(base.TestCase): vmdk_read_handle.assert_called_once_with( session, host, port, vm, vmdk_file_path, vmdk_size) - start_transfer.assert_called_once_with( - context, timeout, read_handle, vmdk_size, - write_file_handle=write_handle) + start_transfer.assert_called_once_with(read_handle, write_handle, + timeout) @mock.patch('oslo_vmware.rw_handles.VmdkReadHandle') @mock.patch.object(image_transfer, '_start_transfer') @@ -601,7 +338,7 @@ class ImageTransferUtilityTest(base.TestCase): image_name = 'fake_image' image_version = 1 - fake_VmdkReadHandle = 'fake_VmdkReadHandle' + fake_VmdkReadHandle = mock.Mock() fake_rw_handles_VmdkReadHandle.return_value = fake_VmdkReadHandle image_transfer.upload_image(context, @@ -633,10 +370,7 @@ class ImageTransferUtilityTest(base.TestCase): 'vmware_disktype': 'streamOptimized', 'owner_id': owner_id}} - fake_transfer.assert_called_once_with(context, - timeout_secs, - fake_VmdkReadHandle, - 0, - image_service=image_service, - image_id=image_id, - image_meta=image_metadata) + image_service.update.assert_called_once_with(context, + image_id, + image_metadata, + data=fake_VmdkReadHandle)