attempt to fix issues with HPSS-specific features; this won't work 100% until we get the HPSS API instead of FUSE
This commit is contained in:
parent
37ce2e360d
commit
33b930271e
@ -55,7 +55,7 @@ from swift.obj.diskfile import get_async_dir
|
||||
|
||||
# FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will
|
||||
# be back ported. See http://www.python.org/dev/peps/pep-0433/
|
||||
O_CLOEXEC = 0o20000000
|
||||
O_CLOEXEC = 0o2000000
|
||||
|
||||
MAX_RENAME_ATTEMPTS = 10
|
||||
MAX_OPEN_ATTEMPTS = 10
|
||||
@ -299,7 +299,7 @@ class DiskFileWriter(object):
|
||||
df._threadpool.run_in_thread(self._write_entire_chunk, chunk)
|
||||
return self._upload_size
|
||||
|
||||
def _finalize_put(self, metadata, purgelock=False, has_etag=True):
|
||||
def _finalize_put(self, metadata, purgelock=False):
|
||||
# Write out metadata before fsync() to ensure it is also forced to
|
||||
# disk.
|
||||
write_metadata(self._fd, metadata)
|
||||
@ -310,15 +310,6 @@ class DiskFileWriter(object):
|
||||
# clean).
|
||||
do_fsync(self._fd)
|
||||
|
||||
# (HPSS) Purge lock the file now if we're asked to.
|
||||
if purgelock:
|
||||
try:
|
||||
hpssfs.ioctl(self._fd, hpssfs.HPSSFS_PURGE_LOCK, int(purgelock))
|
||||
except IOError as err:
|
||||
raise SwiftOnFileSystemIOError(err.errno,
|
||||
'%s, hpssfs.ioctl("%s", ...)' % (
|
||||
err.strerror, self._fd))
|
||||
|
||||
# From the Department of the Redundancy Department, make sure
|
||||
# we call drop_cache() after fsync() to avoid redundant work
|
||||
# (pages all clean).
|
||||
@ -392,33 +383,20 @@ class DiskFileWriter(object):
|
||||
# Success!
|
||||
break
|
||||
|
||||
# (HPSS) Purge lock the file now if we're asked to.
|
||||
if purgelock:
|
||||
try:
|
||||
hpssfs.ioctl(self._fd, hpssfs.HPSSFS_PURGE_LOCK, int(purgelock))
|
||||
except IOError as err:
|
||||
raise SwiftOnFileSystemIOError(err.errno,
|
||||
'%s, hpssfs.ioctl("%s", ...)' % (
|
||||
err.strerror, self._fd))
|
||||
|
||||
# Close here so the calling context does not have to perform this
|
||||
# in a thread.
|
||||
self.close()
|
||||
|
||||
# TODO: see if this is really the right way of getting the ETag
|
||||
# TODO: add timeout in case we should end up never having an ETag
|
||||
if not has_etag:
|
||||
try:
|
||||
etag = None
|
||||
# We sit here and wait until hpssfs-cksum finishes calculating
|
||||
# the checksum.
|
||||
while etag is None:
|
||||
time.sleep(.25)
|
||||
xattrs = xattr.xattr(df._data_file)
|
||||
if 'system.hpss.hash' in xattrs:
|
||||
etag = xattrs['system.hpss.hash']
|
||||
elif 'user.hash.checksum' in xattrs:
|
||||
etag = xattrs['user.hash.checksum']
|
||||
metadata['ETag'] = etag
|
||||
write_metadata(df._data_file, metadata)
|
||||
except IOError as err:
|
||||
raise DiskFileError(
|
||||
err.errno,
|
||||
"Could not get xattrs for file '%s', reason: %s"
|
||||
% (df._data_file, err.strerror))
|
||||
|
||||
def put(self, metadata, purgelock=False, has_etag=True):
|
||||
def put(self, metadata, purgelock=False):
|
||||
"""
|
||||
Finalize writing the file on disk, and renames it from the temp file
|
||||
to the real location. This should be called after the data has been
|
||||
@ -448,7 +426,7 @@ class DiskFileWriter(object):
|
||||
' as a directory' % df._data_file)
|
||||
|
||||
df._threadpool.force_run_in_thread(self._finalize_put, metadata,
|
||||
purgelock, has_etag)
|
||||
purgelock)
|
||||
|
||||
# Avoid the unlink() system call as part of the mkstemp context
|
||||
# cleanup
|
||||
@ -838,11 +816,13 @@ class DiskFile(object):
|
||||
file_levels = raw_file_levels.split(";")
|
||||
top_level = file_levels[0].split(':')
|
||||
bytes_on_disk = top_level[2].rstrip(' ')
|
||||
if bytes_on_disk == 'nodata':
|
||||
bytes_on_disk = '0'
|
||||
except ValueError:
|
||||
raise SwiftOnFileSystemIOError("Couldn't get system.hpss.level!")
|
||||
return int(bytes_on_disk) != self._stat.st_size
|
||||
|
||||
def get_hpss_headers(self):
|
||||
def read_hpss_system_metadata(self):
|
||||
header_to_xattr = {'X-HPSS-Account': 'account',
|
||||
'X-HPSS-Bitfile-ID': 'bitfile',
|
||||
'X-HPSS-Comment': 'comment',
|
||||
@ -916,28 +896,15 @@ class DiskFile(object):
|
||||
|
||||
def read_metadata(self):
|
||||
"""
|
||||
Return the metadata for an object without opening the object's file on
|
||||
disk.
|
||||
Return the metadata for an object without requiring the caller to open
|
||||
the object first.
|
||||
|
||||
:returns: metadata dictionary for an object
|
||||
:raises DiskFileError: this implementation will raise the same
|
||||
errors as the `open()` method.
|
||||
"""
|
||||
# FIXME: pull a lot of this and the copy of it from open() out to
|
||||
# another function
|
||||
|
||||
# Do not actually open the file, in order to duck hpssfs checksum
|
||||
# validation and resulting timeouts
|
||||
# This means we do a few things DiskFile.open() does.
|
||||
try:
|
||||
self._is_dir = os.path.isdir(self._data_file)
|
||||
self._metadata = read_metadata(self._data_file)
|
||||
except IOError:
|
||||
raise DiskFileNotExist
|
||||
if not self._validate_object_metadata():
|
||||
self._create_object_metadata(self._data_file)
|
||||
self._filter_metadata()
|
||||
return self._metadata
|
||||
with self.open():
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, iter_hook=None, keep_cache=False):
|
||||
"""
|
||||
@ -1032,13 +999,13 @@ class DiskFile(object):
|
||||
temporary file again. If we get file name conflict, we'll retry using
|
||||
different random suffixes 1,000 times before giving up.
|
||||
|
||||
:param cos:
|
||||
.. note::
|
||||
|
||||
An implementation is not required to perform on-disk
|
||||
preallocations even if the parameter is specified. But if it does
|
||||
and it fails, it must raise a `DiskFileNoSpace` exception.
|
||||
|
||||
:param cos:
|
||||
:param size: optional initial size of file to explicitly allocate on
|
||||
disk
|
||||
:raises DiskFileNoSpace: if a size is specified and allocation fails
|
||||
@ -1074,7 +1041,7 @@ class DiskFile(object):
|
||||
raise SwiftOnFileSystemIOError(err.errno,
|
||||
'%s, hpssfs.ioctl("%s", SET_COS)' % (
|
||||
err.strerror, fd))
|
||||
elif size:
|
||||
if size:
|
||||
try:
|
||||
hpssfs.ioctl(fd, hpssfs.HPSSFS_SET_FSIZE_HINT,
|
||||
long(size))
|
||||
|
@ -78,7 +78,6 @@ class ObjectController(server.ObjectController):
|
||||
# Replaces Swift's DiskFileRouter object reference with ours.
|
||||
self._diskfile_router = SwiftOnFileDiskFileRouter(conf, self.logger)
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.handle_md5 = conf.get('handle_md5', False)
|
||||
self.container_ring = None
|
||||
# This conf option will be deprecated and eventualy removed in
|
||||
# future releases
|
||||
@ -153,21 +152,18 @@ class ObjectController(server.ObjectController):
|
||||
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
|
||||
upload_expiration = time.time() + self.max_upload_time
|
||||
|
||||
if self.handle_md5:
|
||||
etag = md5()
|
||||
etag = md5()
|
||||
elapsed_time = 0
|
||||
|
||||
# (HPSS) Check for HPSS-specific metadata headers
|
||||
cos = request.headers.get('X-Object-Meta-COS')
|
||||
purgelock = request.headers.get('X-Object-Meta-PurgeLock')
|
||||
cos = request.headers.get('X-HPSS-Class-Of-Service-ID', None)
|
||||
purgelock = request.headers.get('X-HPSS-Purgelock-Status', 'false')
|
||||
purgelock = purgelock.lower() in ['true', '1', 'yes']
|
||||
|
||||
try:
|
||||
# Feed DiskFile our HPSS-specific stuff
|
||||
with disk_file.create(size=fsize, cos=cos) as writer:
|
||||
upload_size = 0
|
||||
# FIXME: Need to figure out how to store MIME type
|
||||
# information, to retrieve with a GET later! Or if
|
||||
# this has already been done for us.
|
||||
|
||||
def timeout_reader():
|
||||
with ChunkReadTimeout(self.client_timeout):
|
||||
@ -180,8 +176,7 @@ class ObjectController(server.ObjectController):
|
||||
if start_time > upload_expiration:
|
||||
self.logger.increment('PUT.timeouts')
|
||||
return HTTPRequestTimeout(request=request)
|
||||
if self.handle_md5:
|
||||
etag.update(chunk)
|
||||
etag.update(chunk)
|
||||
upload_size = writer.write(chunk)
|
||||
elapsed_time += time.time() - start_time
|
||||
except ChunkReadTimeout:
|
||||
@ -191,10 +186,8 @@ class ObjectController(server.ObjectController):
|
||||
elapsed_time, upload_size)
|
||||
if fsize and fsize != upload_size:
|
||||
return HTTPClientDisconnect(request=request)
|
||||
if self.handle_md5:
|
||||
etag = etag.hexdigest()
|
||||
else:
|
||||
etag = ''
|
||||
etag = etag.hexdigest()
|
||||
|
||||
if 'etag' in request.headers \
|
||||
and request.headers['etag'].lower() != etag:
|
||||
return HTTPUnprocessableEntity(request=request)
|
||||
@ -215,41 +208,44 @@ class ObjectController(server.ObjectController):
|
||||
header_caps = header_key.title()
|
||||
metadata[header_caps] = request.headers[header_key]
|
||||
|
||||
# (HPSS) Purge lock the file
|
||||
writer.put(metadata, purgelock=purgelock,
|
||||
has_etag=self.handle_md5)
|
||||
# (HPSS) Write the file, with added options
|
||||
writer.put(metadata, purgelock=purgelock)
|
||||
|
||||
except DiskFileNoSpace:
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
except SwiftOnFileSystemIOError:
|
||||
except SwiftOnFileSystemIOError as e:
|
||||
logging.debug('IOError in writing file')
|
||||
return HTTPServiceUnavailable(request=request)
|
||||
|
||||
# FIXME: this stuff really should be handled in DiskFile somehow?
|
||||
if self.handle_md5:
|
||||
# (HPSS) Set checksum on file ourselves, if hpssfs won't do it
|
||||
# for us.
|
||||
# we set the hpss checksum in here, so both systems have valid
|
||||
# and current checksum metadata
|
||||
|
||||
# (HPSS) Set checksum on file ourselves, if hpssfs won't do it
|
||||
# for us.
|
||||
data_file = disk_file._data_file
|
||||
try:
|
||||
xattr.setxattr(data_file, 'system.hpss.hash',
|
||||
"md5:%s" % etag)
|
||||
except IOError:
|
||||
logging.debug("Could not write ETag to system.hpss.hash,"
|
||||
" trying user.hash.checksum")
|
||||
try:
|
||||
xattr.setxattr(disk_file._data_file, 'system.hpss.hash',
|
||||
"md5:%s" % etag)
|
||||
except IOError:
|
||||
logging.debug("Could not write ETag to system.hpss.hash,"
|
||||
" trying user.hash.checksum")
|
||||
try:
|
||||
xattr.setxattr(disk_file._data_file,
|
||||
'user.hash.checksum', etag)
|
||||
xattr.setxattr(disk_file._data_file,
|
||||
'user.hash.algorithm', 'md5')
|
||||
xattr.setxattr(disk_file._data_file,
|
||||
'user.hash.state', 'Valid')
|
||||
xattr.setxattr(disk_file._data_file,
|
||||
'user.hash.filesize', str(upload_size))
|
||||
xattr.setxattr(disk_file._data_file,
|
||||
'user.hash.app', 'swiftonhpss')
|
||||
except IOError as err:
|
||||
raise SwiftOnFileSystemIOError(
|
||||
err.errno,
|
||||
'Could not write MD5 checksum to HPSS filesystem: '
|
||||
'%s' % err.strerror)
|
||||
xattr.setxattr(data_file,
|
||||
'user.hash.checksum', etag)
|
||||
xattr.setxattr(data_file,
|
||||
'user.hash.algorithm', 'md5')
|
||||
xattr.setxattr(data_file,
|
||||
'user.hash.state', 'Valid')
|
||||
xattr.setxattr(data_file,
|
||||
'user.hash.filesize', str(upload_size))
|
||||
xattr.setxattr(data_file,
|
||||
'user.hash.app', 'swiftonhpss')
|
||||
except IOError as err:
|
||||
raise SwiftOnFileSystemIOError(
|
||||
err.errno,
|
||||
'Could not write MD5 checksum to HPSS filesystem: '
|
||||
'%s' % err.strerror)
|
||||
|
||||
# Update container metadata
|
||||
if orig_delete_at != new_delete_at:
|
||||
@ -277,6 +273,7 @@ class ObjectController(server.ObjectController):
|
||||
self._object_symlink(request, disk_file._data_file, device,
|
||||
account)
|
||||
except SwiftOnFileSystemOSError:
|
||||
logging.debug('could not make account symlink')
|
||||
return HTTPServiceUnavailable(request=request)
|
||||
return HTTPCreated(request=request, etag=etag)
|
||||
|
||||
@ -373,7 +370,7 @@ class ObjectController(server.ObjectController):
|
||||
response.headers[key] = value
|
||||
response.etag = metadata['ETag']
|
||||
ts = Timestamp(metadata['X-Timestamp'])
|
||||
|
||||
response.last_modified = math.ceil(float(ts))
|
||||
# Needed for container sync feature
|
||||
response.headers['X-Timestamp'] = ts.normal
|
||||
response.headers['X-Backend-Timestamp'] = ts.internal
|
||||
@ -383,11 +380,15 @@ class ObjectController(server.ObjectController):
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
hpss_headers = disk_file.get_hpss_headers()
|
||||
response.headers.update(hpss_headers)
|
||||
except SwiftOnFileSystemIOError:
|
||||
return HTTPServiceUnavailable(request=request)
|
||||
# (HPSS) Inject HPSS xattr metadata into headers
|
||||
want_hpss_metadata = request.headers.get('X-HPSS-Get-Metadata',
|
||||
False)
|
||||
if config_true_value(want_hpss_metadata):
|
||||
try:
|
||||
hpss_headers = disk_file.read_hpss_system_metadata()
|
||||
response.headers.update(hpss_headers)
|
||||
except SwiftOnFileSystemIOError:
|
||||
return HTTPServiceUnavailable(request=request)
|
||||
|
||||
if 'X-Object-Sysmeta-Update-Container' in response.headers:
|
||||
self._sof_container_update(request, response)
|
||||
@ -449,11 +450,14 @@ class ObjectController(server.ObjectController):
|
||||
response.headers['X-Timestamp'] = file_x_ts.normal
|
||||
response.headers['X-Backend-Timestamp'] = file_x_ts.internal
|
||||
# (HPSS) Inject HPSS xattr metadata into headers
|
||||
try:
|
||||
hpss_headers = disk_file.get_hpss_headers()
|
||||
response.headers.update(hpss_headers)
|
||||
except SwiftOnFileSystemIOError:
|
||||
return HTTPServiceUnavailable(request=request)
|
||||
want_hpss_metadata = request.headers.get('X-HPSS-Get-Metadata',
|
||||
False)
|
||||
if config_true_value(want_hpss_metadata):
|
||||
try:
|
||||
hpss_headers = disk_file.read_hpss_system_metadata()
|
||||
response.headers.update(hpss_headers)
|
||||
except SwiftOnFileSystemIOError:
|
||||
return HTTPServiceUnavailable(request=request)
|
||||
return request.get_response(response)
|
||||
except (DiskFileNotExist, DiskFileQuarantined) as e:
|
||||
headers = {}
|
||||
@ -482,7 +486,7 @@ class ObjectController(server.ObjectController):
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
|
||||
# Set Purgelock status if we got it
|
||||
purgelock = request.headers.get('X-Object-Meta-PurgeLock')
|
||||
purgelock = request.headers.get('X-HPSS-Purgelock-Status')
|
||||
if purgelock:
|
||||
try:
|
||||
hpssfs.ioctl(disk_file._fd, hpssfs.HPSSFS_PURGE_LOCK,
|
||||
@ -493,6 +497,18 @@ class ObjectController(server.ObjectController):
|
||||
'%s, xattr.getxattr("%s", ...)' %
|
||||
(err.strerror, disk_file._fd))
|
||||
|
||||
# Set class of service if we got it
|
||||
cos = request.headers.get('X-HPSS-Class-Of-Service-ID')
|
||||
if cos:
|
||||
try:
|
||||
hpssfs.ioctl(disk_file._fd, hpssfs.HPSSFS_SET_COS_HINT,
|
||||
int(cos))
|
||||
except IOError as err:
|
||||
raise SwiftOnFileSystemIOError(
|
||||
err.errno,
|
||||
'%s, xattr.getxattr("%s", ...)' %
|
||||
(err.strerror, disk_file._fd))
|
||||
|
||||
# Update metadata from request
|
||||
try:
|
||||
orig_metadata = disk_file.read_metadata()
|
||||
@ -548,12 +564,12 @@ class ObjectController(server.ObjectController):
|
||||
orig_metadata = {}
|
||||
response_class = HTTPNotFound
|
||||
|
||||
# If the file got deleted outside of Swift, we won't see it. So just say
|
||||
# it got deleted, even if it never existed in the first place.
|
||||
# If the file got deleted outside of Swift, we won't see it.
|
||||
# So we say "file, what file?" and delete it from the container.
|
||||
except DiskFileNotExist:
|
||||
orig_timestamp = 0
|
||||
orig_metadata = {}
|
||||
response_class = HTTPNoContent
|
||||
response_class = HTTPNotFound
|
||||
except DiskFileQuarantined:
|
||||
orig_timestamp = 0
|
||||
orig_metadata = {}
|
||||
|
Loading…
x
Reference in New Issue
Block a user