From 7ab8e35ff63943d74b7cd04fbbfdb04c4feff920 Mon Sep 17 00:00:00 2001 From: Prashanth Pai Date: Fri, 30 May 2014 11:25:01 +0530 Subject: [PATCH] Support Storage Policy - Rev 1 * Inherit DiskFileMananger class and few other methods from Swift and reuse them. * Unit tests for write_pickle method. * Added policy_idx args, wherever missing. Signed-off-by: Prashanth Pai --- gluster/swift/obj/diskfile.py | 56 +++++++--------------------------- test/unit/common/test_utils.py | 36 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/gluster/swift/obj/diskfile.py b/gluster/swift/obj/diskfile.py index 332ec83..54dd0e7 100644 --- a/gluster/swift/obj/diskfile.py +++ b/gluster/swift/obj/diskfile.py @@ -51,8 +51,8 @@ from gluster.swift.common.utils import X_CONTENT_TYPE, \ FILE_TYPE, DEFAULT_UID, DEFAULT_GID, DIR_NON_OBJECT, DIR_OBJECT, \ X_ETAG, X_CONTENT_LENGTH from ConfigParser import ConfigParser, NoSectionError, NoOptionError -from swift.common.storage_policy import get_policy_string -from functools import partial +from swift.obj.diskfile import get_async_dir +from swift.obj.diskfile import DiskFileManager as SwiftDiskFileManager # FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will # be back ported. See http://www.python.org/dev/peps/pep-0433/ @@ -70,9 +70,6 @@ MAX_OPEN_ATTEMPTS = 10 _cur_pid = str(os.getpid()) _cur_host = str(gethostname()) -ASYNCDIR_BASE = 'async_pending' -get_async_dir = partial(get_policy_string, ASYNCDIR_BASE) - def _random_sleep(): sleep(random.uniform(0.5, 0.15)) @@ -229,7 +226,7 @@ def _adjust_metadata(metadata): return metadata -class DiskFileManager(object): +class DiskFileManager(SwiftDiskFileManager): """ Management class for devices, providing common place for shared parameters and methods not provided by the DiskFile class (which primarily services @@ -247,29 +244,9 @@ class DiskFileManager(object): :param logger: caller provided logger """ def __init__(self, conf, logger): - self.logger = logger - self.disk_chunk_size = int(conf.get('disk_chunk_size', - DEFAULT_DISK_CHUNK_SIZE)) - self.keep_cache_size = int(conf.get('keep_cache_size', - DEFAULT_KEEP_CACHE_SIZE)) - self.bytes_per_sync = int(conf.get('mb_per_sync', - DEFAULT_MB_PER_SYNC)) * 1024 * 1024 - self.devices = conf.get('devices', '/srv/node/') - self.mount_check = config_true_value(conf.get('mount_check', 'true')) - threads_per_disk = int(conf.get('threads_per_disk', '0')) - self.threadpools = defaultdict( - lambda: ThreadPool(nthreads=threads_per_disk)) + super(DiskFileManager, self).__init__(conf, logger) - def construct_dev_path(self, device): - """ - Construct the path to a device without checking if it is mounted. - - :param device: name of target device - :returns: full path to the device - """ - return os.path.join(self.devices, device) - - def _get_dev_path(self, device): + def get_dev_path(self, device, mount_check=None): """ Return the path to a device, checking to see that it is a proper mount point based on a configuration parameter. @@ -278,7 +255,8 @@ class DiskFileManager(object): :returns: full path to the device, None if the path to the device is not a proper mount point. """ - if self.mount_check and not mount(self.devices, device): + should_check = self.mount_check if mount_check is None else mount_check + if should_check and not mount(self.devices, device): dev_path = None else: dev_path = os.path.join(self.devices, device) @@ -286,24 +264,11 @@ class DiskFileManager(object): def get_diskfile(self, device, account, container, obj, policy_idx=0, **kwargs): - dev_path = self._get_dev_path(device) + dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() return DiskFile(self, dev_path, self.threadpools[device], - account, container, obj, **kwargs) - - def pickle_async_update(self, device, account, container, obj, data, - timestamp, policy_idx): - device_path = self.construct_dev_path(device) - async_dir = os.path.join(device_path, get_async_dir(policy_idx)) - ohash = hash_path(account, container, obj) - self.threadpools[device].run_in_thread( - write_pickle, - data, - os.path.join(async_dir, ohash[-3:], ohash + '-' + - normalize_timestamp(timestamp)), - os.path.join(device_path, 'tmp')) - self.logger.increment('async_pendings') + account, container, obj, policy_idx, **kwargs) class DiskFileWriter(object): @@ -622,7 +587,7 @@ class DiskFile(object): :param gid: group ID disk object should assume (file or directory) """ def __init__(self, mgr, dev_path, threadpool, account, container, obj, - uid=DEFAULT_UID, gid=DEFAULT_GID): + policy_idx=0, uid=DEFAULT_UID, gid=DEFAULT_GID): self._mgr = mgr self._device_path = dev_path self._threadpool = threadpool or ThreadPool(nthreads=0) @@ -634,6 +599,7 @@ class DiskFile(object): self._fd = None # Don't store a value for data_file until we know it exists. self._data_file = None + self._policy_idx = int(policy_idx) if not hasattr(self._mgr, 'reseller_prefix'): self._mgr.reseller_prefix = 'AUTH_' diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index dd03bd8..a998bf5 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -712,6 +712,42 @@ class TestUtils(unittest.TestCase): ret = utils.validate_object(md) assert ret + def test_write_pickle(self): + td = tempfile.mkdtemp() + try: + fpp = os.path.join(td, 'pp') + # FIXME: Remove this patch when coverage.py can handle eventlet + with patch("gluster.swift.common.fs_utils.do_fsync", + _mock_os_fsync): + utils.write_pickle('pickled peppers', fpp) + with open(fpp, "rb") as f: + contents = f.read() + s = pickle.loads(contents) + assert s == 'pickled peppers', repr(s) + finally: + shutil.rmtree(td) + + def test_write_pickle_ignore_tmp(self): + tf = tempfile.NamedTemporaryFile() + td = tempfile.mkdtemp() + try: + fpp = os.path.join(td, 'pp') + # Also test an explicity pickle protocol + # FIXME: Remove this patch when coverage.py can handle eventlet + with patch("gluster.swift.common.fs_utils.do_fsync", + _mock_os_fsync): + utils.write_pickle('pickled peppers', fpp, tmp=tf.name, + pickle_protocol=2) + with open(fpp, "rb") as f: + contents = f.read() + s = pickle.loads(contents) + assert s == 'pickled peppers', repr(s) + with open(tf.name, "rb") as f: + contents = f.read() + assert contents == '' + finally: + shutil.rmtree(td) + class TestUtilsDirObjects(unittest.TestCase):