diff --git a/shoebox/archive.py b/shoebox/archive.py index bbc1d6c..2e73cd3 100644 --- a/shoebox/archive.py +++ b/shoebox/archive.py @@ -24,6 +24,9 @@ class Archive(object): def get_file_handle(self): # pragma: no cover return self._handle + def close(self): + self._handle.close() + class ArchiveWriter(Archive): """The active Archive for appending. diff --git a/shoebox/roll_checker.py b/shoebox/roll_checker.py index 784d777..1ec0746 100644 --- a/shoebox/roll_checker.py +++ b/shoebox/roll_checker.py @@ -44,9 +44,9 @@ class TimeRollChecker(RollChecker): class SizeRollChecker(RollChecker): - def __init__(self, size_in_gb): - self.size_in_gb = size_in_gb + def __init__(self, size_in_mb): + self.size_in_mb = size_in_mb def check(self, archive): size = archive.get_file_handle().tell() - return size / 1073741824 >= self.size_in_gb + return (size / 1048576) >= self.size_in_mb diff --git a/shoebox/roll_manager.py b/shoebox/roll_manager.py index 9578a6b..befb0a0 100644 --- a/shoebox/roll_manager.py +++ b/shoebox/roll_manager.py @@ -19,22 +19,39 @@ import archive import utils +class ArchiveCallback(object): + def on_open(self, filename): + """Called when an Archive is opened.""" + pass + + def on_close(self, filename): + """Called when an Archive is closed.""" + pass + + class RollManager(object): - def __init__(self, filename_template, roll_checker, directory="."): + def __init__(self, filename_template, roll_checker, directory=".", + archive_class=None, archive_callback=None): self.filename_template = filename_template self.roll_checker = roll_checker self.directory = directory self.active_archive = None + self.archive_class = archive_class + self.active_filename = None + self.archive_callback = archive_callback def _make_filename(self): f = utils.now().strftime(self.filename_template) f = f.replace(" ", "_") + f = f.replace("/", "_") return os.path.join(self.directory, f) def get_active_archive(self): if not self.active_archive: - filename = self._make_filename() - self.active_archive = self.archive_class(filename) + self.active_filename = self._make_filename() + self.active_archive = self.archive_class(self.active_filename) + if self.archive_callback: + self.archive_callback.on_open(self.active_filename) self.roll_checker.start(self.active_archive) return self.active_archive @@ -43,15 +60,27 @@ class RollManager(object): return self.roll_checker.check(self.active_archive) def _roll_archive(self): - pass + self.close() + self.get_active_archive() + + def close(self): + if self.active_archive: + self.active_archive.close() + if self.archive_callback: + self.archive_callback.on_close(self.active_filename) + self.active_archive = None + self.active_filename = None class ReadingRollManager(RollManager): def __init__(self, filename_template, roll_checker, directory=".", - archive_class = archive.ArchiveReader): + archive_class = archive.ArchiveReader, + archive_callback=None): super(ReadingRollManager, self).__init__(filename_template, - roll_checker, directory) - self.archive_class = archive_class + roll_checker, + directory=directory, + archive_callback=event_callback, + archive_class=archive_class) def read(self): pass @@ -59,10 +88,14 @@ class ReadingRollManager(RollManager): class WritingRollManager(RollManager): def __init__(self, filename_template, roll_checker, directory=".", - archive_class = archive.ArchiveWriter): - super(WritingRollManager, self).__init__(filename_template, - roll_checker, directory) - self.archive_class = archive_class + archive_class=archive.ArchiveWriter, + archive_callback=None): + super(WritingRollManager, self).__init__( + filename_template, + roll_checker, + directory=directory, + archive_callback=archive_callback, + archive_class=archive_class) def write(self, metadata, payload): """metadata is string:string dict. diff --git a/test/integration/gen_events.py b/test/integration/gen_events.py index 90fecb9..a8eab9c 100644 --- a/test/integration/gen_events.py +++ b/test/integration/gen_events.py @@ -100,8 +100,8 @@ class EventGenerator(object): # An operation will happen every so many milliseconds to # get our operations/sec. We call this a Tick. self.millisecond_per_tick = 1000.0 / float(operations_per_second) - print "Operation every %d ms (%.1f/sec)" % (self.millisecond_per_tick, - operations_per_second) + #print "Operation every %d ms (%.1f/sec)" % (self.millisecond_per_tick, + # operations_per_second) self.next_events = [] # priority queue self.instances_in_use = set() @@ -129,15 +129,16 @@ class EventGenerator(object): if idx == 0: uuid = event['uuid'][-4:] request = event['request_id'][-4:] - if event['is_create']: - print "CREATE:", - if event['is_delete']: - print "DELETE:", - if event['is_update']: - print "UPDATE:", - print "U:%s R:%s" % (uuid, request), - print "(%d of %d)" % (len(self.instances_in_use), \ - len(self.instances)) + if False: + if event['is_create']: + print "CREATE:", + if event['is_delete']: + print "DELETE:", + if event['is_update']: + print "UPDATE:", + print "U:%s R:%s" % (uuid, request), + print "(%d of %d)" % (len(self.instances_in_use), \ + len(self.instances)) # (when, event, is_first_event, is_last_event) heapq.heappush(self.next_events, (when, event, idx==0, idx==len(action)-1)) @@ -161,7 +162,7 @@ class EventGenerator(object): self.instances_in_use.add(uuid) elif event['is_delete']: self.instances_in_use.remove(uuid) - print "%s %40s U:%4s" % (' ' * 20, event['event'], uuid[-4:]) + #print "%s %40s U:%4s" % (' ' * 20, event['event'], uuid[-4:]) ready.append(event) def _get_action(self, now): diff --git a/test/integration/test_rolling.py b/test/integration/test_rolling.py index 7f89601..260bf47 100644 --- a/test/integration/test_rolling.py +++ b/test/integration/test_rolling.py @@ -17,26 +17,44 @@ import test.integration.gen_events as egen TEMPDIR = "test_temp" +class ArchiveCallback(object): + def __init__(self, active_files): + self.active_files = active_files + + def on_open(self, filename): + self.active_files.add(filename) + print "Opened:", filename + + def on_close(self, filename): + self.active_files.remove(filename) + print "Closed:", filename + + class TestSizeRolling(unittest.TestCase): def setUp(self): shutil.rmtree(TEMPDIR, ignore_errors=True) os.mkdir(TEMPDIR) def tearDown(self): - shutil.rmtree(TEMPDIR) + # shutil.rmtree(TEMPDIR) + pass def test_size_rolling(self): + active_files = set() + callback = ArchiveCallback(active_files) + checker = roll_checker.SizeRollChecker(1) - manager = roll_manager.WritingRollManager("test_%c.events", + manager = roll_manager.WritingRollManager("test_%Y_%m_%d_%f.events", checker, - TEMPDIR) + TEMPDIR, + archive_callback=callback) + g = egen.EventGenerator(6000) - nevents = 0 + entries = [] now = datetime.datetime.utcnow() - while nevents < 1000: + while len(entries) < 10000: events = g.generate(now) if events: - nevents += len(events) for event in events: metadata = {'event': event['event'], 'request_id': event['request_id'], @@ -46,6 +64,10 @@ class TestSizeRolling(unittest.TestCase): json_event = json.dumps(event, cls=utils.DateTimeEncoder) manager.write(metadata, json_event) + entries.append((metadata, json_event)) now = g.move_to_next_tick(now) + manager.close() + + raise Exception("Boom") diff --git a/test/test_roll_checker.py b/test/test_roll_checker.py index 0289092..ce304e3 100644 --- a/test/test_roll_checker.py +++ b/test/test_roll_checker.py @@ -36,15 +36,15 @@ class TestRollChecker(unittest.TestCase): self.assertFalse(x.check(None)) def test_size_roll_checker_end(self): - one_gig = 1073741824 + one_mb = 1048576 x = roll_checker.SizeRollChecker(10) archive = mock.Mock() - archive.get_file_handle.return_value.tell.return_value = one_gig * 5 + archive.get_file_handle.return_value.tell.return_value = one_mb * 5 self.assertFalse(x.check(archive)) - archive.get_file_handle.return_value.tell.return_value = one_gig * 10 + archive.get_file_handle.return_value.tell.return_value = one_mb * 10 self.assertTrue(x.check(archive)) - archive.get_file_handle.return_value.tell.return_value = one_gig * 11 + archive.get_file_handle.return_value.tell.return_value = one_mb * 11 self.assertTrue(x.check(archive)) diff --git a/test/test_roll_manager.py b/test/test_roll_manager.py index d1bd4e5..d9806a5 100644 --- a/test/test_roll_manager.py +++ b/test/test_roll_manager.py @@ -9,6 +9,15 @@ from shoebox import roll_manager from shoebox import utils +class FakeArchive(object): + def __init__(self, filename): + self.filename = filename + self.data = [] + + def write(self, metadata, payload): + self.data.append((metadata, payload)) + + class TestRollManager(unittest.TestCase): def test_make_filename(self): now = datetime.datetime(day=1, month=2, year=2014, @@ -21,26 +30,32 @@ class TestRollManager(unittest.TestCase): self.assertEqual(filename, "./filename_Sat_Feb__1_10:11:12_2014.dat") + def test_get_active_archive(self): + checker = mock.Mock() + callback = mock.Mock() + filename_template = "filename_%c.dat" + x = roll_manager.RollManager(filename_template, checker, + archive_callback=callback, + archive_class=FakeArchive) + with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of: + arc = x.get_active_archive() + self.assertTrue(checker.start.called) + self.assertTrue(callback.on_open.called) -class FakeArchive(object): - def __init__(self, filename): - self.filename = filename - self.data = [] - - def write(self, metadata, payload): - self.data.append((metadata, payload)) + def test_close(self): + callback = mock.Mock() + checker = mock.Mock() + x = roll_manager.RollManager("template", checker, + archive_callback=callback) + x.active_archive = mock.Mock() + x.active_filename = "foo" + x.close() + self.assertIsNone(x.active_archive) + self.assertIsNone(x.active_filename) + self.assertTrue(callback.on_close.called) class TestWritingRollManager(unittest.TestCase): - def test_get_active_archive(self): - checker = mock.Mock() - filename_template = "filename_%c.dat" - x = roll_manager.WritingRollManager(filename_template, checker) - with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of: - arc = x.get_active_archive() - self.assertTrue(isinstance(arc, archive.ArchiveWriter)) - self.assertTrue(checker.start.called) - def test_write_always_roll(self): checker = mock.Mock() checker.check.return_value = True @@ -59,6 +74,14 @@ class TestWritingRollManager(unittest.TestCase): x.write({}, "payload") self.assertFalse(ra.called) + def test_get_active_archive(self): + checker = mock.Mock() + filename_template = "filename_%c.dat" + x = roll_manager.WritingRollManager(filename_template, checker) + with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of: + arc = x.get_active_archive() + self.assertTrue(isinstance(arc, archive.ArchiveWriter)) + class TestWriting(unittest.TestCase): def test_write(self):