From 5f91c8773939d1bef91ec9ee4e34a3869b49202c Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Tue, 24 Feb 2015 07:10:41 -0800 Subject: [PATCH] Tarball archiving of JSON messages. Changed the JSON RollManager to write JSON payloads to a working directory until a certain directory size is met. Then the directory is tar/gzipped and moved into a destination directory. No CRC is available for the tarball. Change-Id: I119da646104eeaea86f7153a9bcf123d3143a0ba --- setup.cfg | 2 +- shoebox/roll_manager.py | 77 +++++++++++++++++++++++++++++++++------ test/test_roll_manager.py | 67 ++++++++++++++++++++++++++-------- 3 files changed, 118 insertions(+), 28 deletions(-) diff --git a/setup.cfg b/setup.cfg index 72bd36b..1ada1d2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = shoebox -version = 0.2 +version = 0.41 author = Dark Secret Software Inc. author-email = admin@darksecretsoftware.com summary = data archiving library diff --git a/shoebox/roll_manager.py b/shoebox/roll_manager.py index aa9a8e9..598c566 100644 --- a/shoebox/roll_manager.py +++ b/shoebox/roll_manager.py @@ -14,10 +14,10 @@ # limitations under the License. import fnmatch -import gzip import hashlib import os import os.path +import tarfile import notification_utils @@ -62,8 +62,9 @@ class RollManager(object): class ReadingRollManager(RollManager): def __init__(self, filename_template, directory=".", + destination_directory=".", archive_class = archive.ArchiveReader, - archive_callback=None): + archive_callback=None, roll_size_mb=1000): super(ReadingRollManager, self).__init__( filename_template, directory=directory, @@ -101,8 +102,9 @@ class ReadingRollManager(RollManager): class WritingRollManager(RollManager): def __init__(self, filename_template, roll_checker, directory=".", + destination_directory=".", archive_class=archive.ArchiveWriter, - archive_callback=None): + archive_callback=None, roll_size_mb=1000): super(WritingRollManager, self).__init__( filename_template, directory=directory, @@ -141,16 +143,19 @@ class WritingRollManager(RollManager): class WritingJSONRollManager(object): """No archiver. No roll checker. Just write the file locally. + Once the working_directory gets big enough, the files are + .tar.gz'ed into the destination_directory. Then + the working_directory is erased. Expects an external tool like rsync to move the file. A SHA-256 of the payload may be included in the filename.""" def __init__(self, *args, **kwargs): self.filename_template = args[0] self.directory = kwargs.get('directory', '.') - if not os.path.isdir(self.directory): - raise BadWorkingDirectory("Directory '%s' does not exist" % - self.directory) + self.destination_directory = kwargs.get('destination_directory', '.') + self.roll_size_mb = int(kwargs.get('roll_size_mb', 1000)) + self.check_delay = 0 # Only check directory size every N events. - def _make_filename(self, crc): + def _make_filename(self, crc, prefix): now = notification_utils.now() dt = str(notification_utils.dt_to_decimal(now)) f = now.strftime(self.filename_template) @@ -159,12 +164,60 @@ class WritingJSONRollManager(object): f = f.replace(":", "_") f = f.replace("[[CRC]]", crc) f = f.replace("[[TIMESTAMP]]", dt) - return os.path.join(self.directory, f) + return os.path.join(prefix, f) + + def _should_tar(self): + size = 0 + for f in os.listdir(self.directory): + full = os.path.join(self.directory, f) + if os.path.isfile(full): + size += os.path.getsize(full) + + return (size / 1048576) >= self.roll_size_mb + + def _tar_directory(self): + # tar all the files in working directory into an archive + # in destination_directory. + crc = "archive" + filename = self._make_filename(crc, self.destination_directory) + ".tar.gz" + + # No contextmgr for tarfile in 2.6 :( + tar = tarfile.open(filename, "w:gz") + for f in os.listdir(self.directory): + full = os.path.join(self.directory, f) + if os.path.isfile(full): + tar.add(full) + tar.close() + + def _clean_working_directory(self): + for f in os.listdir(self.directory): + full = os.path.join(self.directory, f) + if os.path.isfile(full): + os.remove(full) + + def _delay_check(self): + self.check_delay += 1 + if self.check_delay > 250: + self.check_delay = 0 + return False + return True def write(self, metadata, json_payload): # Metadata is ignored. crc = hashlib.sha256(json_payload).hexdigest() - filename = self._make_filename(crc) - f = gzip.open(filename, 'wb') - f.write(json_payload) - f.close() + filename = self._make_filename(crc, self.directory) + with open(filename, "w") as f: + f.write(json_payload) + + if self._delay_check(): + return + + if not self._should_tar(): + return + + self._tar_directory() + self._clean_working_directory() + + + def close(self): + pass diff --git a/test/test_roll_manager.py b/test/test_roll_manager.py index 784e48d..76feced 100644 --- a/test/test_roll_manager.py +++ b/test/test_roll_manager.py @@ -95,14 +95,6 @@ class TestWriting(unittest.TestCase): class TestJSONRollManager(unittest.TestCase): - def test_bad_directory(self): - try: - roll_manager.WritingJSONRollManager("x", directory="bad_directory") - self.fail("Should raise BadWorkingDirectory") - except roll_manager.BadWorkingDirectory as e: - pass - - def test_make_filename(self): now = datetime.datetime(day=1, month=2, year=2014, hour=10, minute=11, second=12) @@ -112,12 +104,57 @@ class TestJSONRollManager(unittest.TestCase): dt.return_value = now x = roll_manager.WritingJSONRollManager( "%Y%m%d [[TIMESTAMP]] [[CRC]].foo") - fn = x._make_filename("mycrc") - self.assertEqual("./20140201_123.45_mycrc.foo", fn) + fn = x._make_filename("mycrc", "foo") + self.assertEqual("foo/20140201_123.45_mycrc.foo", fn) + + @mock.patch('os.path.getsize') + @mock.patch('os.listdir') + @mock.patch('os.path.isfile') + def test_should_tar(self, isf, ld, gs): + rm = roll_manager.WritingJSONRollManager("template.foo") + gs.return_value = 250000 + ld.return_value = ['a', 'b', 'c'] + isf.return_value = True + rm.roll_size_mb = 1 + self.assertFalse(rm._should_tar()) + ld.return_value = ['a', 'b', 'c', 'd', 'e', 'f'] + self.assertTrue(rm._should_tar()) + + @mock.patch('os.listdir') + @mock.patch('os.remove') + @mock.patch('os.path.isfile') + def test_clean_working_directory(self, isf, rem, ld): + isf.return_value = True + rm = roll_manager.WritingJSONRollManager("template.foo") + ld.return_value = ['a', 'b', 'c'] + rm._clean_working_directory() + self.assertEqual(3, rem.call_count) + + @mock.patch('os.listdir') + @mock.patch('tarfile.open') + @mock.patch('os.path.isfile') + def test_tar_directory(self, isf, to, ld): + ld.return_value = ['a', 'b', 'c'] + isf.return_value = True + rm = roll_manager.WritingJSONRollManager("template.foo") + + open_name = '%s.open' % roll_manager.__name__ + with mock.patch(open_name, create=True) as mock_open: + mock_open.return_value = mock.MagicMock(spec=file) + + rm._tar_directory() + self.assertTrue(to.called) def test_write(self): - x = roll_manager.WritingJSONRollManager("template.foo") - with mock.patch.object(roll_manager.gzip, "open") as gz: - x.write("metadata", "json_payload") - - self.assertTrue(gz.called_once_with("template.foo", "wb")) + rm = roll_manager.WritingJSONRollManager("template.foo") + payload = "some big payload" + open_name = '%s.open' % roll_manager.__name__ + with mock.patch(open_name, create=True) as mock_open: + with mock.patch.object(rm, "_should_tar") as st: + with mock.patch.object(rm, "_tar_directory") as td: + st.return_value = False + mock_open.return_value = mock.MagicMock(spec=file) + rm.write("metadata", payload) + self.assertTrue(mock_open.called_once_with( + "template.foo", "wb")) + self.assertFalse(td.called)