Simple per-event file output.
Records the event as a json payload in a file that may include a CRC of the payload. This is typically used when storing events in something like HDFS. Change-Id: Iaecca2397afa3501e437e27d698f3573760f1ac0
This commit is contained in:
parent
a3e80502f6
commit
852c1853e5
@ -14,6 +14,8 @@
|
||||
# limitations under the License.
|
||||
|
||||
import fnmatch
|
||||
import gzip
|
||||
import hashlib
|
||||
import os
|
||||
import os.path
|
||||
|
||||
@ -31,6 +33,10 @@ class NoValidFile(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BadWorkingDirectory(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RollManager(object):
|
||||
def __init__(self, filename_template, directory=".",
|
||||
archive_class=None, archive_callback=None):
|
||||
@ -131,3 +137,34 @@ class WritingRollManager(RollManager):
|
||||
|
||||
def _should_roll_archive(self):
|
||||
return self.roll_checker.check(self.active_archive)
|
||||
|
||||
|
||||
class WritingJSONRollManager(object):
|
||||
"""No archiver. No roll checker. Just write the file locally.
|
||||
Expects an external tool like rsync to move the file.
|
||||
A SHA-256 of the payload may be included in the filename."""
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.filename_template = args[0]
|
||||
self.directory = kwargs.get('directory', '.')
|
||||
if not os.path.isdir(self.directory):
|
||||
raise BadWorkingDirectory("Directory '%s' does not exist" %
|
||||
self.directory)
|
||||
|
||||
def _make_filename(self, crc):
|
||||
now = notification_utils.now()
|
||||
dt = str(notification_utils.dt_to_decimal(now))
|
||||
f = now.strftime(self.filename_template)
|
||||
f = f.replace(" ", "_")
|
||||
f = f.replace("/", "_")
|
||||
f = f.replace(":", "_")
|
||||
f = f.replace("[[CRC]]", crc)
|
||||
f = f.replace("[[TIMESTAMP]]", dt)
|
||||
return os.path.join(self.directory, f)
|
||||
|
||||
def write(self, metadata, json_payload):
|
||||
# Metadata is ignored.
|
||||
crc = hashlib.sha256(json_payload).hexdigest()
|
||||
filename = self._make_filename(crc)
|
||||
f = gzip.open(filename, 'wb')
|
||||
f.write(json_payload)
|
||||
f.close()
|
||||
|
@ -92,3 +92,32 @@ class TestWriting(unittest.TestCase):
|
||||
arc = x.get_active_archive()
|
||||
self.assertEqual(10, len(arc.data))
|
||||
self.assertEqual(({"index": "0"}, "payload_0"), arc.data[0])
|
||||
|
||||
|
||||
class TestJSONRollManager(unittest.TestCase):
|
||||
def test_bad_directory(self):
|
||||
try:
|
||||
roll_manager.WritingJSONRollManager("x", directory="bad_directory")
|
||||
self.fail("Should raise BadWorkingDirectory")
|
||||
except roll_manager.BadWorkingDirectory as e:
|
||||
pass
|
||||
|
||||
|
||||
def test_make_filename(self):
|
||||
now = datetime.datetime(day=1, month=2, year=2014,
|
||||
hour=10, minute=11, second=12)
|
||||
with mock.patch.object(notification_utils, "now") as dt:
|
||||
with mock.patch.object(notification_utils, "dt_to_decimal") as td:
|
||||
td.return_value = 123.45
|
||||
dt.return_value = now
|
||||
x = roll_manager.WritingJSONRollManager(
|
||||
"%Y%m%d [[TIMESTAMP]] [[CRC]].foo")
|
||||
fn = x._make_filename("mycrc")
|
||||
self.assertEqual("./20140201_123.45_mycrc.foo", fn)
|
||||
|
||||
def test_write(self):
|
||||
x = roll_manager.WritingJSONRollManager("template.foo")
|
||||
with mock.patch.object(roll_manager.gzip, "open") as gz:
|
||||
x.write("metadata", "json_payload")
|
||||
|
||||
self.assertTrue(gz.called_once_with("template.foo", "wb"))
|
||||
|
Loading…
x
Reference in New Issue
Block a user