diff --git a/setup.cfg b/setup.cfg index 7474359..c1d9db4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = shoebox -version = 0.5 +version = 0.6 author = Dark Secret Software Inc. author-email = admin@darksecretsoftware.com summary = data archiving library diff --git a/shoebox/roll_manager.py b/shoebox/roll_manager.py index 7512aed..151f9c4 100644 --- a/shoebox/roll_manager.py +++ b/shoebox/roll_manager.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import fnmatch import hashlib import os @@ -142,24 +143,38 @@ class WritingRollManager(RollManager): class WritingJSONRollManager(object): - """No archiver. No roll checker. Just write the file locally. - Once the working_directory gets big enough, the files are - .tar.gz'ed into the destination_directory. Then - the working_directory is erased. + """No archiver. No roll checker. Just write 1 file line per json payload. + Once the file gets big enough, .tar.gz the file and move + into the destination_directory. Expects an external tool like rsync to move the file. - A SHA-256 of the payload may be included in the filename.""" + A SHA-256 of the payload may be included in the tarball filename.""" def __init__(self, *args, **kwargs): self.filename_template = args[0] self.directory = kwargs.get('directory', '.') self.destination_directory = kwargs.get('destination_directory', '.') self.roll_size_mb = int(kwargs.get('roll_size_mb', 1000)) + minutes = kwargs.get('roll_minutes', 15) + self.roll_after = datetime.timedelta(minutes=minutes) - # Read the directory at the start, but incrementally - # update the size as we write more files. Doing a full - # disk stat every time is way too expensive. - # Of course, this means we are not accounting for - # block size. - self.directory_size = self._get_directory_size() + # Look in the working directory for any files. Move them to the + # destination directory before we start. This implies we + # have to make sure multiple workers don't point at the same + # working directory. + self.handle = None + self.size = 0 + self.start_time = self._get_time() + + self._archive_working_files() + + def _get_time(self): + # Broken out for testing ... + return datetime.datetime.utcnow() + + def _archive_working_files(self): + for f in os.listdir(self.directory): + full = os.path.join(self.directory, f) + if os.path.isfile(full): + self._do_roll(full) def _make_filename(self, crc, prefix): now = notification_utils.now() @@ -172,53 +187,65 @@ class WritingJSONRollManager(object): f = f.replace("[[TIMESTAMP]]", dt) return os.path.join(prefix, f) - def _should_tar(self): - return (self.directory_size / 1048576) >= self.roll_size_mb + def _should_roll(self, size): + return ((size / 1048576) >= self.roll_size_mb or + (size > 0 and + self._get_time() >= (self.start_time + self.roll_after))) - def _get_directory_size(self): - size = 0 - for f in os.listdir(self.directory): - full = os.path.join(self.directory, f) - if os.path.isfile(full): - size += os.path.getsize(full) - return size + def _get_file_sha(self, filename): + block_size=2**20 + sha256 = hashlib.sha256() + # no context manager, just to keep testing simple. + f = open(filename, "r") + while True: + data = f.read(block_size) + if not data: + break + sha256.update(data) + f.close() + return sha256.hexdigest() - def _tar_directory(self): + def _tar_working_file(self, filename): # tar all the files in working directory into an archive # in destination_directory. - crc = "archive" - filename = self._make_filename(crc, self.destination_directory) + \ - ".tar.gz" + crc = self._get_file_sha(filename) # No contextmgr for tarfile in 2.6 :( - tar = tarfile.open(filename, "w:gz") - for f in os.listdir(self.directory): - full = os.path.join(self.directory, f) - if os.path.isfile(full): - tar.add(full) + fn = self._make_filename(crc, self.destination_directory) + ".tar.gz" + tar = tarfile.open(fn, "w:gz") + just_name = os.path.basename(filename) + tar.add(filename, arcname=just_name) tar.close() - def _clean_working_directory(self): - for f in os.listdir(self.directory): - full = os.path.join(self.directory, f) - if os.path.isfile(full): - os.remove(full) - self.directory_size = self._get_directory_size() + def _clean_working_directory(self, filename): + os.remove(filename) + self.size = 0 + + def _do_roll(self, filename): + self.close() + self._tar_working_file(filename) + self._clean_working_directory(filename) def write(self, metadata, json_payload): # Metadata is ignored. - crc = hashlib.sha256(json_payload).hexdigest() - filename = self._make_filename(crc, self.directory) - with open(filename, "w") as f: - f.write(json_payload) + handle = self._get_handle() + handle.write("%s\n" % json_payload) + handle.flush() - self.directory_size += len(json_payload) + self.size += len(json_payload) - if not self._should_tar(): - return + if self._should_roll(self.size): + self._do_roll(self.filename) - self._tar_directory() - self._clean_working_directory() + def _get_handle(self): + if not self.handle: + self.filename = self._make_filename('[[CRC]]', self.directory) + self.handle = open(self.filename, "w") + self.start_time = self._get_time() + + return self.handle def close(self): - pass + if self.handle: + self.handle.close() + self.handle = None diff --git a/test/integration/test_json_tarball.py b/test/integration/test_json_tarball.py index 0f25487..54ba4e1 100644 --- a/test/integration/test_json_tarball.py +++ b/test/integration/test_json_tarball.py @@ -13,16 +13,16 @@ import notigen from shoebox import roll_manager -TEMPDIR = "test_temp" +TEMPDIR = "test_temp/working" DESTDIR = "test_temp/output" +EXTRACTDIR = "test_temp/extract" class TestDirectory(unittest.TestCase): def setUp(self): - shutil.rmtree(TEMPDIR, ignore_errors=True) - shutil.rmtree(DESTDIR, ignore_errors=True) - os.mkdir(TEMPDIR) - os.mkdir(DESTDIR) + for d in [TEMPDIR, DESTDIR, EXTRACTDIR]: + shutil.rmtree(d, ignore_errors=True) + os.mkdir(d) def test_size_rolling(self): manager = roll_manager.WritingJSONRollManager( @@ -42,42 +42,40 @@ class TestDirectory(unittest.TestCase): json_event = json.dumps(event, cls=notification_utils.DateTimeEncoder) manager.write(metadata, json_event) - crc = hashlib.sha256(json_event).hexdigest() - entries[crc] = json_event + msg_id = event['message_id'] + entries[msg_id] = json_event now = g.move_to_next_tick(now) manager.close() + manager._archive_working_files() - # Confirm files and tarballs ... print "Starting entries:", len(entries) - date_len = len("2015_02_24_14_15_58_037080_") - num = 0 + + actual = len(entries) + + # Confirm there's nothing in working directory ... for f in os.listdir(TEMPDIR): full = os.path.join(TEMPDIR, f) if os.path.isfile(full): - crc = f[date_len:-len(".event")] - del entries[crc] - num += 1 - print "Untarred entries:", num, "Remaining:", len(entries) + self.fail("Working directory not empty.") - # the rest have to be in tarballs ... + # Extract the tarballs ... + total = 0 for f in os.listdir(DESTDIR): - num = 0 - actual = 0 tar = tarfile.open(os.path.join(DESTDIR, f), "r:gz") - for tarinfo in tar: - crc = tarinfo.name[len(TEMPDIR) + 1 + date_len:-len(".event")] - actual += 1 - if crc: - del entries[crc] - num += 1 - - if actual == 1: - raise Exception("tarball has 1 entry. Something is wrong.") - - print "In %s: %d of %d Remaining: %d" % (f, num, actual, - len(entries)) + names = tar.getnames() + tar.extractall(path=EXTRACTDIR) tar.close() - if len(entries): - raise Exception("%d more events than generated." % len(entries)) + for item in names: + full = os.path.join(EXTRACTDIR, item) + num = 0 + with open(full, "r") as handle: + for line in handle: + num += 1 + total += num + print "In %s: %d of %d Remaining: %d" % (f, num, actual, + actual - total) + + if actual != total: + raise Exception("Num generated != actual") diff --git a/test/test_roll_manager.py b/test/test_roll_manager.py index 0b376c4..3be2c0d 100644 --- a/test/test_roll_manager.py +++ b/test/test_roll_manager.py @@ -96,9 +96,8 @@ class TestWriting(unittest.TestCase): class TestJSONRollManager(unittest.TestCase): @mock.patch( - "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") - def test_make_filename(self, gds): - gds.return_value = 1000 + "shoebox.roll_manager.WritingJSONRollManager._archive_working_files") + def test_make_filename(self, awf): now = datetime.datetime(day=1, month=2, year=2014, hour=10, minute=11, second=12) with mock.patch.object(notification_utils, "now") as dt: @@ -110,76 +109,96 @@ class TestJSONRollManager(unittest.TestCase): fn = x._make_filename("mycrc", "foo") self.assertEqual("foo/20140201_123.45_mycrc.foo", fn) - @mock.patch('os.path.getsize') @mock.patch('os.listdir') @mock.patch('os.path.isfile') - def test_get_directory_size(self, isf, ld, gs): + def test_archive_working_files(self, isf, ld): rm = roll_manager.WritingJSONRollManager("template.foo") - gs.return_value = 250000 ld.return_value = ['a', 'b', 'c'] isf.return_value = True - self.assertEqual(250000*3, rm._get_directory_size()) - ld.return_value = ['a', 'b', 'c', 'd', 'e', 'f'] - self.assertEqual(250000*6, rm._get_directory_size()) + with mock.patch.object(rm, "_do_roll") as dr: + rm._archive_working_files() + self.assertEqual(dr.call_count, 3) @mock.patch( - "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") - def test_should_tar(self, gds): - gds.return_value = 1000 + "shoebox.roll_manager.WritingJSONRollManager._archive_working_files") + def test_should_roll(self, awf): rm = roll_manager.WritingJSONRollManager("template.foo") - rm.directory_size = 9 * 1048576 rm.roll_size_mb = 10 - self.assertFalse(rm._should_tar()) - rm.directory_size = 10 * 1048576 - rm.roll_size_mb = 10 - self.assertTrue(rm._should_tar()) + self.assertFalse(rm._should_roll(9*1048576)) + self.assertTrue(rm._should_roll(10*1048576)) + + rm = roll_manager.WritingJSONRollManager("template.foo", roll_minutes=10) + self.assertFalse(rm._should_roll(0)) + self.assertFalse(rm._should_roll(1)) + with mock.patch.object(rm, "_get_time") as gt: + gt.return_value = rm.start_time + datetime.timedelta(minutes=11) + self.assertTrue(rm._should_roll(1)) - @mock.patch('os.listdir') @mock.patch('os.remove') - @mock.patch('os.path.isfile') @mock.patch( - "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") - def test_clean_working_directory(self, gds, isf, rem, ld): - gds.return_value = 1000 - isf.return_value = True + "shoebox.roll_manager.WritingJSONRollManager._archive_working_files") + def test_clean_working_directory(self, awf, rem): rm = roll_manager.WritingJSONRollManager("template.foo") - ld.return_value = ['a', 'b', 'c'] - rm._clean_working_directory() - self.assertEqual(3, rem.call_count) + rm._clean_working_directory("foo") + self.assertEqual(1, rem.call_count) - @mock.patch('os.listdir') - @mock.patch('tarfile.open') - @mock.patch('os.path.isfile') @mock.patch( - "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") - def test_tar_directory(self, gds, isf, to, ld): - gds.return_value = 1000 - ld.return_value = ['a', 'b', 'c'] - isf.return_value = True - gds = 1000 + "shoebox.roll_manager.WritingJSONRollManager._archive_working_files") + def test_tar_working_file(self, awf): rm = roll_manager.WritingJSONRollManager("template.foo") - open_name = '%s.open' % roll_manager.__name__ - with mock.patch(open_name, create=True) as mock_open: - mock_open.return_value = mock.MagicMock(spec=file) - - rm._tar_directory() - self.assertTrue(to.called) + with mock.patch.object(rm, "_get_file_sha") as gfs: + gfs.return_value = "aabbcc" + with mock.patch.object(roll_manager.tarfile, 'open') as tar: + tar.return_value = mock.MagicMock() + rm._tar_working_file("foo") + self.assertTrue(tar.called) @mock.patch( - "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") - def test_write(self, gds): - gds.return_value = 0 + "shoebox.roll_manager.WritingJSONRollManager._archive_working_files") + def test_write(self, awf): rm = roll_manager.WritingJSONRollManager("template.foo") payload = "some big payload" - open_name = '%s.open' % roll_manager.__name__ - with mock.patch(open_name, create=True) as mock_open: - with mock.patch.object(rm, "_should_tar") as st: - with mock.patch.object(rm, "_tar_directory") as td: - st.return_value = False - mock_open.return_value = mock.MagicMock(spec=file) + with mock.patch.object(rm, "_get_handle") as gh: + with mock.patch.object(rm, "_should_roll") as sr: + with mock.patch.object(rm, "_do_roll") as dr: + sr.return_value = False + gh.return_value = mock.MagicMock() rm.write("metadata", payload) - self.assertTrue(mock_open.called_once_with( - "template.foo", "wb")) - self.assertFalse(td.called) - self.assertEqual(rm.directory_size, len(payload)) + self.assertFalse(dr.called) + self.assertEqual(rm.size, len(payload)) + + @mock.patch( + "shoebox.roll_manager.WritingJSONRollManager._archive_working_files") + def test_get_file_sha(self, awf): + rm = roll_manager.WritingJSONRollManager("template.foo") + with mock.patch.object(roll_manager.hashlib, "sha256") as sha: + sha_obj = mock.MagicMock() + sha.return_value = sha_obj + hexdigest = mock.MagicMock() + hexdigest.return_value = "aabbcc" + sha_obj.hexdigest = hexdigest + open_name = '%s.open' % roll_manager.__name__ + with mock.patch(open_name, create=True) as mock_open: + handle = mock.MagicMock() + mock_open.return_value = handle + data = mock.MagicMock() + handle.read = data + data.side_effect = ["a", "b", "c", False] + self.assertEqual("aabbcc", rm._get_file_sha('foo')) + + @mock.patch( + "shoebox.roll_manager.WritingJSONRollManager._archive_working_files") + def test_get_handle(self, awf): + rm = roll_manager.WritingJSONRollManager("template.foo") + rm.handle = "abc" + self.assertEqual("abc", rm._get_handle()) + + with mock.patch.object(rm, "_make_filename") as mf: + mf.return_value = "foo" + open_name = '%s.open' % roll_manager.__name__ + with mock.patch(open_name, create=True) as mock_open: + handle = mock.MagicMock() + mock_open.return_value = handle + rm.handle = None + self.assertEqual(handle, rm._get_handle())