Support Storage Policy - Rev 1

* Inherit DiskFileMananger class and few other methods from Swift
  and reuse them.
* Unit tests for write_pickle method.
* Added policy_idx args, wherever missing.

Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
Prashanth Pai 2014-05-30 11:25:01 +05:30
parent 3f4d0f98de
commit 7ab8e35ff6
2 changed files with 47 additions and 45 deletions
gluster/swift/obj
test/unit/common

@ -51,8 +51,8 @@ from gluster.swift.common.utils import X_CONTENT_TYPE, \
FILE_TYPE, DEFAULT_UID, DEFAULT_GID, DIR_NON_OBJECT, DIR_OBJECT, \ FILE_TYPE, DEFAULT_UID, DEFAULT_GID, DIR_NON_OBJECT, DIR_OBJECT, \
X_ETAG, X_CONTENT_LENGTH X_ETAG, X_CONTENT_LENGTH
from ConfigParser import ConfigParser, NoSectionError, NoOptionError from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from swift.common.storage_policy import get_policy_string from swift.obj.diskfile import get_async_dir
from functools import partial from swift.obj.diskfile import DiskFileManager as SwiftDiskFileManager
# FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will # FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will
# be back ported. See http://www.python.org/dev/peps/pep-0433/ # be back ported. See http://www.python.org/dev/peps/pep-0433/
@ -70,9 +70,6 @@ MAX_OPEN_ATTEMPTS = 10
_cur_pid = str(os.getpid()) _cur_pid = str(os.getpid())
_cur_host = str(gethostname()) _cur_host = str(gethostname())
ASYNCDIR_BASE = 'async_pending'
get_async_dir = partial(get_policy_string, ASYNCDIR_BASE)
def _random_sleep(): def _random_sleep():
sleep(random.uniform(0.5, 0.15)) sleep(random.uniform(0.5, 0.15))
@ -229,7 +226,7 @@ def _adjust_metadata(metadata):
return metadata return metadata
class DiskFileManager(object): class DiskFileManager(SwiftDiskFileManager):
""" """
Management class for devices, providing common place for shared parameters Management class for devices, providing common place for shared parameters
and methods not provided by the DiskFile class (which primarily services and methods not provided by the DiskFile class (which primarily services
@ -247,29 +244,9 @@ class DiskFileManager(object):
:param logger: caller provided logger :param logger: caller provided logger
""" """
def __init__(self, conf, logger): def __init__(self, conf, logger):
self.logger = logger super(DiskFileManager, self).__init__(conf, logger)
self.disk_chunk_size = int(conf.get('disk_chunk_size',
DEFAULT_DISK_CHUNK_SIZE))
self.keep_cache_size = int(conf.get('keep_cache_size',
DEFAULT_KEEP_CACHE_SIZE))
self.bytes_per_sync = int(conf.get('mb_per_sync',
DEFAULT_MB_PER_SYNC)) * 1024 * 1024
self.devices = conf.get('devices', '/srv/node/')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
threads_per_disk = int(conf.get('threads_per_disk', '0'))
self.threadpools = defaultdict(
lambda: ThreadPool(nthreads=threads_per_disk))
def construct_dev_path(self, device): def get_dev_path(self, device, mount_check=None):
"""
Construct the path to a device without checking if it is mounted.
:param device: name of target device
:returns: full path to the device
"""
return os.path.join(self.devices, device)
def _get_dev_path(self, device):
""" """
Return the path to a device, checking to see that it is a proper mount Return the path to a device, checking to see that it is a proper mount
point based on a configuration parameter. point based on a configuration parameter.
@ -278,7 +255,8 @@ class DiskFileManager(object):
:returns: full path to the device, None if the path to the device is :returns: full path to the device, None if the path to the device is
not a proper mount point. not a proper mount point.
""" """
if self.mount_check and not mount(self.devices, device): should_check = self.mount_check if mount_check is None else mount_check
if should_check and not mount(self.devices, device):
dev_path = None dev_path = None
else: else:
dev_path = os.path.join(self.devices, device) dev_path = os.path.join(self.devices, device)
@ -286,24 +264,11 @@ class DiskFileManager(object):
def get_diskfile(self, device, account, container, obj, def get_diskfile(self, device, account, container, obj,
policy_idx=0, **kwargs): policy_idx=0, **kwargs):
dev_path = self._get_dev_path(device) dev_path = self.get_dev_path(device)
if not dev_path: if not dev_path:
raise DiskFileDeviceUnavailable() raise DiskFileDeviceUnavailable()
return DiskFile(self, dev_path, self.threadpools[device], return DiskFile(self, dev_path, self.threadpools[device],
account, container, obj, **kwargs) account, container, obj, policy_idx, **kwargs)
def pickle_async_update(self, device, account, container, obj, data,
timestamp, policy_idx):
device_path = self.construct_dev_path(device)
async_dir = os.path.join(device_path, get_async_dir(policy_idx))
ohash = hash_path(account, container, obj)
self.threadpools[device].run_in_thread(
write_pickle,
data,
os.path.join(async_dir, ohash[-3:], ohash + '-' +
normalize_timestamp(timestamp)),
os.path.join(device_path, 'tmp'))
self.logger.increment('async_pendings')
class DiskFileWriter(object): class DiskFileWriter(object):
@ -622,7 +587,7 @@ class DiskFile(object):
:param gid: group ID disk object should assume (file or directory) :param gid: group ID disk object should assume (file or directory)
""" """
def __init__(self, mgr, dev_path, threadpool, account, container, obj, def __init__(self, mgr, dev_path, threadpool, account, container, obj,
uid=DEFAULT_UID, gid=DEFAULT_GID): policy_idx=0, uid=DEFAULT_UID, gid=DEFAULT_GID):
self._mgr = mgr self._mgr = mgr
self._device_path = dev_path self._device_path = dev_path
self._threadpool = threadpool or ThreadPool(nthreads=0) self._threadpool = threadpool or ThreadPool(nthreads=0)
@ -634,6 +599,7 @@ class DiskFile(object):
self._fd = None self._fd = None
# Don't store a value for data_file until we know it exists. # Don't store a value for data_file until we know it exists.
self._data_file = None self._data_file = None
self._policy_idx = int(policy_idx)
if not hasattr(self._mgr, 'reseller_prefix'): if not hasattr(self._mgr, 'reseller_prefix'):
self._mgr.reseller_prefix = 'AUTH_' self._mgr.reseller_prefix = 'AUTH_'

@ -712,6 +712,42 @@ class TestUtils(unittest.TestCase):
ret = utils.validate_object(md) ret = utils.validate_object(md)
assert ret assert ret
def test_write_pickle(self):
td = tempfile.mkdtemp()
try:
fpp = os.path.join(td, 'pp')
# FIXME: Remove this patch when coverage.py can handle eventlet
with patch("gluster.swift.common.fs_utils.do_fsync",
_mock_os_fsync):
utils.write_pickle('pickled peppers', fpp)
with open(fpp, "rb") as f:
contents = f.read()
s = pickle.loads(contents)
assert s == 'pickled peppers', repr(s)
finally:
shutil.rmtree(td)
def test_write_pickle_ignore_tmp(self):
tf = tempfile.NamedTemporaryFile()
td = tempfile.mkdtemp()
try:
fpp = os.path.join(td, 'pp')
# Also test an explicity pickle protocol
# FIXME: Remove this patch when coverage.py can handle eventlet
with patch("gluster.swift.common.fs_utils.do_fsync",
_mock_os_fsync):
utils.write_pickle('pickled peppers', fpp, tmp=tf.name,
pickle_protocol=2)
with open(fpp, "rb") as f:
contents = f.read()
s = pickle.loads(contents)
assert s == 'pickled peppers', repr(s)
with open(tf.name, "rb") as f:
contents = f.read()
assert contents == ''
finally:
shutil.rmtree(td)
class TestUtilsDirObjects(unittest.TestCase): class TestUtilsDirObjects(unittest.TestCase):