use in-memory count for directory size.
Only check the directory size when we first start the RollManager. From then on, incrementally add the payload size. This will result in a difference between on-disk size vs. file sizes. But, it's much faster than checking every N times ... those stats() checks are very expensive. Change-Id: I71b1455db0ea073a4f563f115a693027de50638b
This commit is contained in:
parent
5f91c87739
commit
aecc00d485
@ -153,7 +153,13 @@ class WritingJSONRollManager(object):
|
||||
self.directory = kwargs.get('directory', '.')
|
||||
self.destination_directory = kwargs.get('destination_directory', '.')
|
||||
self.roll_size_mb = int(kwargs.get('roll_size_mb', 1000))
|
||||
self.check_delay = 0 # Only check directory size every N events.
|
||||
|
||||
# Read the directory at the start, but incrementally
|
||||
# update the size as we write more files. Doing a full
|
||||
# disk stat every time is way too expensive.
|
||||
# Of course, this means we are not accounting for
|
||||
# block size.
|
||||
self.directory_size = self._get_directory_size()
|
||||
|
||||
def _make_filename(self, crc, prefix):
|
||||
now = notification_utils.now()
|
||||
@ -167,19 +173,22 @@ class WritingJSONRollManager(object):
|
||||
return os.path.join(prefix, f)
|
||||
|
||||
def _should_tar(self):
|
||||
return (self.directory_size / 1048576) >= self.roll_size_mb
|
||||
|
||||
def _get_directory_size(self):
|
||||
size = 0
|
||||
for f in os.listdir(self.directory):
|
||||
full = os.path.join(self.directory, f)
|
||||
if os.path.isfile(full):
|
||||
size += os.path.getsize(full)
|
||||
|
||||
return (size / 1048576) >= self.roll_size_mb
|
||||
return size
|
||||
|
||||
def _tar_directory(self):
|
||||
# tar all the files in working directory into an archive
|
||||
# in destination_directory.
|
||||
crc = "archive"
|
||||
filename = self._make_filename(crc, self.destination_directory) + ".tar.gz"
|
||||
filename = self._make_filename(crc, self.destination_directory) + \
|
||||
".tar.gz"
|
||||
|
||||
# No contextmgr for tarfile in 2.6 :(
|
||||
tar = tarfile.open(filename, "w:gz")
|
||||
@ -194,13 +203,7 @@ class WritingJSONRollManager(object):
|
||||
full = os.path.join(self.directory, f)
|
||||
if os.path.isfile(full):
|
||||
os.remove(full)
|
||||
|
||||
def _delay_check(self):
|
||||
self.check_delay += 1
|
||||
if self.check_delay > 250:
|
||||
self.check_delay = 0
|
||||
return False
|
||||
return True
|
||||
self.directory_size = self._get_directory_size()
|
||||
|
||||
def write(self, metadata, json_payload):
|
||||
# Metadata is ignored.
|
||||
@ -209,8 +212,7 @@ class WritingJSONRollManager(object):
|
||||
with open(filename, "w") as f:
|
||||
f.write(json_payload)
|
||||
|
||||
if self._delay_check():
|
||||
return
|
||||
self.directory_size += len(json_payload)
|
||||
|
||||
if not self._should_tar():
|
||||
return
|
||||
@ -218,6 +220,5 @@ class WritingJSONRollManager(object):
|
||||
self._tar_directory()
|
||||
self._clean_working_directory()
|
||||
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
83
test/integration/test_json_tarball.py
Normal file
83
test/integration/test_json_tarball.py
Normal file
@ -0,0 +1,83 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
import shutil
|
||||
import tarfile
|
||||
import unittest
|
||||
|
||||
import notification_utils
|
||||
import notigen
|
||||
|
||||
from shoebox import roll_manager
|
||||
|
||||
|
||||
TEMPDIR = "test_temp"
|
||||
DESTDIR = "test_temp/output"
|
||||
|
||||
|
||||
class TestDirectory(unittest.TestCase):
|
||||
def setUp(self):
|
||||
shutil.rmtree(TEMPDIR, ignore_errors=True)
|
||||
shutil.rmtree(DESTDIR, ignore_errors=True)
|
||||
os.mkdir(TEMPDIR)
|
||||
os.mkdir(DESTDIR)
|
||||
|
||||
def test_size_rolling(self):
|
||||
manager = roll_manager.WritingJSONRollManager(
|
||||
"%Y_%m_%d_%X_%f_[[CRC]].event",
|
||||
directory=TEMPDIR,
|
||||
destination_directory=DESTDIR,
|
||||
roll_size_mb=10)
|
||||
|
||||
g = notigen.EventGenerator("test/integration/templates")
|
||||
entries = {}
|
||||
now = datetime.datetime.utcnow()
|
||||
while len(entries) < 10000:
|
||||
events = g.generate(now)
|
||||
if events:
|
||||
for event in events:
|
||||
metadata = {}
|
||||
json_event = json.dumps(event,
|
||||
cls=notification_utils.DateTimeEncoder)
|
||||
manager.write(metadata, json_event)
|
||||
crc = hashlib.sha256(json_event).hexdigest()
|
||||
entries[crc] = json_event
|
||||
|
||||
now = g.move_to_next_tick(now)
|
||||
manager.close()
|
||||
|
||||
# Confirm files and tarballs ...
|
||||
print "Starting entries:", len(entries)
|
||||
date_len = len("2015_02_24_14_15_58_037080_")
|
||||
num = 0
|
||||
for f in os.listdir(TEMPDIR):
|
||||
full = os.path.join(TEMPDIR, f)
|
||||
if os.path.isfile(full):
|
||||
crc = f[date_len:-len(".event")]
|
||||
del entries[crc]
|
||||
num += 1
|
||||
print "Untarred entries:", num, "Remaining:", len(entries)
|
||||
|
||||
# the rest have to be in tarballs ...
|
||||
for f in os.listdir(DESTDIR):
|
||||
num = 0
|
||||
actual = 0
|
||||
tar = tarfile.open(os.path.join(DESTDIR, f), "r:gz")
|
||||
for tarinfo in tar:
|
||||
crc = tarinfo.name[len(TEMPDIR) + 1 + date_len:-len(".event")]
|
||||
actual += 1
|
||||
if crc:
|
||||
del entries[crc]
|
||||
num += 1
|
||||
|
||||
if actual == 1:
|
||||
raise Exception("tarball has 1 entry. Something is wrong.")
|
||||
|
||||
print "In %s: %d of %d Remaining: %d" % (f, num, actual,
|
||||
len(entries))
|
||||
tar.close()
|
||||
|
||||
if len(entries):
|
||||
raise Exception("%d more events than generated." % len(entries))
|
@ -95,7 +95,10 @@ class TestWriting(unittest.TestCase):
|
||||
|
||||
|
||||
class TestJSONRollManager(unittest.TestCase):
|
||||
def test_make_filename(self):
|
||||
@mock.patch(
|
||||
"shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
|
||||
def test_make_filename(self, gds):
|
||||
gds.return_value = 1000
|
||||
now = datetime.datetime(day=1, month=2, year=2014,
|
||||
hour=10, minute=11, second=12)
|
||||
with mock.patch.object(notification_utils, "now") as dt:
|
||||
@ -110,20 +113,34 @@ class TestJSONRollManager(unittest.TestCase):
|
||||
@mock.patch('os.path.getsize')
|
||||
@mock.patch('os.listdir')
|
||||
@mock.patch('os.path.isfile')
|
||||
def test_should_tar(self, isf, ld, gs):
|
||||
def test_get_directory_size(self, isf, ld, gs):
|
||||
rm = roll_manager.WritingJSONRollManager("template.foo")
|
||||
gs.return_value = 250000
|
||||
ld.return_value = ['a', 'b', 'c']
|
||||
isf.return_value = True
|
||||
rm.roll_size_mb = 1
|
||||
self.assertFalse(rm._should_tar())
|
||||
self.assertEqual(250000*3, rm._get_directory_size())
|
||||
ld.return_value = ['a', 'b', 'c', 'd', 'e', 'f']
|
||||
self.assertEqual(250000*6, rm._get_directory_size())
|
||||
|
||||
@mock.patch(
|
||||
"shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
|
||||
def test_should_tar(self, gds):
|
||||
gds.return_value = 1000
|
||||
rm = roll_manager.WritingJSONRollManager("template.foo")
|
||||
rm.directory_size = 9 * 1048576
|
||||
rm.roll_size_mb = 10
|
||||
self.assertFalse(rm._should_tar())
|
||||
rm.directory_size = 10 * 1048576
|
||||
rm.roll_size_mb = 10
|
||||
self.assertTrue(rm._should_tar())
|
||||
|
||||
@mock.patch('os.listdir')
|
||||
@mock.patch('os.remove')
|
||||
@mock.patch('os.path.isfile')
|
||||
def test_clean_working_directory(self, isf, rem, ld):
|
||||
@mock.patch(
|
||||
"shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
|
||||
def test_clean_working_directory(self, gds, isf, rem, ld):
|
||||
gds.return_value = 1000
|
||||
isf.return_value = True
|
||||
rm = roll_manager.WritingJSONRollManager("template.foo")
|
||||
ld.return_value = ['a', 'b', 'c']
|
||||
@ -133,9 +150,13 @@ class TestJSONRollManager(unittest.TestCase):
|
||||
@mock.patch('os.listdir')
|
||||
@mock.patch('tarfile.open')
|
||||
@mock.patch('os.path.isfile')
|
||||
def test_tar_directory(self, isf, to, ld):
|
||||
@mock.patch(
|
||||
"shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
|
||||
def test_tar_directory(self, gds, isf, to, ld):
|
||||
gds.return_value = 1000
|
||||
ld.return_value = ['a', 'b', 'c']
|
||||
isf.return_value = True
|
||||
gds = 1000
|
||||
rm = roll_manager.WritingJSONRollManager("template.foo")
|
||||
|
||||
open_name = '%s.open' % roll_manager.__name__
|
||||
@ -145,7 +166,10 @@ class TestJSONRollManager(unittest.TestCase):
|
||||
rm._tar_directory()
|
||||
self.assertTrue(to.called)
|
||||
|
||||
def test_write(self):
|
||||
@mock.patch(
|
||||
"shoebox.roll_manager.WritingJSONRollManager._get_directory_size")
|
||||
def test_write(self, gds):
|
||||
gds.return_value = 0
|
||||
rm = roll_manager.WritingJSONRollManager("template.foo")
|
||||
payload = "some big payload"
|
||||
open_name = '%s.open' % roll_manager.__name__
|
||||
@ -158,3 +182,4 @@ class TestJSONRollManager(unittest.TestCase):
|
||||
self.assertTrue(mock_open.called_once_with(
|
||||
"template.foo", "wb"))
|
||||
self.assertFalse(td.called)
|
||||
self.assertEqual(rm.directory_size, len(payload))
|
||||
|
Loading…
x
Reference in New Issue
Block a user