diff --git a/test/unit/common/test_fs_utils.py b/test/unit/common/test_fs_utils.py index 186e07d..28354ee 100644 --- a/test/unit/common/test_fs_utils.py +++ b/test/unit/common/test_fs_utils.py @@ -16,19 +16,29 @@ import os import shutil import random +import errno import unittest +import eventlet +from nose import SkipTest +from mock import patch from tempfile import mkdtemp, mkstemp from gluster.swift.common import fs_utils as fs from gluster.swift.common.exceptions import NotDirectoryError, \ FileOrDirNotFoundError +def mock_os_fsync(fd): + return True + +def mock_tpool_execute(func, *args, **kwargs): + func(*args, **kwargs) + class TestUtils(unittest.TestCase): """ Tests for common.utils """ def test_do_walk(self): + # create directory structure + tmpparent = mkdtemp() try: - # create directory structure - tmpparent = mkdtemp() tmpdirs = [] tmpfiles = [] for i in range(5): @@ -44,9 +54,10 @@ class TestUtils(unittest.TestCase): finally: shutil.rmtree(tmpparent) + def test_do_open(self): + fd, tmpfile = mkstemp() try: - fd, tmpfile = mkstemp() f = fs.do_open(tmpfile, 'r') try: f.write('test') @@ -54,11 +65,13 @@ class TestUtils(unittest.TestCase): pass else: self.fail("IOError expected") + finally: + f.close() finally: - f.close() os.close(fd) os.remove(tmpfile) + def test_do_open_err(self): try: fs.do_open(os.path.join('/tmp', str(random.random())), 'r') @@ -67,9 +80,18 @@ class TestUtils(unittest.TestCase): else: self.fail("IOError expected") - def test_do_write(self): + def test_do_open_err_int_mode(self): + try: + fs.do_open(os.path.join('/tmp', str(random.random())), + os.O_RDONLY) + except OSError: + pass + else: + self.fail("IOError expected") + + def test_do_write(self): + fd, tmpfile = mkstemp() try: - fd, tmpfile = mkstemp() cnt = fs.do_write(fd, "test") assert cnt == len("test") finally: @@ -77,17 +99,22 @@ class TestUtils(unittest.TestCase): os.remove(tmpfile) def test_do_write_err(self): + fd, tmpfile = mkstemp() try: - fd, tmpfile = mkstemp() fd1 = os.open(tmpfile, os.O_RDONLY) - fs.do_write(fd1, "test") - except OSError: - pass + try: + fs.do_write(fd1, "test") + except OSError: + pass + else: + self.fail("OSError expected") + except OSError as ose: + self.fail("Open failed with %s" %ose.strerror) else: - self.fail("OSError expected") + os.close(fd1) finally: os.close(fd) - os.close(fd1) + os.remove(tmpfile) def test_do_mkdir(self): try: @@ -119,8 +146,8 @@ class TestUtils(unittest.TestCase): shutil.rmtree(subdir) def test_do_listdir(self): + tmpdir = mkdtemp() try: - tmpdir = mkdtemp() subdir = [] for i in range(5): subdir.append(mkdtemp(dir=tmpdir).rsplit(os.path.sep, 1)[1]) @@ -139,8 +166,8 @@ class TestUtils(unittest.TestCase): self.fail("OSError expected") def test_do_stat(self): + tmpdir = mkdtemp() try: - tmpdir = mkdtemp() fd, tmpfile = mkstemp(dir=tmpdir) buf1 = os.stat(tmpfile) buf2 = fs.do_stat(fd) @@ -162,8 +189,8 @@ class TestUtils(unittest.TestCase): self.fail("OSError expected") def test_do_close(self): + fd, tmpfile = mkstemp() try: - fd, tmpfile = mkstemp() fs.do_close(fd); try: os.write(fd, "test") @@ -176,9 +203,23 @@ class TestUtils(unittest.TestCase): finally: os.remove(tmpfile) - def test_do_unlink(self): + def test_do_close_err(self): + fd, tmpfile = mkstemp() + try: + fs.do_close(fd); + + try: + fs.do_close(fd); + except OSError: + pass + else: + self.fail("OSError expected") + finally: + os.remove(tmpfile) + + def test_do_unlink(self): + fd, tmpfile = mkstemp() try: - fd, tmpfile = mkstemp() fs.do_unlink(tmpfile) assert not os.path.exists(tmpfile) assert fs.do_unlink(os.path.join('/tmp', str(random.random()))) @@ -186,8 +227,8 @@ class TestUtils(unittest.TestCase): os.close(fd) def test_do_unlink_err(self): + tmpdir = mkdtemp() try: - tmpdir = mkdtemp() fs.do_unlink(tmpdir) except OSError: pass @@ -203,8 +244,8 @@ class TestUtils(unittest.TestCase): assert not fs.do_rmdir(os.path.join('/tmp', str(random.random()))) def test_do_rmdir_err(self): + fd, tmpfile = mkstemp() try: - fd, tmpfile = mkstemp() fs.do_rmdir(tmpfile) except OSError: pass @@ -215,8 +256,8 @@ class TestUtils(unittest.TestCase): os.remove(tmpfile) def test_do_rename(self): + srcpath = mkdtemp() try: - srcpath = mkdtemp() destpath = os.path.join('/tmp', str(random.random())) fs.do_rename(srcpath, destpath) assert not os.path.exists(srcpath) @@ -235,8 +276,8 @@ class TestUtils(unittest.TestCase): self.fail("OSError expected") def test_dir_empty(self): + tmpdir = mkdtemp() try: - tmpdir = mkdtemp() subdir = mkdtemp(dir=tmpdir) assert not fs.dir_empty(tmpdir) assert fs.dir_empty(subdir) @@ -264,8 +305,8 @@ class TestUtils(unittest.TestCase): os.unlink(tmpfile) def test_rmdirs(self): + tmpdir = mkdtemp() try: - tmpdir = mkdtemp() subdir = mkdtemp(dir=tmpdir) fd, tmpfile = mkstemp(dir=tmpdir) assert not fs.rmdirs(tmpfile) @@ -275,3 +316,85 @@ class TestUtils(unittest.TestCase): finally: os.close(fd) shutil.rmtree(tmpdir) + + def test_chown_dir(self): + tmpdir = mkdtemp() + try: + subdir = mkdtemp(dir=tmpdir) + buf = os.stat(subdir) + if buf.st_uid == 0: + raise SkipTest + else: + try: + fs.do_chown(subdir, 20000, 20000) + except OSError as ex: + if ex.errno != errno.EPERM: + self.fail("Expected OSError") + else: + self.fail("Expected OSError") + finally: + shutil.rmtree(tmpdir) + + def test_chown_file(self): + tmpdir = mkdtemp() + try: + fd, tmpfile = mkstemp(dir=tmpdir) + buf = os.stat(tmpfile) + if buf.st_uid == 0: + raise SkipTest + else: + try: + fs.do_chown(tmpfile, 20000, 20000) + except OSError as ex: + if ex.errno != errno.EPERM: + self.fail("Expected OSError") + else: + self.fail("Expected OSError") + finally: + os.close(fd) + shutil.rmtree(tmpdir) + + def test_chown_file_err(self): + try: + fs.do_chown(os.path.join('/tmp', str(random.random())), + 20000, 20000) + except OSError: + pass + else: + self.fail("Expected OSError") + + + def test_do_fsync(self): + tmpdir = mkdtemp() + try: + fd, tmpfile = mkstemp(dir=tmpdir) + try: + os.write(fd, 'test') + with patch('eventlet.tpool.execute', mock_tpool_execute): + with patch('os.fsync', mock_os_fsync): + assert fs.do_fsync(fd) + except OSError as ose: + self.fail('Opening a temporary file failed with %s' %ose.strerror) + else: + os.close(fd) + finally: + shutil.rmtree(tmpdir) + + + def test_do_fsync_err(self): + tmpdir = mkdtemp() + try: + fd, tmpfile = mkstemp(dir=tmpdir) + os.write(fd, 'test') + with patch('eventlet.tpool.execute', mock_tpool_execute): + with patch('os.fsync', mock_os_fsync): + assert fs.do_fsync(fd) + os.close(fd) + try: + fs.do_fsync(fd) + except OSError: + pass + else: + self.fail("Expected OSError") + finally: + shutil.rmtree(tmpdir)