Tarball archiving of JSON messages.
Changed the JSON RollManager to write JSON payloads to a working directory until a certain directory size is met. Then the directory is tar/gzipped and moved into a destination directory. No CRC is available for the tarball. Change-Id: I119da646104eeaea86f7153a9bcf123d3143a0ba
This commit is contained in:
parent
852c1853e5
commit
5f91c87739
@ -1,6 +1,6 @@
|
||||
[metadata]
|
||||
name = shoebox
|
||||
version = 0.2
|
||||
version = 0.41
|
||||
author = Dark Secret Software Inc.
|
||||
author-email = admin@darksecretsoftware.com
|
||||
summary = data archiving library
|
||||
|
@ -14,10 +14,10 @@
|
||||
# limitations under the License.
|
||||
|
||||
import fnmatch
|
||||
import gzip
|
||||
import hashlib
|
||||
import os
|
||||
import os.path
|
||||
import tarfile
|
||||
|
||||
import notification_utils
|
||||
|
||||
@ -62,8 +62,9 @@ class RollManager(object):
|
||||
|
||||
class ReadingRollManager(RollManager):
|
||||
def __init__(self, filename_template, directory=".",
|
||||
destination_directory=".",
|
||||
archive_class = archive.ArchiveReader,
|
||||
archive_callback=None):
|
||||
archive_callback=None, roll_size_mb=1000):
|
||||
super(ReadingRollManager, self).__init__(
|
||||
filename_template,
|
||||
directory=directory,
|
||||
@ -101,8 +102,9 @@ class ReadingRollManager(RollManager):
|
||||
|
||||
class WritingRollManager(RollManager):
|
||||
def __init__(self, filename_template, roll_checker, directory=".",
|
||||
destination_directory=".",
|
||||
archive_class=archive.ArchiveWriter,
|
||||
archive_callback=None):
|
||||
archive_callback=None, roll_size_mb=1000):
|
||||
super(WritingRollManager, self).__init__(
|
||||
filename_template,
|
||||
directory=directory,
|
||||
@ -141,16 +143,19 @@ class WritingRollManager(RollManager):
|
||||
|
||||
class WritingJSONRollManager(object):
|
||||
"""No archiver. No roll checker. Just write the file locally.
|
||||
Once the working_directory gets big enough, the files are
|
||||
.tar.gz'ed into the destination_directory. Then
|
||||
the working_directory is erased.
|
||||
Expects an external tool like rsync to move the file.
|
||||
A SHA-256 of the payload may be included in the filename."""
|
||||
def __init__(self, *args, **kwargs):
    """Capture working/destination directories and the roll threshold.

    Positional argument 0 is the filename template. Raises
    BadWorkingDirectory when the working directory does not exist.
    """
    self.filename_template = args[0]
    working = kwargs.get('directory', '.')
    if not os.path.isdir(working):
        raise BadWorkingDirectory("Directory '%s' does not exist" %
                                  working)
    self.directory = working
    self.destination_directory = kwargs.get('destination_directory', '.')
    self.roll_size_mb = int(kwargs.get('roll_size_mb', 1000))
    self.check_delay = 0  # Only check directory size every N events.
|
||||
|
||||
def _make_filename(self, crc):
|
||||
def _make_filename(self, crc, prefix):
|
||||
now = notification_utils.now()
|
||||
dt = str(notification_utils.dt_to_decimal(now))
|
||||
f = now.strftime(self.filename_template)
|
||||
@ -159,12 +164,60 @@ class WritingJSONRollManager(object):
|
||||
f = f.replace(":", "_")
|
||||
f = f.replace("[[CRC]]", crc)
|
||||
f = f.replace("[[TIMESTAMP]]", dt)
|
||||
return os.path.join(self.directory, f)
|
||||
return os.path.join(prefix, f)
|
||||
|
||||
def _should_tar(self):
    """Report whether the working directory has grown past roll_size_mb.

    Only regular files are counted; subdirectories are ignored.
    """
    paths = (os.path.join(self.directory, name)
             for name in os.listdir(self.directory))
    total = sum(os.path.getsize(p) for p in paths if os.path.isfile(p))
    return (total / 1048576) >= self.roll_size_mb
|
||||
|
||||
def _tar_directory(self):
    """Pack every regular file in the working directory into a
    timestamped .tar.gz written to destination_directory.

    The literal "archive" fills the [[CRC]] slot of the filename
    template, since no per-payload CRC applies to the tarball.
    """
    crc = "archive"
    filename = self._make_filename(crc, self.destination_directory) + ".tar.gz"

    # No contextmgr for tarfile in 2.6 :( -- use try/finally instead so
    # the archive handle is closed even if tar.add() raises mid-way.
    tar = tarfile.open(filename, "w:gz")
    try:
        for f in os.listdir(self.directory):
            full = os.path.join(self.directory, f)
            if os.path.isfile(full):
                tar.add(full)
    finally:
        tar.close()
|
||||
|
||||
def _clean_working_directory(self):
    """Delete every regular file in the working directory."""
    for entry in os.listdir(self.directory):
        target = os.path.join(self.directory, entry)
        if not os.path.isfile(target):
            continue
        os.remove(target)
|
||||
|
||||
def _delay_check(self):
    """Rate-limit the directory size check.

    Returns True ("skip the size check this time") for most calls;
    every 251st call it resets the counter and returns False so the
    caller performs the relatively expensive directory scan.
    """
    self.check_delay += 1
    if self.check_delay <= 250:
        return True
    self.check_delay = 0
    return False
|
||||
|
||||
def write(self, metadata, json_payload):
    """Write one JSON payload into the working directory, rolling the
    directory into a tarball once it grows past roll_size_mb.

    metadata is ignored. The payload is written verbatim to a file
    whose name embeds a SHA-256 of the payload. The directory-size
    check only runs on a fraction of calls (see _delay_check).

    NOTE(review): the original span also contained a stale
    pre-refactor gzip write path that called _make_filename(crc) with
    one argument -- incompatible with the current two-argument
    signature and duplicating the write -- so it was removed.
    """
    # Metadata is ignored.
    crc = hashlib.sha256(json_payload).hexdigest()
    filename = self._make_filename(crc, self.directory)
    with open(filename, "w") as f:
        f.write(json_payload)

    # Scanning the directory on every write is expensive; skip most.
    if self._delay_check():
        return

    if not self._should_tar():
        return

    self._tar_directory()
    self._clean_working_directory()
|
||||
|
||||
|
||||
def close(self):
    """No-op: write() opens and closes its own file on every call,
    so there is nothing to flush or release here."""
    return
|
||||
|
@ -95,14 +95,6 @@ class TestWriting(unittest.TestCase):
|
||||
|
||||
|
||||
class TestJSONRollManager(unittest.TestCase):
|
||||
def test_bad_directory(self):
|
||||
try:
|
||||
roll_manager.WritingJSONRollManager("x", directory="bad_directory")
|
||||
self.fail("Should raise BadWorkingDirectory")
|
||||
except roll_manager.BadWorkingDirectory as e:
|
||||
pass
|
||||
|
||||
|
||||
def test_make_filename(self):
|
||||
now = datetime.datetime(day=1, month=2, year=2014,
|
||||
hour=10, minute=11, second=12)
|
||||
@ -112,12 +104,57 @@ class TestJSONRollManager(unittest.TestCase):
|
||||
dt.return_value = now
|
||||
x = roll_manager.WritingJSONRollManager(
|
||||
"%Y%m%d [[TIMESTAMP]] [[CRC]].foo")
|
||||
fn = x._make_filename("mycrc")
|
||||
self.assertEqual("./20140201_123.45_mycrc.foo", fn)
|
||||
fn = x._make_filename("mycrc", "foo")
|
||||
self.assertEqual("foo/20140201_123.45_mycrc.foo", fn)
|
||||
|
||||
@mock.patch('os.path.getsize')
@mock.patch('os.listdir')
@mock.patch('os.path.isfile')
def test_should_tar(self, mock_isfile, mock_listdir, mock_getsize):
    """Rolling triggers only once the directory reaches roll_size_mb."""
    manager = roll_manager.WritingJSONRollManager("template.foo")
    manager.roll_size_mb = 1
    mock_getsize.return_value = 250000
    mock_isfile.return_value = True
    mock_listdir.return_value = list('abc')     # 3 * 250000 B < 1 MB
    self.assertFalse(manager._should_tar())
    mock_listdir.return_value = list('abcdef')  # 6 * 250000 B >= 1 MB
    self.assertTrue(manager._should_tar())
|
||||
|
||||
@mock.patch('os.listdir')
@mock.patch('os.remove')
@mock.patch('os.path.isfile')
def test_clean_working_directory(self, mock_isfile, mock_remove, mock_listdir):
    """One os.remove per regular file in the working directory."""
    manager = roll_manager.WritingJSONRollManager("template.foo")
    mock_isfile.return_value = True
    mock_listdir.return_value = list('abc')
    manager._clean_working_directory()
    self.assertEqual(3, mock_remove.call_count)
|
||||
|
||||
@mock.patch('os.listdir')
@mock.patch('tarfile.open')
@mock.patch('os.path.isfile')
def test_tar_directory(self, mock_isfile, mock_tar_open, mock_listdir):
    """_tar_directory must create the archive via tarfile.open."""
    mock_listdir.return_value = list('abc')
    mock_isfile.return_value = True
    manager = roll_manager.WritingJSONRollManager("template.foo")

    open_name = '%s.open' % roll_manager.__name__
    with mock.patch(open_name, create=True) as mock_open:
        mock_open.return_value = mock.MagicMock(spec=file)

        manager._tar_directory()
        self.assertTrue(mock_tar_open.called)
|
||||
|
||||
def test_write(self):
    """write() stores the payload and does not tar below the threshold.

    NOTE(review): the original span also carried a stale pre-refactor
    test body built around gzip.open; it was removed along with its
    vacuous gz.called_once_with assertion.
    """
    rm = roll_manager.WritingJSONRollManager("template.foo")
    payload = "some big payload"
    open_name = '%s.open' % roll_manager.__name__
    with mock.patch(open_name, create=True) as mock_open:
        with mock.patch.object(rm, "_should_tar") as st:
            with mock.patch.object(rm, "_tar_directory") as td:
                st.return_value = False
                mock_open.return_value = mock.MagicMock(spec=file)
                rm.write("metadata", payload)
                # called_once_with() is not a Mock assertion method --
                # it returns a child Mock (always truthy), so the old
                # assertTrue(mock_open.called_once_with(...)) could
                # never fail. Assert on .called instead.
                self.assertTrue(mock_open.called)
                self.assertFalse(td.called)
|
||||
|
Loading…
x
Reference in New Issue
Block a user