
Added a Fake_file class that implements the minimal set of functions invoked by the code in GET.

BUG: 987841 https://bugzilla.redhat.com/show_bug.cgi?id=987841
Change-Id: I5bdf5be1c0c4c8231f34c9be529e6edc83774f2e
Signed-off-by: Chetan Risbud <crisbud@redhat.com>
Reviewed-on: http://review.gluster.org/5511
Reviewed-by: Luis Pabon <lpabon@redhat.com>
Tested-by: Luis Pabon <lpabon@redhat.com>
# Copyright (c) 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import stat
import fcntl
import errno
try:
    from random import SystemRandom
    random = SystemRandom()
except ImportError:
    import random
import logging
from socket import gethostname
from hashlib import md5
from eventlet import sleep
from greenlet import getcurrent
from contextlib import contextmanager
from swift.common.utils import TRUE_VALUES, drop_buffer_cache, ThreadPool
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
    DiskFileNoSpace, DiskFileDeviceUnavailable

from gluster.swift.common.exceptions import GlusterFileSystemOSError
from gluster.swift.common.Glusterfs import mount
from gluster.swift.common.fs_utils import do_fstat, do_open, do_close, \
    do_unlink, do_chown, os_path, do_fsync, do_fchown, do_stat, Fake_file
from gluster.swift.common.utils import read_metadata, write_metadata, \
    validate_object, create_object_metadata, rmobjdir, dir_is_object, \
    get_object_metadata
from gluster.swift.common.utils import X_CONTENT_LENGTH, X_CONTENT_TYPE, \
    X_TIMESTAMP, X_TYPE, X_OBJECT_TYPE, FILE, OBJECT, DIR_TYPE, \
    FILE_TYPE, DEFAULT_UID, DEFAULT_GID, DIR_NON_OBJECT, DIR_OBJECT
from ConfigParser import ConfigParser, NoSectionError, NoOptionError

from swift.obj.diskfile import DiskFile as SwiftDiskFile
from swift.obj.diskfile import DiskWriter as SwiftDiskWriter

# FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will
# be back ported. See http://www.python.org/dev/peps/pep-0433/
O_CLOEXEC = 02000000

DEFAULT_DISK_CHUNK_SIZE = 65536
DEFAULT_BYTES_PER_SYNC = (512 * 1024 * 1024)
# keep these lower-case
DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split())

MAX_RENAME_ATTEMPTS = 10
MAX_OPEN_ATTEMPTS = 10

_cur_pid = str(os.getpid())
_cur_host = str(gethostname())


def _random_sleep():
    sleep(random.uniform(0.5, 0.15))


def _lock_parent(full_path):
    parent_path, _ = full_path.rsplit(os.path.sep, 1)
    try:
        fd = os.open(parent_path, os.O_RDONLY | O_CLOEXEC)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Cannot lock the parent because it does not exist, let the
            # caller handle this situation.
            return False
        raise
    else:
        while True:
            # Spin sleeping for 1/10th of a second until we get the lock.
            # FIXME: Consider adding a final timeout to just abort the
            # operation.
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError as err:
                if err.errno == errno.EAGAIN:
                    _random_sleep()
                else:
                    # Don't leak an open file on an exception
                    os.close(fd)
                    raise
            except Exception:
                # Don't leak an open file for any other exception
                os.close(fd)
                raise
            else:
                break
    return fd
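
# A minimal usage sketch for _lock_parent(), with a hypothetical path; the
# caller owns the returned descriptor and must unlock and close it:
#
#     fd = _lock_parent('/mnt/gluster-object/vol/cont/obj')
#     if fd is False:
#         pass                      # parent is missing; handle that case
#     else:
#         try:
#             pass                  # parent locked; safe to create 'obj'
#         finally:
#             fcntl.flock(fd, fcntl.LOCK_UN)
#             os.close(fd)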


def _make_directory_locked(full_path, uid, gid, metadata=None):
    fd = _lock_parent(full_path)
    if fd is False:
        # Parent does not exist either, pass this situation on to the caller
        # to handle.
        return False, metadata
    try:
        # Check for directory existence
        stats = do_stat(full_path)
        if stats:
            # It now exists, having acquired the lock of its parent
            # directory, but verify it is actually a directory
            is_dir = stat.S_ISDIR(stats.st_mode)
            if not is_dir:
                # It is not a directory!
                raise DiskFileError("_make_directory_locked: non-directory"
                                    " found at path %s when expecting a"
                                    " directory" % full_path)
            return True, metadata

        # We know the parent directory exists, and we have it locked, attempt
        # the creation of the target directory.
        return _make_directory_unlocked(full_path, uid, gid,
                                        metadata=metadata)
    finally:
        # We're done here, be sure to remove our lock and close our open FD.
        try:
            fcntl.flock(fd, fcntl.LOCK_UN)
        except:
            pass
        os.close(fd)


def _make_directory_unlocked(full_path, uid, gid, metadata=None):
    """
    Make a directory and change the ownership as specified, potentially
    creating the object metadata if requested.
    """
    try:
        os.mkdir(full_path)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Tell the caller some directory of the parent path does not
            # exist.
            return False, metadata
        elif err.errno == errno.EEXIST:
            # Possible race, in that the caller invoked this method when it
            # had previously determined the file did not exist.
            #
            # FIXME: When we are confident, remove this stat() call as it is
            # not necessary.
            try:
                stats = os.stat(full_path)
            except OSError as serr:
                # FIXME: Ideally we'd want to return an appropriate error
                # message and code in the PUT Object REST API response.
                raise DiskFileError("_make_directory_unlocked: os.mkdir"
                                    " failed because path %s already exists,"
                                    " and a subsequent os.stat on that same"
                                    " path failed (%s)" % (full_path,
                                                           str(serr)))
            else:
                is_dir = stat.S_ISDIR(stats.st_mode)
                if not is_dir:
                    # FIXME: Ideally we'd want to return an appropriate error
                    # message and code in the PUT Object REST API response.
                    raise DiskFileError("_make_directory_unlocked: os.mkdir"
                                        " failed on path %s because it"
                                        " already exists but not as a"
                                        " directory" % full_path)
                return True, metadata
        elif err.errno == errno.ENOTDIR:
            # FIXME: Ideally we'd want to return an appropriate error
            # message and code in the PUT Object REST API response.
            raise DiskFileError("_make_directory_unlocked: os.mkdir failed"
                                " because some part of path %s is not in"
                                " fact a directory" % full_path)
        elif err.errno == errno.EIO:
            # Sometimes FUSE will return an EIO error when it does not know
            # how to handle an unexpected, but transient, situation. It is
            # possible the directory now exists; stat() it to find out after
            # a short period of time.
            _random_sleep()
            try:
                stats = os.stat(full_path)
            except OSError as serr:
                if serr.errno == errno.ENOENT:
                    errmsg = "_make_directory_unlocked: os.mkdir failed on" \
                             " path %s (EIO), and a subsequent os.stat on" \
                             " that same path did not find the file." % (
                                 full_path,)
                else:
                    errmsg = "_make_directory_unlocked: os.mkdir failed on" \
                             " path %s (%s), and a subsequent os.stat on" \
                             " that same path failed as well (%s)" % (
                                 full_path, str(err), str(serr))
                raise DiskFileError(errmsg)
            else:
                # The directory at least exists now
                is_dir = stat.S_ISDIR(stats.st_mode)
                if is_dir:
                    # Dump the stats to the log with the original exception.
                    logging.warn("_make_directory_unlocked: os.mkdir"
                                 " initially failed on path %s (%s) but a"
                                 " stat() following that succeeded: %r" % (
                                     full_path, str(err), stats))
                    # Assume another entity took care of the proper setup.
                    return True, metadata
                else:
                    raise DiskFileError("_make_directory_unlocked: os.mkdir"
                                        " initially failed on path %s (%s)"
                                        " but now we see that it exists but"
                                        " is not a directory (%r)" % (
                                            full_path, str(err), stats))
        else:
            # Some other potentially rare exception occurred that does not
            # currently warrant a special log entry to help diagnose.
            raise DiskFileError("_make_directory_unlocked: os.mkdir failed on"
                                " path %s (%s)" % (full_path, str(err)))
    else:
        if metadata:
            # We were asked to set the initial metadata for this object.
            metadata_orig = get_object_metadata(full_path)
            metadata_orig.update(metadata)
            write_metadata(full_path, metadata_orig)
            metadata = metadata_orig

        # We created it, so we are responsible for always setting the proper
        # ownership.
        do_chown(full_path, uid, gid)
        return True, metadata


_fs_conf = ConfigParser()
if _fs_conf.read(os.path.join('/etc/swift', 'fs.conf')):
    try:
        _mkdir_locking = _fs_conf.get('DEFAULT', 'mkdir_locking', "no") \
            in TRUE_VALUES
    except (NoSectionError, NoOptionError):
        _mkdir_locking = False
    try:
        _use_put_mount = _fs_conf.get('DEFAULT', 'use_put_mount', "no") \
            in TRUE_VALUES
    except (NoSectionError, NoOptionError):
        _use_put_mount = False
else:
    _mkdir_locking = False
    _use_put_mount = False

if _mkdir_locking:
    make_directory = _make_directory_locked
else:
    make_directory = _make_directory_unlocked
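
# An /etc/swift/fs.conf sketch (illustrative values, not shipped defaults)
# that would select _make_directory_locked above and enable the separate
# PUT mount used by DiskFile below:
#
#     [DEFAULT]
#     mkdir_locking = yes
#     use_put_mount = yes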


def _adjust_metadata(metadata):
    # Fix up the metadata to ensure it has proper values for the
    # Content-Type metadata, as well as the X_TYPE and X_OBJECT_TYPE
    # metadata values.
    content_type = metadata[X_CONTENT_TYPE]
    if not content_type:
        # FIXME: How can it be that our caller supplied us with metadata
        # that has a content type that evaluates to False?
        #
        # FIXME: If the file exists, we would already know it is a
        # directory. So why are we assuming it is a file object?
        metadata[X_CONTENT_TYPE] = FILE_TYPE
        metadata[X_OBJECT_TYPE] = FILE
    else:
        if content_type.lower() == DIR_TYPE:
            metadata[X_OBJECT_TYPE] = DIR_OBJECT
        else:
            metadata[X_OBJECT_TYPE] = FILE

    metadata[X_TYPE] = OBJECT
    return metadata
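
# A sketch of the transformation performed by _adjust_metadata(), using
# made-up header values:
#
#     md = {X_CONTENT_TYPE: 'application/directory', X_CONTENT_LENGTH: 0}
#     md = _adjust_metadata(md)
#     # md[X_OBJECT_TYPE] == DIR_OBJECT and md[X_TYPE] == OBJECT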


class DiskWriter(SwiftDiskWriter):
    """
    Encapsulation of the write context for servicing PUT REST API
    requests. Serves as the context manager object for DiskFile's writer()
    method.

    We just override the put() method for Gluster.
    """

    def put(self, metadata, extension='.data'):
        """
        Finalize writing the file on disk, and rename it from the temp file
        to the real location. This should be called after the data has been
        written to the temp file.

        :param metadata: dictionary of metadata to be written
        :param extension: extension to be used when making the file
        """
        # Our caller will use '.data' here; we just ignore it since we map
        # the URL directly to the file system.

        assert self.tmppath is not None
        metadata = _adjust_metadata(metadata)
        df = self.disk_file

        if dir_is_object(metadata):
            if not df.data_file:
                # Does not exist, create it
                data_file = os.path.join(df._obj_path, df._obj)
                _, df.metadata = self.threadpool.force_run_in_thread(
                    df._create_dir_object, data_file, metadata)
                df.data_file = os.path.join(df._container_path, data_file)
            elif not df._is_dir:
                # Exists, but as a file
                raise DiskFileError('DiskFile.put(): directory creation'
                                    ' failed since the target, %s, already'
                                    ' exists as a file' % df.data_file)
            return

        if df._is_dir:
            # A pre-existing directory already exists on the file
            # system, perhaps gratuitously created when another
            # object was created, or created externally to Swift
            # REST API servicing (UFO use case).
            raise DiskFileError('DiskFile.put(): file creation failed since'
                                ' the target, %s, already exists as a'
                                ' directory' % df.data_file)

        def finalize_put():
            # Write out metadata before fsync() to ensure it is also forced
            # to disk.
            write_metadata(self.fd, metadata)

            # We call fsync() before calling drop_cache() to lower the
            # amount of redundant work the drop cache code will perform on
            # the pages (now that after fsync the pages will be all
            # clean).
            do_fsync(self.fd)
            # From the Department of the Redundancy Department, make sure
            # we call drop_cache() after fsync() to avoid redundant work
            # (pages all clean).
            drop_buffer_cache(self.fd, 0, self.upload_size)

            # At this point we know that the object's full directory path
            # exists, so we can just rename it directly without using Swift's
            # swift.common.utils.renamer(), which makes the directory path
            # and adds extra stat() calls.
            data_file = os.path.join(df.put_datadir, df._obj)
            attempts = 1
            while True:
                try:
                    os.rename(self.tmppath, data_file)
                except OSError as err:
                    if err.errno in (errno.ENOENT, errno.EIO) \
                            and attempts < MAX_RENAME_ATTEMPTS:
                        # FIXME: Why either of these two error conditions is
                        # happening is unknown at this point. This might be a
                        # FUSE issue of some sort or a possible race
                        # condition. So let's sleep on it, and double check
                        # the environment after a good nap.
                        _random_sleep()
                        # Tease out why this error occurred. The man page for
                        # rename reads:
                        #   "The link named by tmppath does not exist; or, a
                        #    directory component in data_file does not exist;
                        #    or, tmppath or data_file is an empty string."
                        assert len(self.tmppath) > 0 and len(data_file) > 0
                        tpstats = do_stat(self.tmppath)
                        tfstats = do_fstat(self.fd)
                        assert tfstats
                        if not tpstats or tfstats.st_ino != tpstats.st_ino:
                            # Temporary file name conflict
                            raise DiskFileError(
                                'DiskFile.put(): temporary file, %s, was'
                                ' already renamed (targeted for %s)' % (
                                    self.tmppath, data_file))
                        else:
                            # Data file target name now has a bad path!
                            dfstats = do_stat(df.put_datadir)
                            if not dfstats:
                                raise DiskFileError(
                                    'DiskFile.put(): path to object, %s, no'
                                    ' longer exists (targeted for %s)' % (
                                        df.put_datadir, data_file))
                            else:
                                is_dir = stat.S_ISDIR(dfstats.st_mode)
                                if not is_dir:
                                    raise DiskFileError(
                                        'DiskFile.put(): path to object, %s,'
                                        ' no longer a directory (targeted'
                                        ' for %s)' % (df.put_datadir,
                                                      data_file))
                                else:
                                    # Let's retry since everything looks okay
                                    logging.warn(
                                        "DiskFile.put(): os.rename('%s','%s')"
                                        " initially failed (%s) but a"
                                        " stat('%s') following that"
                                        " succeeded: %r" % (
                                            self.tmppath, data_file,
                                            str(err), df.put_datadir,
                                            dfstats))
                                    attempts += 1
                                    continue
                    else:
                        raise GlusterFileSystemOSError(
                            err.errno, "%s, os.rename('%s', '%s')" % (
                                err.strerror, self.tmppath, data_file))
                else:
                    # Success!
                    break
            # Close here so the calling context does not have to perform this
            # in a thread.
            do_close(self.fd)

        self.threadpool.force_run_in_thread(finalize_put)

        # Avoid the unlink() system call as part of the mkstemp context
        # cleanup
        self.tmppath = None

        df.metadata = metadata
        df._filter_metadata()

        # Mark that it actually exists now
        df.data_file = os.path.join(df.datadir, df._obj)
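
# A rough sketch of the PUT flow through this writer, assuming an
# already-constructed DiskFile instance "df" and a "chunk" of body data
# (both names are illustrative):
#
#     with df.writer() as dw:
#         dw.write(chunk)      # buffered write inherited from SwiftDiskWriter
#         dw.put(metadata)     # fsync, then rename the temp file into place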


class DiskFile(SwiftDiskFile):
    """
    Manage object files on disk.

    Object names ending or beginning with a '/' as in /a, a/, /a/b/,
    etc, or object names with multiple consecutive slashes, like a//b,
    are not supported. The proxy server's constraints filter
    gluster.swift.common.constraints.gluster_check_object_creation()
    should reject such requests.

    :param path: path to devices on the node/mount path for UFO.
    :param device: device name/account_name for UFO.
    :param partition: partition on the device the object lives in
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    :param logger: logger object for writing out log file messages
    :param keep_data_fp: if True, don't close the fp, otherwise close it
    :param disk_chunk_size: size of chunks on file reads
    :param bytes_per_sync: number of bytes between fdatasync calls
    :param iter_hook: called when __iter__ returns a chunk
    :param threadpool: thread pool in which to do blocking operations
    :param obj_dir: ignored
    :param mount_check: check the target device is a mount point and not on
                        the root volume
    :param uid: user ID disk object should assume (file or directory)
    :param gid: group ID disk object should assume (file or directory)
    """

    def __init__(self, path, device, partition, account, container, obj,
                 logger, keep_data_fp=False,
                 disk_chunk_size=DEFAULT_DISK_CHUNK_SIZE,
                 bytes_per_sync=DEFAULT_BYTES_PER_SYNC, iter_hook=None,
                 threadpool=None, obj_dir='objects', mount_check=False,
                 disallowed_metadata_keys=None, uid=DEFAULT_UID,
                 gid=DEFAULT_GID):
        if mount_check and not mount(path, device):
            raise DiskFileDeviceUnavailable()
        self.disk_chunk_size = disk_chunk_size
        self.bytes_per_sync = bytes_per_sync
        self.iter_hook = iter_hook
        self.threadpool = threadpool or ThreadPool(nthreads=0)
        obj = obj.strip(os.path.sep)

        if os.path.sep in obj:
            self._obj_path, self._obj = os.path.split(obj)
        else:
            self._obj_path = ''
            self._obj = obj

        if self._obj_path:
            self.name = os.path.join(container, self._obj_path)
        else:
            self.name = container
        # Absolute path for object directory.
        self.datadir = os.path.join(path, device, self.name)
        self.device_path = os.path.join(path, device)
        self._container_path = os.path.join(path, device, container)
        if _use_put_mount:
            self.put_datadir = os.path.join(self.device_path + '_PUT',
                                            self.name)
        else:
            self.put_datadir = self.datadir
        self._is_dir = False
        self.logger = logger
        self.metadata = {}
        self.meta_file = None
        self.fp = None
        self.iter_etag = None
        self.started_at_0 = False
        self.read_to_eof = False
        self.quarantined_dir = None
        self.keep_cache = False
        self.uid = int(uid)
        self.gid = int(gid)
        self.suppress_file_closing = False

        # Don't store a value for data_file until we know it exists.
        self.data_file = None
        data_file = os.path.join(self.put_datadir, self._obj)

        try:
            stats = do_stat(data_file)
        except OSError as err:
            if err.errno == errno.ENOTDIR:
                return
            raise
        else:
            if not stats:
                return

        self.data_file = data_file
        self._is_dir = stat.S_ISDIR(stats.st_mode)

        self.metadata = read_metadata(data_file)
        if not self.metadata:
            create_object_metadata(data_file)
            self.metadata = read_metadata(data_file)

        if not validate_object(self.metadata):
            create_object_metadata(data_file)
            self.metadata = read_metadata(data_file)

        self._filter_metadata()

        if keep_data_fp:
            if not self._is_dir:
                # The caller has an assumption that the "fp" field of this
                # object is a file object if keep_data_fp is set. However,
                # this implementation of the DiskFile object does not need to
                # open the file for internal operations. So if the caller
                # requests it, we'll just open the file for them.
                self.fp = do_open(data_file, 'rb')
            else:
                self.fp = Fake_file(data_file)
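
    # An illustration of the name-to-path mapping above, with made-up values:
    #
    #     df = DiskFile('/mnt/gluster-object', 'vol', '0', 'AUTH_test',
    #                   'cont', 'a/b/c', logger)
    #     df.datadir   -> '/mnt/gluster-object/vol/cont/a/b'
    #     df.data_file -> '/mnt/gluster-object/vol/cont/a/b/c' (if it exists)
    #
    # When the object is a directory and keep_data_fp is set, "fp" is a
    # Fake_file, which implements only the file-like methods the GET path
    # invokes.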

    def drop_cache(self, fd, offset, length):
        if fd >= 0:
            super(DiskFile, self).drop_cache(fd, offset, length)

    def close(self, verify_file=True):
        """
        Close the file. Will handle quarantining file if necessary.

        :param verify_file: Defaults to True. If false, will not check
                            file to see if it needs quarantining.
        """
        if self.fp:
            do_close(self.fp)
            self.fp = None

    def _filter_metadata(self):
        if X_TYPE in self.metadata:
            self.metadata.pop(X_TYPE)
        if X_OBJECT_TYPE in self.metadata:
            self.metadata.pop(X_OBJECT_TYPE)

    def _create_dir_object(self, dir_path, metadata=None):
        """
        Create a directory object at the specified path. No check is made to
        see if the directory object already exists, that is left to the
        caller (this avoids a potentially duplicate stat() system call).

        The "dir_path" must be relative to its container,
        self._container_path.

        The "metadata" object is an optional set of metadata to apply to the
        newly created directory object. If not present, no initial metadata
        is applied.

        The algorithm used is as follows:

        1. An attempt is made to create the directory, assuming the parent
           directory already exists

           * Directory creation races are detected, returning success in
             those cases

        2. If the directory creation fails because some part of the path to
           the directory does not exist, then a search back up the path is
           performed to find the first existing ancestor directory, and then
           the missing parents are successively created, finally creating
           the target directory
        """
        full_path = os.path.join(self._container_path, dir_path)
        cur_path = full_path
        stack = []
        while True:
            md = None if cur_path != full_path else metadata
            ret, newmd = make_directory(cur_path, self.uid, self.gid, md)
            if ret:
                break
            # Some part of the path to the parent did not exist, so loop
            # around and create that, pushing this parent on the stack.
            if os.path.sep not in cur_path:
                raise DiskFileError("DiskFile._create_dir_object(): failed to"
                                    " create directory path while exhausting"
                                    " path elements to create: %s" %
                                    full_path)
            cur_path, child = cur_path.rsplit(os.path.sep, 1)
            assert child
            stack.append(child)

        child = stack.pop() if stack else None
        while child:
            cur_path = os.path.join(cur_path, child)
            md = None if cur_path != full_path else metadata
            ret, newmd = make_directory(cur_path, self.uid, self.gid, md)
            if not ret:
                raise DiskFileError("DiskFile._create_dir_object(): failed to"
                                    " create directory path to target, %s,"
                                    " on subpath: %s" % (full_path, cur_path))
            child = stack.pop() if stack else None
        return True, newmd
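
    # A walk-through with hypothetical names: creating "a/b/c" when only the
    # container directory exists fails at "a/b/c" and again at "a/b", pushing
    # 'c' and then 'b' onto the stack; "a" is then created, and the second
    # loop pops 'b' and 'c' to create "a/b" and finally "a/b/c", which alone
    # receives the supplied object metadata.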

    @contextmanager
    def writer(self, size=None):
        """
        Contextmanager to make a temporary file, optionally of a specified
        initial size.

        For Gluster, we first optimistically create the temporary file using
        the "rsync-friendly" .NAME.random naming. If we find that some path
        to the file does not exist, we then create that path and then create
        the temporary file again. If we get a file name conflict, we'll retry
        with a different random suffix up to MAX_OPEN_ATTEMPTS times before
        giving up.
        """
        data_file = os.path.join(self.put_datadir, self._obj)

        # Assume the full directory path exists to the file already, and
        # construct the proper name for the temporary file.
        attempts = 1
        cur_thread = str(getcurrent())
        while True:
            postfix = md5(self._obj + _cur_host + _cur_pid + cur_thread
                          + str(random.random())).hexdigest()
            tmpfile = '.' + self._obj + '.' + postfix
            tmppath = os.path.join(self.put_datadir, tmpfile)
            try:
                fd = do_open(tmppath,
                             os.O_WRONLY | os.O_CREAT | os.O_EXCL | O_CLOEXEC)
            except GlusterFileSystemOSError as gerr:
                if gerr.errno == errno.ENOSPC:
                    # Raise DiskFileNoSpace to be handled by upper layers
                    raise DiskFileNoSpace()
                if gerr.errno not in (errno.ENOENT, errno.EEXIST, errno.EIO):
                    # FIXME: Other cases we should handle?
                    raise
                if attempts >= MAX_OPEN_ATTEMPTS:
                    # We failed after N attempts to create the temporary
                    # file.
                    raise DiskFileError('DiskFile.mkstemp(): failed to'
                                        ' successfully create a temporary'
                                        ' file without running into a name'
                                        ' conflict after %d of %d attempts'
                                        ' for: %s' % (
                                            attempts, MAX_OPEN_ATTEMPTS,
                                            data_file))
                if gerr.errno == errno.EEXIST:
                    # Retry with a different random number.
                    attempts += 1
                elif gerr.errno == errno.EIO:
                    # FIXME: Possible FUSE issue or race condition, let's
                    # sleep on it and retry the operation.
                    _random_sleep()
                    logging.warn("DiskFile.mkstemp(): %s ... retrying in"
                                 " 0.1 secs", gerr)
                    attempts += 1
                elif not self._obj_path:
                    # No directory hierarchy and the create failed telling us
                    # the container or volume directory does not exist. This
                    # could be a FUSE issue or some race condition, so let's
                    # sleep a bit and retry.
                    _random_sleep()
                    logging.warn("DiskFile.mkstemp(): %s ... retrying in"
                                 " 0.1 secs", gerr)
                    attempts += 1
                elif attempts > 1:
                    # Got ENOENT after previously making the path. This could
                    # also be a FUSE issue or some race condition, nap and
                    # retry.
                    _random_sleep()
                    logging.warn("DiskFile.mkstemp(): %s ... retrying in"
                                 " 0.1 secs", gerr)
                    attempts += 1
                else:
                    # It looks like the path to the object does not already
                    # exist; don't count this as an attempt, though, since
                    # we perform the open() system call optimistically.
                    self._create_dir_object(self._obj_path)
            else:
                break
        dw = None
        try:
            # Ensure it is properly owned before we make it available.
            do_fchown(fd, self.uid, self.gid)
            # NOTE: we do not perform the fallocate() call at all. We ignore
            # it completely.
            dw = DiskWriter(self, fd, tmppath, self.threadpool)
            yield dw
        finally:
            if dw is not None:
                try:
                    if dw.fd:
                        do_close(dw.fd)
                except OSError:
                    pass
                if dw.tmppath:
                    do_unlink(dw.tmppath)
            else:
                # The DiskWriter was never constructed (e.g. do_fchown
                # failed); don't leak the open temporary file.
                try:
                    do_close(fd)
                except OSError:
                    pass
                do_unlink(tmppath)
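
    # An example of the temporary name built above (the digest shown is just
    # an illustration): for an object "c" the candidate is
    # ".c.<md5-hexdigest>", e.g.
    #
    #     .c.d41d8cd98f00b204e9800998ecf8427e
    #
    # so an interrupted PUT leaves behind only a dot-file that rsync-style
    # tooling treats as temporary.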

    def put_metadata(self, metadata, tombstone=False):
        """
        Shorthand for putting metadata to .meta and .ts files.

        :param metadata: dictionary of metadata to be written
        :param tombstone: whether or not we are writing a tombstone
        """
        if tombstone:
            # We don't write tombstone files. So do nothing.
            return
        assert self.data_file is not None, \
            "put_metadata: no file to put metadata into"
        metadata = _adjust_metadata(metadata)
        self.threadpool.run_in_thread(write_metadata, self.data_file,
                                      metadata)
        self.metadata = metadata
        self._filter_metadata()

    def unlinkold(self, timestamp):
        """
        Remove any older versions of the object file. Any file that has an
        older timestamp than timestamp will be deleted.

        :param timestamp: timestamp to compare with each file
        """
        if not self.metadata or self.metadata[X_TIMESTAMP] >= timestamp:
            return

        assert self.data_file, \
            "Have metadata, %r, but no data_file" % self.metadata

        def _unlinkold():
            if self._is_dir:
                # Marker, or object, directory.
                #
                # Delete from the filesystem only if it contains no objects.
                # If it does contain objects, then just remove the object
                # metadata tag which will make this directory a
                # fake-filesystem-only directory and will be deleted when the
                # container or parent directory is deleted.
                metadata = read_metadata(self.data_file)
                if dir_is_object(metadata):
                    metadata[X_OBJECT_TYPE] = DIR_NON_OBJECT
                    write_metadata(self.data_file, metadata)
                rmobjdir(self.data_file)
            else:
                # Delete file object
                do_unlink(self.data_file)

            # Garbage collection of non-object directories. Now that we
            # deleted the file, determine if the current directory and any
            # parent directory may be deleted.
            dirname = os.path.dirname(self.data_file)
            while dirname and dirname != self._container_path:
                # Try to remove any directories that are not objects.
                if not rmobjdir(dirname):
                    # If a directory with objects has been found, we can stop
                    # garbage collection
                    break
                else:
                    dirname = os.path.dirname(dirname)

        self.threadpool.run_in_thread(_unlinkold)

        self.metadata = {}
        self.data_file = None
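
    # A sketch of the garbage collection done in _unlinkold(), with made-up
    # paths: after deleting ".../cont/a/b/c", rmobjdir() is attempted on
    # ".../cont/a/b" and then ".../cont/a", stopping at the container or at
    # the first directory that still holds objects.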

    def get_data_file_size(self):
        """
        Returns the os_path.getsize for the file. If the on-disk size does
        not match the Content-Length stored in the metadata, the metadata is
        updated to the size on disk.

        :returns: file size as an int
        :raises DiskFileNotExist: on file not existing (including deleted)
        """
        if self._is_dir:
            # Directories have no size.
            return 0
        try:
            file_size = 0
            if self.data_file:
                def _old_getsize():
                    file_size = os_path.getsize(self.data_file)
                    if X_CONTENT_LENGTH in self.metadata:
                        metadata_size = int(self.metadata[X_CONTENT_LENGTH])
                        if file_size != metadata_size:
                            # FIXME - bit rot detection?
                            self.metadata[X_CONTENT_LENGTH] = file_size
                            write_metadata(self.data_file, self.metadata)
                    return file_size
                file_size = self.threadpool.run_in_thread(_old_getsize)
                return file_size
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
        raise DiskFileNotExist('Data File does not exist.')