reading works and integration end-to-end tests in place
This commit is contained in:
parent
2bb11b0d31
commit
12878c8ce3
@ -50,6 +50,12 @@ class ArchiveReader(Archive):
|
||||
"""
|
||||
def __init__(self, filename):
|
||||
super(ArchiveReader, self).__init__(filename)
|
||||
self._open_file(filename)
|
||||
|
||||
def _open_file(self, filename):
|
||||
# Broken out for testing.
|
||||
self._handle = open(filename, "rb")
|
||||
|
||||
def read(self):
|
||||
pass
|
||||
# (metadata, payload)
|
||||
return disk_storage.unpack_notification(self._handle)
|
||||
|
@ -12,6 +12,10 @@ class OutOfSync(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class EndOfFile(Exception):
|
||||
pass
|
||||
|
||||
|
||||
BOR_MAGIC_NUMBER = 0x69867884
|
||||
|
||||
|
||||
@ -27,10 +31,14 @@ class Version0(object):
|
||||
def make_preamble(self, version):
|
||||
return struct.pack(self.preamble_schema, BOR_MAGIC_NUMBER, version)
|
||||
|
||||
def _check_eof(self, expected, actual):
|
||||
if actual < expected:
|
||||
raise EndOfFile()
|
||||
|
||||
def load_preamble(self, file_handle):
|
||||
raw = file_handle.read(self.preamble_size)
|
||||
self._check_eof(self.preamble_size, len(raw))
|
||||
header = struct.unpack(self.preamble_schema, raw)
|
||||
print "raw", raw
|
||||
if header[0] != BOR_MAGIC_NUMBER:
|
||||
raise OutOfSync("Expected Beginning of Record marker")
|
||||
return header[1]
|
||||
@ -106,12 +114,14 @@ class Version1(Version0):
|
||||
|
||||
def unpack(self, file_handle):
|
||||
header_bytes = file_handle.read(self.header_size)
|
||||
self._check_eof(self.header_size, len(header_bytes))
|
||||
header = struct.unpack(self.header_schema, header_bytes)
|
||||
|
||||
if header[2] != 0:
|
||||
raise OutOfSync("Didn't find 0 EOR marker.")
|
||||
|
||||
metadata_bytes = file_handle.read(header[0])
|
||||
self._check_eof(header[0], len(metadata_bytes))
|
||||
num_strings = struct.unpack_from("i", metadata_bytes)
|
||||
offset = struct.calcsize("i")
|
||||
lengths = num_strings[0] / 2
|
||||
@ -127,12 +137,11 @@ class Version1(Version0):
|
||||
for n in range(len(key_values))[::2])
|
||||
|
||||
raw = file_handle.read(header[1])
|
||||
self._check_eof(header[1], len(raw))
|
||||
raw_len = struct.unpack_from("i", raw)
|
||||
offset = struct.calcsize("i")
|
||||
raw_json = struct.unpack_from("%ds" % raw_len[0], raw, offset=offset)
|
||||
notification = json.loads(raw_json[0])
|
||||
|
||||
return (metadata, notification)
|
||||
jnot = struct.unpack_from("%ds" % raw_len[0], raw, offset=offset)
|
||||
return (metadata, jnot[0])
|
||||
|
||||
|
||||
VERSIONS = {1: Version1()}
|
||||
|
@ -13,12 +13,23 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import fnmatch
|
||||
import os
|
||||
import os.path
|
||||
|
||||
import archive
|
||||
import disk_storage
|
||||
import utils
|
||||
|
||||
|
||||
class NoMoreFiles(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NoValidFile(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ArchiveCallback(object):
|
||||
def on_open(self, filename):
|
||||
"""Called when an Archive is opened."""
|
||||
@ -30,35 +41,15 @@ class ArchiveCallback(object):
|
||||
|
||||
|
||||
class RollManager(object):
|
||||
def __init__(self, filename_template, roll_checker, directory=".",
|
||||
def __init__(self, filename_template, directory=".",
|
||||
archive_class=None, archive_callback=None):
|
||||
self.filename_template = filename_template
|
||||
self.roll_checker = roll_checker
|
||||
self.directory = directory
|
||||
self.active_archive = None
|
||||
self.archive_class = archive_class
|
||||
self.active_filename = None
|
||||
self.archive_callback = archive_callback
|
||||
|
||||
def _make_filename(self):
|
||||
f = utils.now().strftime(self.filename_template)
|
||||
f = f.replace(" ", "_")
|
||||
f = f.replace("/", "_")
|
||||
return os.path.join(self.directory, f)
|
||||
|
||||
def get_active_archive(self):
|
||||
if not self.active_archive:
|
||||
self.active_filename = self._make_filename()
|
||||
self.active_archive = self.archive_class(self.active_filename)
|
||||
if self.archive_callback:
|
||||
self.archive_callback.on_open(self.active_filename)
|
||||
self.roll_checker.start(self.active_archive)
|
||||
|
||||
return self.active_archive
|
||||
|
||||
def _should_roll_archive(self):
|
||||
return self.roll_checker.check(self.active_archive)
|
||||
|
||||
def _roll_archive(self):
|
||||
self.close()
|
||||
self.get_active_archive()
|
||||
@ -73,17 +64,42 @@ class RollManager(object):
|
||||
|
||||
|
||||
class ReadingRollManager(RollManager):
|
||||
def __init__(self, filename_template, roll_checker, directory=".",
|
||||
def __init__(self, filename_template, directory=".",
|
||||
archive_class = archive.ArchiveReader,
|
||||
archive_callback=None):
|
||||
super(ReadingRollManager, self).__init__(filename_template,
|
||||
roll_checker,
|
||||
directory=directory,
|
||||
archive_callback=event_callback,
|
||||
archive_class=archive_class)
|
||||
super(ReadingRollManager, self).__init__(
|
||||
filename_template,
|
||||
directory=directory,
|
||||
archive_callback=archive_callback,
|
||||
archive_class=archive_class)
|
||||
self.files_to_read = self._get_matching_files(directory,
|
||||
filename_template)
|
||||
|
||||
def _get_matching_files(self, directory, filename_template):
|
||||
files = [os.path.join(directory, f)
|
||||
for f in os.listdir(self.directory)
|
||||
if os.path.isfile(os.path.join(directory, f))]
|
||||
return sorted(fnmatch.filter(files, filename_template))
|
||||
|
||||
def read(self):
|
||||
pass
|
||||
# (metadata, payload)
|
||||
for x in range(3): # 3 attempts to file a valid file.
|
||||
a = self.get_active_archive()
|
||||
try:
|
||||
return a.read()
|
||||
except disk_storage.EndOfFile:
|
||||
self._roll_archive()
|
||||
raise NoValidFile("Unable to find a valid file after 3 attempts")
|
||||
|
||||
def get_active_archive(self):
|
||||
if not self.active_archive:
|
||||
if not self.files_to_read:
|
||||
raise NoMoreFiles()
|
||||
self.active_filename = self.files_to_read.pop(0)
|
||||
self.active_archive = self.archive_class(self.active_filename)
|
||||
if self.archive_callback:
|
||||
self.archive_callback.on_open(self.active_filename)
|
||||
return self.active_archive
|
||||
|
||||
|
||||
class WritingRollManager(RollManager):
|
||||
@ -92,10 +108,10 @@ class WritingRollManager(RollManager):
|
||||
archive_callback=None):
|
||||
super(WritingRollManager, self).__init__(
|
||||
filename_template,
|
||||
roll_checker,
|
||||
directory=directory,
|
||||
archive_callback=archive_callback,
|
||||
archive_class=archive_class)
|
||||
self.roll_checker = roll_checker
|
||||
|
||||
def write(self, metadata, payload):
|
||||
"""metadata is string:string dict.
|
||||
@ -105,3 +121,22 @@ class WritingRollManager(RollManager):
|
||||
a.write(metadata, payload)
|
||||
if self._should_roll_archive():
|
||||
self._roll_archive()
|
||||
|
||||
def _make_filename(self):
|
||||
f = utils.now().strftime(self.filename_template)
|
||||
f = f.replace(" ", "_")
|
||||
f = f.replace("/", "_")
|
||||
f = f.replace(":", "_")
|
||||
return os.path.join(self.directory, f)
|
||||
|
||||
def get_active_archive(self):
|
||||
if not self.active_archive:
|
||||
self.active_filename = self._make_filename()
|
||||
self.active_archive = self.archive_class(self.active_filename)
|
||||
if self.archive_callback:
|
||||
self.archive_callback.on_open(self.active_filename)
|
||||
self.roll_checker.start(self.active_archive)
|
||||
return self.active_archive
|
||||
|
||||
def _should_roll_archive(self):
|
||||
return self.roll_checker.check(self.active_archive)
|
||||
|
@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import calendar
|
||||
import collections
|
||||
import datetime
|
||||
import decimal
|
||||
import json
|
||||
@ -48,3 +49,17 @@ class DateTimeEncoder(json.JSONEncoder):
|
||||
obj = obj - obj.utcoffset()
|
||||
return str(dt_to_decimal(obj))
|
||||
return super(DateTimeEncoder, self).default(obj)
|
||||
|
||||
|
||||
# This is a hack for comparing structures load'ed from json
|
||||
# (which are always unicode) back to strings. It's used
|
||||
# for assertEqual() in the tests and is very slow and expensive.
|
||||
def unicode_to_string(data):
|
||||
if isinstance(data, basestring):
|
||||
return str(data)
|
||||
elif isinstance(data, collections.Mapping):
|
||||
return dict(map(unicode_to_string, data.iteritems()))
|
||||
elif isinstance(data, collections.Iterable):
|
||||
return type(data)(map(unicode_to_string, data))
|
||||
else:
|
||||
return data
|
||||
|
@ -18,16 +18,30 @@ TEMPDIR = "test_temp"
|
||||
|
||||
|
||||
class ArchiveCallback(object):
|
||||
def __init__(self, active_files):
|
||||
self.active_files = active_files
|
||||
def __init__(self):
|
||||
self.active_files = {}
|
||||
self.ordered_files = []
|
||||
|
||||
def on_open(self, filename):
|
||||
self.active_files.add(filename)
|
||||
print "Opened:", filename
|
||||
self.active_files[filename] = True
|
||||
self.ordered_files.append(filename)
|
||||
|
||||
def on_close(self, filename):
|
||||
self.active_files.remove(filename)
|
||||
print "Closed:", filename
|
||||
self.active_files[filename] = False
|
||||
|
||||
|
||||
class VerifyArchiveCallback(object):
|
||||
def __init__(self, original_files):
|
||||
self.original_files = original_files
|
||||
|
||||
def on_open(self, filename):
|
||||
o = self.original_files.pop(0)
|
||||
if filename != o:
|
||||
raise Exception("Wrong order: Got %s, Expected %s" %
|
||||
(filename, o))
|
||||
|
||||
def on_close(self, filename):
|
||||
pass
|
||||
|
||||
|
||||
class TestSizeRolling(unittest.TestCase):
|
||||
@ -40,11 +54,10 @@ class TestSizeRolling(unittest.TestCase):
|
||||
pass
|
||||
|
||||
def test_size_rolling(self):
|
||||
active_files = set()
|
||||
callback = ArchiveCallback(active_files)
|
||||
callback = ArchiveCallback()
|
||||
|
||||
checker = roll_checker.SizeRollChecker(1)
|
||||
manager = roll_manager.WritingRollManager("test_%Y_%m_%d_%f.events",
|
||||
manager = roll_manager.WritingRollManager("test_%Y_%m_%d_%X_%f.events",
|
||||
checker,
|
||||
TEMPDIR,
|
||||
archive_callback=callback)
|
||||
@ -69,5 +82,22 @@ class TestSizeRolling(unittest.TestCase):
|
||||
now = g.move_to_next_tick(now)
|
||||
manager.close()
|
||||
|
||||
raise Exception("Boom")
|
||||
for filename, is_open in callback.active_files.items():
|
||||
self.assertFalse(is_open)
|
||||
|
||||
vcallback = VerifyArchiveCallback(callback.ordered_files)
|
||||
manager = roll_manager.ReadingRollManager("test_*.events",
|
||||
TEMPDIR,
|
||||
archive_callback=vcallback)
|
||||
|
||||
while True:
|
||||
try:
|
||||
# By comparing the json'ed version of
|
||||
# the payloads we avoid all the issues
|
||||
# with unicode and datetime->decimal conversions.
|
||||
metadata, jpayload = manager.read()
|
||||
orig_metadata, orig_jpayload = entries.pop(0)
|
||||
self.assertEqual(orig_metadata, metadata)
|
||||
self.assertEqual(orig_jpayload, jpayload)
|
||||
except roll_manager.NoMoreFiles:
|
||||
break
|
||||
|
@ -63,7 +63,8 @@ class TestVersion1(unittest.TestCase):
|
||||
file_handle = mock.Mock()
|
||||
file_handle.read.side_effect = blocks
|
||||
|
||||
m, p = self.v1.unpack(file_handle)
|
||||
m, jp = self.v1.unpack(file_handle)
|
||||
p = json.loads(jp)
|
||||
self.assertEqual(metadata, m)
|
||||
self.assertEqual(payload, p)
|
||||
|
||||
|
@ -19,33 +19,9 @@ class FakeArchive(object):
|
||||
|
||||
|
||||
class TestRollManager(unittest.TestCase):
|
||||
def test_make_filename(self):
|
||||
now = datetime.datetime(day=1, month=2, year=2014,
|
||||
hour=10, minute=11, second=12)
|
||||
x = roll_manager.RollManager("filename_%c.dat", None)
|
||||
|
||||
with mock.patch.object(utils, "now") as dt:
|
||||
dt.return_value = now
|
||||
filename = x._make_filename()
|
||||
self.assertEqual(filename,
|
||||
"./filename_Sat_Feb__1_10:11:12_2014.dat")
|
||||
|
||||
def test_get_active_archive(self):
|
||||
checker = mock.Mock()
|
||||
callback = mock.Mock()
|
||||
filename_template = "filename_%c.dat"
|
||||
x = roll_manager.RollManager(filename_template, checker,
|
||||
archive_callback=callback,
|
||||
archive_class=FakeArchive)
|
||||
with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of:
|
||||
arc = x.get_active_archive()
|
||||
self.assertTrue(checker.start.called)
|
||||
self.assertTrue(callback.on_open.called)
|
||||
|
||||
def test_close(self):
|
||||
callback = mock.Mock()
|
||||
checker = mock.Mock()
|
||||
x = roll_manager.RollManager("template", checker,
|
||||
x = roll_manager.RollManager("template",
|
||||
archive_callback=callback)
|
||||
x.active_archive = mock.Mock()
|
||||
x.active_filename = "foo"
|
||||
@ -74,13 +50,33 @@ class TestWritingRollManager(unittest.TestCase):
|
||||
x.write({}, "payload")
|
||||
self.assertFalse(ra.called)
|
||||
|
||||
def test_correct_archiver(self):
|
||||
x = roll_manager.WritingRollManager("foo", None)
|
||||
print x.archive_class
|
||||
self.assertEqual(x.archive_class, archive.ArchiveWriter)
|
||||
|
||||
def test_get_active_archive(self):
|
||||
checker = mock.Mock()
|
||||
callback = mock.Mock()
|
||||
filename_template = "filename_%c.dat"
|
||||
x = roll_manager.WritingRollManager(filename_template, checker)
|
||||
x = roll_manager.WritingRollManager(filename_template, checker,
|
||||
archive_callback=callback,
|
||||
archive_class=FakeArchive)
|
||||
with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of:
|
||||
arc = x.get_active_archive()
|
||||
self.assertTrue(isinstance(arc, archive.ArchiveWriter))
|
||||
self.assertTrue(checker.start.called)
|
||||
self.assertTrue(callback.on_open.called)
|
||||
|
||||
def test_make_filename(self):
|
||||
now = datetime.datetime(day=1, month=2, year=2014,
|
||||
hour=10, minute=11, second=12)
|
||||
x = roll_manager.WritingRollManager("filename_%c.dat", None)
|
||||
|
||||
with mock.patch.object(utils, "now") as dt:
|
||||
dt.return_value = now
|
||||
filename = x._make_filename()
|
||||
self.assertEqual(filename,
|
||||
"./filename_Sat_Feb__1_10_11_12_2014.dat")
|
||||
|
||||
|
||||
class TestWriting(unittest.TestCase):
|
||||
|
Loading…
x
Reference in New Issue
Block a user