One notification per line in the tarballed file.

JsonRollManager now also rolls on time (15 minutes by default). Each tarball contains a single file with one notification per line. The tarball is created when the working file reaches a certain size or after a fixed amount of time has passed. Any leftover working files are also tarballed when the service restarts.

Change-Id: I6e9d620086cf24bc1468b638e7b148af7230e2ac
parent 2be257bbfd
commit 7a600792db
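For orientation, here is a minimal usage sketch of the manager after this change. The constructor arguments and the write()/close() calls come from the diff below; the template string, paths, sizes, and payload are made-up examples:

    from shoebox import roll_manager

    # [[TIMESTAMP]] and [[CRC]] are placeholders that _make_filename()
    # substitutes; this particular template string is hypothetical.
    manager = roll_manager.WritingJSONRollManager(
        "[[TIMESTAMP]]_[[CRC]].events",
        directory="working",              # working file lives here
        destination_directory="archive",  # finished tarballs land here
        roll_size_mb=100,                 # roll after ~100MB written ...
        roll_minutes=15)                  # ... or after 15 minutes

    # Metadata is ignored; each JSON payload becomes one line in the file.
    manager.write({}, '{"event_type": "compute.instance.create.end"}')
    manager.close()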
@@ -1,6 +1,6 @@
 [metadata]
 name = shoebox
-version = 0.5
+version = 0.6
 author = Dark Secret Software Inc.
 author-email = admin@darksecretsoftware.com
 summary = data archiving library
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import datetime
 import fnmatch
 import hashlib
 import os
@@ -142,24 +143,38 @@ class WritingRollManager(RollManager):


 class WritingJSONRollManager(object):
-    """No archiver. No roll checker. Just write the file locally.
-    Once the working_directory gets big enough, the files are
-    .tar.gz'ed into the destination_directory. Then
-    the working_directory is erased.
+    """No archiver. No roll checker. Just write 1 file line per json payload.
+    Once the file gets big enough, .tar.gz the file and move
+    into the destination_directory.
     Expects an external tool like rsync to move the file.
-    A SHA-256 of the payload may be included in the filename."""
+    A SHA-256 of the payload may be included in the tarball filename."""
     def __init__(self, *args, **kwargs):
         self.filename_template = args[0]
         self.directory = kwargs.get('directory', '.')
         self.destination_directory = kwargs.get('destination_directory', '.')
         self.roll_size_mb = int(kwargs.get('roll_size_mb', 1000))
+        minutes = kwargs.get('roll_minutes', 15)
+        self.roll_after = datetime.timedelta(minutes=minutes)

-        # Read the directory at the start, but incrementally
-        # update the size as we write more files. Doing a full
-        # disk stat every time is way too expensive.
-        # Of course, this means we are not accounting for
-        # block size.
-        self.directory_size = self._get_directory_size()
+        # Look in the working directory for any files. Move them to the
+        # destination directory before we start. This implies we
+        # have to make sure multiple workers don't point at the same
+        # working directory.
+        self.handle = None
+        self.size = 0
+        self.start_time = self._get_time()
+
+        self._archive_working_files()
+
+    def _get_time(self):
+        # Broken out for testing ...
+        return datetime.datetime.utcnow()
+
+    def _archive_working_files(self):
+        for f in os.listdir(self.directory):
+            full = os.path.join(self.directory, f)
+            if os.path.isfile(full):
+                self._do_roll(full)

     def _make_filename(self, crc, prefix):
         now = notification_utils.now()
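The new time-based roll is plain datetime arithmetic: the working file's start_time plus roll_after is the deadline. A quick standalone illustration (not from the patch):

    import datetime

    start_time = datetime.datetime.utcnow()
    roll_after = datetime.timedelta(minutes=15)

    # The time half of _should_roll() in the next hunk is essentially:
    overdue = datetime.datetime.utcnow() >= (start_time + roll_after)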
@@ -172,53 +187,65 @@ class WritingJSONRollManager(object):
         f = f.replace("[[TIMESTAMP]]", dt)
         return os.path.join(prefix, f)

-    def _should_tar(self):
-        return (self.directory_size / 1048576) >= self.roll_size_mb
+    def _should_roll(self, size):
+        return ((size / 1048576) >= self.roll_size_mb or
+                (size > 0 and
+                 self._get_time() >= (self.start_time + self.roll_after)))

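To make the size half of the predicate concrete: 1048576 is 2**20 bytes, so with roll_size_mb = 10 the check fires once ten of those have been written. A standalone sketch (should_roll is a hypothetical stand-in; the values match the tests further down):

    ONE_MB = 1048576  # 2**20 bytes

    def should_roll(size, roll_size_mb=10):
        # '//' mirrors what '/' does on two ints under Python 2,
        # which is what _should_roll() above relies on.
        return (size // ONE_MB) >= roll_size_mb

    assert not should_roll(9 * ONE_MB)   # 9MB written: keep appending
    assert should_roll(10 * ONE_MB)      # 10MB written: time to roll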
-    def _get_directory_size(self):
-        size = 0
-        for f in os.listdir(self.directory):
-            full = os.path.join(self.directory, f)
-            if os.path.isfile(full):
-                size += os.path.getsize(full)
-        return size
+    def _get_file_sha(self, filename):
+        block_size = 2**20
+        sha256 = hashlib.sha256()
+        # no context manager, just to keep testing simple.
+        f = open(filename, "r")
+        while True:
+            data = f.read(block_size)
+            if not data:
+                break
+            sha256.update(data)
+        f.close()
+        return sha256.hexdigest()

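_get_file_sha() avoids the with-statement on purpose so the tests can stub open() easily. For reference, the same streamed hash in the usual idiom (file_sha256 is a hypothetical name; "rb" is used here since the hash should cover raw bytes):

    import hashlib

    def file_sha256(path, block_size=2**20):
        sha256 = hashlib.sha256()
        with open(path, "rb") as f:
            while True:
                data = f.read(block_size)
                if not data:
                    break
                sha256.update(data)
        return sha256.hexdigest()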
-    def _tar_directory(self):
+    def _tar_working_file(self, filename):
         # tar all the files in working directory into an archive
         # in destination_directory.
-        crc = "archive"
-        filename = self._make_filename(crc, self.destination_directory) + \
-            ".tar.gz"
+        crc = self._get_file_sha(filename)

         # No contextmgr for tarfile in 2.6 :(
-        tar = tarfile.open(filename, "w:gz")
-        for f in os.listdir(self.directory):
-            full = os.path.join(self.directory, f)
-            if os.path.isfile(full):
-                tar.add(full)
+        fn = self._make_filename(crc, self.destination_directory) + ".tar.gz"
+        tar = tarfile.open(fn, "w:gz")
+        just_name = os.path.basename(filename)
+        tar.add(filename, arcname=just_name)
         tar.close()

-    def _clean_working_directory(self):
-        for f in os.listdir(self.directory):
-            full = os.path.join(self.directory, f)
-            if os.path.isfile(full):
-                os.remove(full)
-        self.directory_size = self._get_directory_size()
+    def _clean_working_directory(self, filename):
+        os.remove(filename)
+        self.size = 0
+
+    def _do_roll(self, filename):
+        self.close()
+        self._tar_working_file(filename)
+        self._clean_working_directory(filename)

     def write(self, metadata, json_payload):
         # Metadata is ignored.
-        crc = hashlib.sha256(json_payload).hexdigest()
-        filename = self._make_filename(crc, self.directory)
-        with open(filename, "w") as f:
-            f.write(json_payload)
+        handle = self._get_handle()
+        handle.write("%s\n" % json_payload)
+        handle.flush()

-        self.directory_size += len(json_payload)
+        self.size += len(json_payload)

-        if not self._should_tar():
-            return
+        if self._should_roll(self.size):
+            self._do_roll(self.filename)

-        self._tar_directory()
-        self._clean_working_directory()
+    def _get_handle(self):
+        if not self.handle:
+            self.filename = self._make_filename('[[CRC]]', self.directory)
+            self.handle = open(self.filename, "w")
+            self.start_time = self._get_time()
+        return self.handle

     def close(self):
-        pass
+        if self.handle:
+            self.handle.close()
+            self.handle = None
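A note on _tar_working_file(): passing arcname=os.path.basename(filename) stores only the base name in the archive, so extraction does not recreate the working-directory path (the old code added full paths, which the old test had to strip back off). The "2.6" comment refers to tarfile gaining context-manager support only in Python 2.7; on newer interpreters the same operation can be written as (tar_one_file is a hypothetical name):

    import os
    import tarfile

    def tar_one_file(src, dest_tarball):
        # "w:gz" writes a gzip-compressed tarball.
        with tarfile.open(dest_tarball, "w:gz") as tar:
            tar.add(src, arcname=os.path.basename(src))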
@@ -13,16 +13,16 @@ import notigen
 from shoebox import roll_manager


-TEMPDIR = "test_temp"
+TEMPDIR = "test_temp/working"
 DESTDIR = "test_temp/output"
+EXTRACTDIR = "test_temp/extract"


 class TestDirectory(unittest.TestCase):
     def setUp(self):
-        shutil.rmtree(TEMPDIR, ignore_errors=True)
-        shutil.rmtree(DESTDIR, ignore_errors=True)
-        os.mkdir(TEMPDIR)
-        os.mkdir(DESTDIR)
+        for d in [TEMPDIR, DESTDIR, EXTRACTDIR]:
+            shutil.rmtree(d, ignore_errors=True)
+            os.mkdir(d)

     def test_size_rolling(self):
         manager = roll_manager.WritingJSONRollManager(
@@ -42,42 +42,40 @@ class TestDirectory(unittest.TestCase):
             json_event = json.dumps(event,
                                     cls=notification_utils.DateTimeEncoder)
             manager.write(metadata, json_event)
-            crc = hashlib.sha256(json_event).hexdigest()
-            entries[crc] = json_event
+            msg_id = event['message_id']
+            entries[msg_id] = json_event

             now = g.move_to_next_tick(now)
         manager.close()
+        manager._archive_working_files()

-        # Confirm files and tarballs ...
         print "Starting entries:", len(entries)
-        date_len = len("2015_02_24_14_15_58_037080_")
-        num = 0
+        actual = len(entries)
+
+        # Confirm there's nothing in working directory ...
         for f in os.listdir(TEMPDIR):
             full = os.path.join(TEMPDIR, f)
             if os.path.isfile(full):
-                crc = f[date_len:-len(".event")]
-                del entries[crc]
-                num += 1
-        print "Untarred entries:", num, "Remaining:", len(entries)
+                self.fail("Working directory not empty.")

-        # the rest have to be in tarballs ...
+        # Extract the tarballs ...
+        total = 0
         for f in os.listdir(DESTDIR):
-            num = 0
-            actual = 0
             tar = tarfile.open(os.path.join(DESTDIR, f), "r:gz")
-            for tarinfo in tar:
-                crc = tarinfo.name[len(TEMPDIR) + 1 + date_len:-len(".event")]
-                actual += 1
-                if crc:
-                    del entries[crc]
-                    num += 1
-
-            if actual == 1:
-                raise Exception("tarball has 1 entry. Something is wrong.")
-
-            print "In %s: %d of %d Remaining: %d" % (f, num, actual,
-                                                     len(entries))
+            names = tar.getnames()
+            tar.extractall(path=EXTRACTDIR)
             tar.close()

-        if len(entries):
-            raise Exception("%d more events than generated." % len(entries))
+            for item in names:
+                full = os.path.join(EXTRACTDIR, item)
+                num = 0
+                with open(full, "r") as handle:
+                    for line in handle:
+                        num += 1
+                total += num
+            print "In %s: %d of %d Remaining: %d" % (f, num, actual,
+                                                     actual - total)
+
+        if actual != total:
+            raise Exception("Num generated != actual")
@@ -96,9 +96,8 @@ class TestWriting(unittest.TestCase):

 class TestJSONRollManager(unittest.TestCase):
     @mock.patch(
-        "shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
-    def test_make_filename(self, gds):
-        gds.return_value = 1000
+        "shoebox.roll_manager.WritingJSONRollManager._archive_working_files")
+    def test_make_filename(self, awf):
         now = datetime.datetime(day=1, month=2, year=2014,
                                 hour=10, minute=11, second=12)
         with mock.patch.object(notification_utils, "now") as dt:
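A reading note for the tests that follow: with stacked @mock.patch decorators, the decorator nearest the function is applied first and supplies the leftmost mock argument after self. For example, from the next hunk:

    @mock.patch('os.listdir')        # outermost -> second mock arg (ld)
    @mock.patch('os.path.isfile')    # innermost -> first mock arg (isf)
    def test_archive_working_files(self, isf, ld):
        ...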
@@ -110,76 +109,96 @@ class TestJSONRollManager(unittest.TestCase):
             fn = x._make_filename("mycrc", "foo")
             self.assertEqual("foo/20140201_123.45_mycrc.foo", fn)

-    @mock.patch('os.path.getsize')
     @mock.patch('os.listdir')
     @mock.patch('os.path.isfile')
-    def test_get_directory_size(self, isf, ld, gs):
+    def test_archive_working_files(self, isf, ld):
         rm = roll_manager.WritingJSONRollManager("template.foo")
-        gs.return_value = 250000
         ld.return_value = ['a', 'b', 'c']
         isf.return_value = True
-        self.assertEqual(250000*3, rm._get_directory_size())
-        ld.return_value = ['a', 'b', 'c', 'd', 'e', 'f']
-        self.assertEqual(250000*6, rm._get_directory_size())
+        with mock.patch.object(rm, "_do_roll") as dr:
+            rm._archive_working_files()
+            self.assertEqual(dr.call_count, 3)

     @mock.patch(
-        "shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
-    def test_should_tar(self, gds):
-        gds.return_value = 1000
+        "shoebox.roll_manager.WritingJSONRollManager._archive_working_files")
+    def test_should_roll(self, awf):
         rm = roll_manager.WritingJSONRollManager("template.foo")
-        rm.directory_size = 9 * 1048576
         rm.roll_size_mb = 10
-        self.assertFalse(rm._should_tar())
-        rm.directory_size = 10 * 1048576
-        rm.roll_size_mb = 10
-        self.assertTrue(rm._should_tar())
+        self.assertFalse(rm._should_roll(9*1048576))
+        self.assertTrue(rm._should_roll(10*1048576))
+        rm = roll_manager.WritingJSONRollManager("template.foo", roll_minutes=10)
+        self.assertFalse(rm._should_roll(0))
+        self.assertFalse(rm._should_roll(1))
+        with mock.patch.object(rm, "_get_time") as gt:
+            gt.return_value = rm.start_time + datetime.timedelta(minutes=11)
+            self.assertTrue(rm._should_roll(1))

-    @mock.patch('os.listdir')
     @mock.patch('os.remove')
-    @mock.patch('os.path.isfile')
     @mock.patch(
-        "shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
-    def test_clean_working_directory(self, gds, isf, rem, ld):
-        gds.return_value = 1000
-        isf.return_value = True
+        "shoebox.roll_manager.WritingJSONRollManager._archive_working_files")
+    def test_clean_working_directory(self, awf, rem):
         rm = roll_manager.WritingJSONRollManager("template.foo")
-        ld.return_value = ['a', 'b', 'c']
-        rm._clean_working_directory()
-        self.assertEqual(3, rem.call_count)
+        rm._clean_working_directory("foo")
+        self.assertEqual(1, rem.call_count)

-    @mock.patch('os.listdir')
-    @mock.patch('tarfile.open')
-    @mock.patch('os.path.isfile')
     @mock.patch(
-        "shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
-    def test_tar_directory(self, gds, isf, to, ld):
-        gds.return_value = 1000
-        ld.return_value = ['a', 'b', 'c']
-        isf.return_value = True
-        gds = 1000
+        "shoebox.roll_manager.WritingJSONRollManager._archive_working_files")
+    def test_tar_working_file(self, awf):
         rm = roll_manager.WritingJSONRollManager("template.foo")

-        open_name = '%s.open' % roll_manager.__name__
-        with mock.patch(open_name, create=True) as mock_open:
-            mock_open.return_value = mock.MagicMock(spec=file)
-            rm._tar_directory()
-            self.assertTrue(to.called)
+        with mock.patch.object(rm, "_get_file_sha") as gfs:
+            gfs.return_value = "aabbcc"
+            with mock.patch.object(roll_manager.tarfile, 'open') as tar:
+                tar.return_value = mock.MagicMock()
+                rm._tar_working_file("foo")
+                self.assertTrue(tar.called)

     @mock.patch(
-        "shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
-    def test_write(self, gds):
-        gds.return_value = 0
+        "shoebox.roll_manager.WritingJSONRollManager._archive_working_files")
+    def test_write(self, awf):
         rm = roll_manager.WritingJSONRollManager("template.foo")
         payload = "some big payload"
-        open_name = '%s.open' % roll_manager.__name__
-        with mock.patch(open_name, create=True) as mock_open:
-            with mock.patch.object(rm, "_should_tar") as st:
-                with mock.patch.object(rm, "_tar_directory") as td:
-                    st.return_value = False
-                    mock_open.return_value = mock.MagicMock(spec=file)
-                    rm.write("metadata", payload)
-                    self.assertTrue(mock_open.called_once_with(
-                        "template.foo", "wb"))
-                    self.assertFalse(td.called)
-                    self.assertEqual(rm.directory_size, len(payload))
+        with mock.patch.object(rm, "_get_handle") as gh:
+            with mock.patch.object(rm, "_should_roll") as sr:
+                with mock.patch.object(rm, "_do_roll") as dr:
+                    sr.return_value = False
+                    gh.return_value = mock.MagicMock()
+                    rm.write("metadata", payload)
+                    self.assertFalse(dr.called)
+                    self.assertEqual(rm.size, len(payload))
+
+    @mock.patch(
+        "shoebox.roll_manager.WritingJSONRollManager._archive_working_files")
+    def test_get_file_sha(self, awf):
+        rm = roll_manager.WritingJSONRollManager("template.foo")
+        with mock.patch.object(roll_manager.hashlib, "sha256") as sha:
+            sha_obj = mock.MagicMock()
+            sha.return_value = sha_obj
+            hexdigest = mock.MagicMock()
+            hexdigest.return_value = "aabbcc"
+            sha_obj.hexdigest = hexdigest
+            open_name = '%s.open' % roll_manager.__name__
+            with mock.patch(open_name, create=True) as mock_open:
+                handle = mock.MagicMock()
+                mock_open.return_value = handle
+                data = mock.MagicMock()
+                handle.read = data
+                data.side_effect = ["a", "b", "c", False]
+                self.assertEqual("aabbcc", rm._get_file_sha('foo'))
+
+    @mock.patch(
+        "shoebox.roll_manager.WritingJSONRollManager._archive_working_files")
+    def test_get_handle(self, awf):
+        rm = roll_manager.WritingJSONRollManager("template.foo")
+        rm.handle = "abc"
+        self.assertEqual("abc", rm._get_handle())
+
+        with mock.patch.object(rm, "_make_filename") as mf:
+            mf.return_value = "foo"
+            open_name = '%s.open' % roll_manager.__name__
+            with mock.patch(open_name, create=True) as mock_open:
+                handle = mock.MagicMock()
+                mock_open.return_value = handle
+                rm.handle = None
+                self.assertEqual(handle, rm._get_handle())