From ae0e7e0d1fb22f55852e41e58eed8d623c7c9beb Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Mon, 26 Mar 2012 06:48:45 -0500
Subject: [PATCH 01/10] refactored child worker error handling to fix hangs

---
 slogging/log_common.py | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/slogging/log_common.py b/slogging/log_common.py
index 310cf8e..7f14092 100644
--- a/slogging/log_common.py
+++ b/slogging/log_common.py
@@ -23,6 +23,8 @@ from contextlib import contextmanager
 import os
 import errno
 import fcntl
+import sys
+import traceback
 
 from eventlet import sleep
 
@@ -37,6 +39,12 @@ class BadFileDownload(Exception):
         self.status_code = status_code
 
 
+class WorkerError(Exception):
+
+    def __init__(self):
+        self.tb_str = ''  # ensure that there is always something here
+
+
 class LogProcessorCommon(object):
 
     def __init__(self, conf, logger, log_route='log-processor'):
@@ -188,9 +196,9 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
             if logger:
                 logger.exception('error reading from out queue')
         else:
-            if isinstance(data, Exception):
+            if isinstance(data, WorkerError):
                 if logger:
-                    logger.exception(data)
+                    logger.error(data.tb_str)
             else:
                 yield item, data
         if not any(r.is_alive() for r in results) and out_queue.empty():
@@ -214,8 +222,13 @@ def collate_worker(processor_klass, processor_args, processor_method, in_queue,
                 return
             try:
                 ret = method(*item)
-            except Exception, err:
-                ret = err
+            except Exception:
+                err_type, err, tb = sys.exc_info()
+                # Use err_type since unpickling err in the parent process
+                # will fail if it has a custom constructor with required
+                # parameters.
+                ret = WorkerError()
+                ret.tb_str = ''.join(traceback.format_tb(tb))
             out_queue.put((item, ret))
     except Exception, err:
         print '****ERROR in worker****\n%r\n********' % err

From b029ff00b27976a58d4e47a1d4870222d7cd2275 Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Mon, 26 Mar 2012 08:48:56 -0500
Subject: [PATCH 02/10] changed to bare exception

---
 slogging/log_common.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/slogging/log_common.py b/slogging/log_common.py
index 7f14092..02029f8 100644
--- a/slogging/log_common.py
+++ b/slogging/log_common.py
@@ -222,7 +222,7 @@ def collate_worker(processor_klass, processor_args, processor_method, in_queue,
                 return
             try:
                 ret = method(*item)
-            except Exception:
+            except:
                 err_type, err, tb = sys.exc_info()
                 # Use err_type since unpickling err in the parent process
                 # will fail if it has a custom constructor with required
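Note: the failure mode the two patches above guard against can be reproduced outside slogging: an exception whose constructor requires arguments survives pickling but not unpickling, so a worker result carrying it is lost on the parent side of a multiprocessing queue. A minimal standalone sketch of why WorkerError carries only a plain traceback string (the class and variable names here are illustrative, not from the patches):

    import pickle
    import sys
    import traceback


    class NeedsTwoArgs(Exception):
        # mimics an exception type with a required-parameter constructor
        def __init__(self, code, reason):
            Exception.__init__(self, '%s: %s' % (code, reason))
            self.code = code

    try:
        raise NeedsTwoArgs(404, 'not found')
    except Exception:
        err_type, err, tb = sys.exc_info()
        try:
            pickle.loads(pickle.dumps(err))
        except TypeError:
            # unpickling re-calls NeedsTwoArgs with only one argument and
            # fails; this is the round trip that could wedge the parent
            print('exception did not survive the pickle round trip')
        # the approach taken above: ship only the formatted traceback,
        # which is a plain string and always pickles cleanly
        tb_str = ''.join(traceback.format_tb(tb))
        assert pickle.loads(pickle.dumps(tb_str)) == tb_str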
From fff7dcd04bb839ad1d71b6dee929a6e3139f6d0d Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Mon, 13 Feb 2012 14:11:54 -0600
Subject: [PATCH 03/10] added common functionality from tools that use log_common

---
 slogging/log_common.py | 65 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/slogging/log_common.py b/slogging/log_common.py
index 02029f8..a52b794 100644
--- a/slogging/log_common.py
+++ b/slogging/log_common.py
@@ -57,6 +57,15 @@ class LogProcessorCommon(object):
                                   if s.strip()])
         self.conf = conf
         self._internal_proxy = None
+        self.lookback_hours = int(conf.get('lookback_hours', '120'))
+        self.lookback_window = int(conf.get('lookback_window',
+                                            str(self.lookback_hours)))
+        self.log_processor_account = conf['swift_account']
+        self.log_processor_container = conf.get('container_name',
+                                                'simple_billing_data')
+        self.processed_files_object_name = \
+            conf.get('processed_files_object_name',
+                     'processed_files.pickle.gz')
 
     @property
     def internal_proxy(self):
@@ -79,6 +88,62 @@ class LogProcessorCommon(object):
                                                retries=3)
         return self._internal_proxy
 
+    def calculate_lookback(self):
+        if self.lookback_hours == 0:
+            lookback_start = None
+            lookback_end = None
+        else:
+            delta_hours = datetime.timedelta(hours=self.lookback_hours)
+            lookback_start = datetime.datetime.now() - delta_hours
+            lookback_start = lookback_start.strftime('%Y%m%d%H')
+            if self.lookback_window == 0:
+                lookback_end = None
+            else:
+                delta_window = datetime.timedelta(hours=self.lookback_window)
+                lookback_end = datetime.datetime.now() - \
+                               delta_hours + \
+                               delta_window
+                lookback_end = lookback_end.strftime('%Y%m%d%H')
+        self.logger.debug('lookback_start: %s' % lookback_start)
+        self.logger.debug('lookback_end: %s' % lookback_end)
+        return (lookback_start, lookback_end)
+
+    def load_already_processed_files(self):
+        try:
+            # Note: this file (or data set) will grow without bound.
+            # In practice, if it becomes a problem (say, after many months of
+            # running), one could manually prune the file to remove older
+            # entries. Automatically pruning on each run could be dangerous.
+            # There is not a good way to determine when an old entry should be
+            # pruned (lookback_hours could be set to anything and could change)
+            processed_files_stream = self.get_object_data(
+                self.log_processor_account,
+                self.log_processor_container,
+                self.processed_files_object_name,
+                compressed=True)
+            buf = '\n'.join(x for x in processed_files_stream)
+            if buf:
+                already_processed_files = cPickle.loads(buf)
+            else:
+                already_processed_files = set()
+        except BadFileDownload, err:
+            if err.status_code == 404:
+                already_processed_files = set()
+            else:
+                self.logger.error(_('Simple billing unable to load list '
+                                    'of already processed log files'))
+                return
+        self.logger.debug(_('found %d processed files') % \
+                          len(already_processed_files))
+        return already_processed_files
+
+    def save_processed_files(self, processed_files_data):
+        s = cPickle.dumps(processed_files_data, cPickle.HIGHEST_PROTOCOL)
+        f = cStringIO.StringIO(s)
+        return self.internal_proxy.upload_file(f, self.log_processor_account,
+                                               self.log_processor_container,
+                                               self.processed_files_object_name)
+
     def get_object_data(self, swift_account, container_name, object_name,
                         compressed=False):
         '''reads an object and yields its lines'''
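Note: a worked example of the lookback arithmetic that calculate_lookback() implements may help here (the frozen "now" and the hour values below are illustrative only): lookback_hours bounds how far back the scan starts, and lookback_window closes the window that many hours after that start.

    from datetime import datetime, timedelta

    now = datetime(2012, 3, 27, 12, 0)  # frozen clock for the example
    lookback_hours = 120
    lookback_window = 24

    start = now - timedelta(hours=lookback_hours)
    end = start + timedelta(hours=lookback_window)
    print(start.strftime('%Y%m%d%H'))  # 2012032212 (five days back)
    print(end.strftime('%Y%m%d%H'))    # 2012032312 (24 hours later)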
From 3127f365f99d7674f65852d7be705235f15bea8f Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Mon, 13 Feb 2012 14:19:26 -0600
Subject: [PATCH 04/10] moved FileBuffer to its own file

---
 slogging/access_log_delivery.py | 41 ++-------------------------------
 slogging/file_buffer.py         | 40 ++++++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 39 deletions(-)
 create mode 100644 slogging/file_buffer.py

diff --git a/slogging/access_log_delivery.py b/slogging/access_log_delivery.py
index 2ea66c1..2b8384c 100644
--- a/slogging/access_log_delivery.py
+++ b/slogging/access_log_delivery.py
@@ -27,10 +27,11 @@ import random
 import errno
 
 from swift.common.daemon import Daemon
-from swift.common.utils import get_logger, TRUE_VALUES, split_path, lock_file
+from swift.common.utils import get_logger, TRUE_VALUES, split_path
 from swift.common.exceptions import LockTimeout, ChunkReadTimeout
 from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
     BadFileDownload
+from slogging.file_buffer import FileBuffer
 
 
 month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
@@ -70,44 +71,6 @@ def memoize(func):
     return wrapped
 
 
-class FileBuffer(object):
-
-    def __init__(self, limit, logger):
-        self.buffers = collections.defaultdict(list)
-        self.limit = limit
-        self.logger = logger
-        self.total_size = 0
-
-    def write(self, filename, data):
-        self.buffers[filename].append(data)
-        self.total_size += len(data)
-        if self.total_size >= self.limit:
-            self.flush()
-
-    def flush(self):
-        while self.buffers:
-            filename_list = self.buffers.keys()
-            for filename in filename_list:
-                out = '\n'.join(self.buffers[filename]) + '\n'
-                mid_dirs = os.path.dirname(filename)
-                try:
-                    os.makedirs(mid_dirs)
-                except OSError, err:
-                    if err.errno == errno.EEXIST:
-                        pass
-                    else:
-                        raise
-                try:
-                    with lock_file(filename, append=True, unlink=False) as f:
-                        f.write(out)
-                except LockTimeout:
-                    # couldn't write, we'll try again later
-                    self.logger.debug(_('Timeout writing to %s' % filename))
-                else:
-                    del self.buffers[filename]
-        self.total_size = 0
-
-
 class AccessLogDelivery(LogProcessorCommon):
 
     def __init__(self, conf, logger):
diff --git a/slogging/file_buffer.py b/slogging/file_buffer.py
new file mode 100644
index 0000000..4abbd3b
--- /dev/null
+++ b/slogging/file_buffer.py
@@ -0,0 +1,40 @@
+import os
+from swift.common.utils import lock_file
+
+
+class FileBuffer(object):
+
+    def __init__(self, limit, logger):
+        self.buffers = collections.defaultdict(list)
+        self.limit = limit
+        self.logger = logger
+        self.total_size = 0
+
+    def write(self, filename, data):
+        self.buffers[filename].append(data)
+        self.total_size += len(data)
+        if self.total_size >= self.limit:
+            self.flush()
+
+    def flush(self):
+        while self.buffers:
+            filename_list = self.buffers.keys()
+            for filename in filename_list:
+                out = '\n'.join(self.buffers[filename]) + '\n'
+                mid_dirs = os.path.dirname(filename)
+                try:
+                    os.makedirs(mid_dirs)
+                except OSError, err:
+                    if err.errno == errno.EEXIST:
+                        pass
+                    else:
+                        raise
+                try:
+                    with lock_file(filename, append=True, unlink=False) as f:
+                        f.write(out)
+                except LockTimeout:
+                    # couldn't write, we'll try again later
+                    self.logger.debug(_('Timeout writing to %s' % filename))
+                else:
+                    del self.buffers[filename]
+        self.total_size = 0
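Note: a condensed, standalone sketch of the batch-and-flush pattern the extracted FileBuffer implements (TinyFileBuffer is a stand-in name; the real class writes under swift's lock_file and retries on LockTimeout rather than opening files directly):

    import collections
    import os
    import tempfile


    class TinyFileBuffer(object):
        def __init__(self, limit):
            self.buffers = collections.defaultdict(list)
            self.limit = limit
            self.total_size = 0

        def write(self, filename, data):
            # buffer per destination file; flush once the total crosses limit
            self.buffers[filename].append(data)
            self.total_size += len(data)
            if self.total_size >= self.limit:
                self.flush()

        def flush(self):
            for filename, lines in self.buffers.items():
                with open(filename, 'a') as f:
                    f.write('\n'.join(lines) + '\n')
            self.buffers.clear()
            self.total_size = 0


    d = tempfile.mkdtemp()
    buf = TinyFileBuffer(limit=16)
    buf.write(os.path.join(d, 'a.log'), 'first line')   # buffered (10 bytes)
    buf.write(os.path.join(d, 'a.log'), 'second line')  # 21 >= 16, flushes
    print(open(os.path.join(d, 'a.log')).read())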
From c80e63466d9ec4d296cf9cbed12e9c9aa6e18426 Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Fri, 2 Mar 2012 16:01:30 -0600
Subject: [PATCH 05/10] updated tests

---
 slogging/file_buffer.py                  |  1 +
 slogging/log_common.py                   | 12 +++++-------
 .../unit/test_access_log_delivery.py     | 19 ++++++++++++-------
 test_slogging/unit/test_log_processor.py | 10 ++++++----
 4 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/slogging/file_buffer.py b/slogging/file_buffer.py
index 4abbd3b..381de7c 100644
--- a/slogging/file_buffer.py
+++ b/slogging/file_buffer.py
@@ -1,4 +1,5 @@
 import os
+import collections
 from swift.common.utils import lock_file
 
 
diff --git a/slogging/log_common.py b/slogging/log_common.py
index a52b794..870b020 100644
--- a/slogging/log_common.py
+++ b/slogging/log_common.py
@@ -15,7 +15,7 @@
 
 import multiprocessing
 import Queue
-import datetime
+from datetime import datetime, timedelta
 import zlib
 import time
 from paste.deploy import appconfig
@@ -93,16 +93,14 @@ class LogProcessorCommon(object):
             lookback_start = None
             lookback_end = None
         else:
-            delta_hours = datetime.timedelta(hours=self.lookback_hours)
-            lookback_start = datetime.datetime.now() - delta_hours
+            delta_hours = timedelta(hours=self.lookback_hours)
+            lookback_start = datetime.now() - delta_hours
             lookback_start = lookback_start.strftime('%Y%m%d%H')
             if self.lookback_window == 0:
                 lookback_end = None
             else:
-                delta_window = datetime.timedelta(hours=self.lookback_window)
-                lookback_end = datetime.datetime.now() - \
-                               delta_hours + \
-                               delta_window
+                delta_window = timedelta(hours=self.lookback_window)
+                lookback_end = datetime.now() - delta_hours + delta_window
                 lookback_end = lookback_end.strftime('%Y%m%d%H')
         self.logger.debug('lookback_start: %s' % lookback_start)
         self.logger.debug('lookback_end: %s' % lookback_end)
diff --git a/test_slogging/unit/test_access_log_delivery.py b/test_slogging/unit/test_access_log_delivery.py
index 129c480..5480fb6 100644
--- a/test_slogging/unit/test_access_log_delivery.py
+++ b/test_slogging/unit/test_access_log_delivery.py
@@ -64,7 +64,8 @@ class FakeMemcache(object):
 class TestAccessLogDelivery(unittest.TestCase):
 
     def test_log_line_parser_query_args(self):
-        p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
+        c = {'swift_account': 'foo'}
+        p = access_log_delivery.AccessLogDelivery(c, DumbLogger())
         log_line = [str(x) for x in range(18)]
         log_line[1] = 'proxy-server'
         log_line[4] = '1/Jan/3/4/5/6'
@@ -82,7 +83,7 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(res, expected)
 
     def test_log_line_parser_hidden_ip(self):
-        conf = {'hidden_ips': '1.2.3.4'}
+        conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo'}
         p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
         log_line = [str(x) for x in range(18)]
         log_line[1] = 'proxy-server'
@@ -104,7 +105,8 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(res['client_ip'], expected)
 
     def test_log_line_parser_field_count(self):
-        p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
+        c = {'swift_account': 'foo'}
+        p = access_log_delivery.AccessLogDelivery(c, DumbLogger())
         # too few fields
         log_line = [str(x) for x in range(17)]
         log_line[1] = 'proxy-server'
@@ -148,7 +150,8 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(res, expected)
 
     def test_make_clf_from_parts(self):
-        p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
+        c = {'swift_account': 'foo'}
+        p = access_log_delivery.AccessLogDelivery(c, DumbLogger())
         log_line = [str(x) for x in range(18)]
         log_line[1] = 'proxy-server'
         log_line[4] = '1/Jan/3/4/5/6'
@@ -160,7 +163,8 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(clf, expect)
 
     def test_convert_log_line(self):
-        p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
+        c = {'swift_account': 'foo'}
+        p = access_log_delivery.AccessLogDelivery(c, DumbLogger())
         log_line = [str(x) for x in range(18)]
         log_line[1] = 'proxy-server'
         log_line[4] = '1/Jan/3/4/5/6'
@@ -174,7 +178,8 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(res, expected)
 
     def test_get_container_save_log_flag(self):
-        p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
+        c = {'swift_account': 'foo'}
+        p = access_log_delivery.AccessLogDelivery(c, DumbLogger())
 
         def my_get_metadata_true(*a, **kw):
             return {p.metadata_key: 'yes'}
@@ -202,7 +207,7 @@ class TestAccessLogDelivery(unittest.TestCase):
 
     def test_process_one_file(self):
         with temptree([]) as t:
-            conf = {'working_dir': t}
+            conf = {'working_dir': t, 'swift_account': 'foo'}
             p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
 
             def my_get_object_data(*a, **kw):
diff --git a/test_slogging/unit/test_log_processor.py b/test_slogging/unit/test_log_processor.py
index 06c03df..43d4ddf 100644
--- a/test_slogging/unit/test_log_processor.py
+++ b/test_slogging/unit/test_log_processor.py
@@ -92,8 +92,8 @@ class TestLogProcessor(unittest.TestCase):
         '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
     stats_test_line = 'account,1,2,3'
     proxy_config = {'log-processor': {
-
-    }
+    },
+    'swift_account': 'foo'
     }
 
     def test_lazy_load_internal_proxy(self):
@@ -105,7 +105,8 @@ use = egg:swift#proxy
         with tmpfile(dummy_proxy_config) as proxy_config_file:
             conf = {'log-processor': {
                 'proxy_server_conf': proxy_config_file,
-            }
+            },
+            'swift_account': 'foo'
             }
             p = log_processor.LogProcessor(conf, DumbLogger())
             self.assert_(isinstance(p._internal_proxy,
@@ -116,7 +117,8 @@ use = egg:swift#proxy
         # test with empty config variable
         conf = {'log-processor': {
             'proxy_server_conf': '',
-        }
+        },
+        'swift_account': 'foo'
         }
         q = log_processor.LogProcessor(conf, DumbLogger())
         self.assert_(isinstance(q._internal_proxy,
From 235aa72da9c9d0a4bc5dd75b544d53be844e2c77 Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Thu, 1 Mar 2012 11:03:04 -0600
Subject: [PATCH 06/10] explicit queue join_thread calls

---
 slogging/log_common.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/slogging/log_common.py b/slogging/log_common.py
index 870b020..c2f037a 100644
--- a/slogging/log_common.py
+++ b/slogging/log_common.py
@@ -242,13 +242,15 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
                                                 processor_args,
                                                 processor_method,
                                                 in_queue,
-                                                out_queue))
+                                                out_queue,
+                                                logger))
         p.start()
         results.append(p)
     for x in items_to_process:
         in_queue.put(x)
     for _junk in range(worker_count):
         in_queue.put(None)  # tell the worker to end
+    in_queue.join_thread()
     in_queue.close()
     while True:
         try:
@@ -270,7 +272,7 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
 
 
 def collate_worker(processor_klass, processor_args, processor_method, in_queue,
-                   out_queue):
+                   out_queue, logger=None):
     '''worker process for multiprocess_collate'''
     try:
         p = processor_klass(*processor_args)
@@ -294,7 +296,10 @@ def collate_worker(processor_klass, processor_args, processor_method, in_queue,
                 ret.tb_str = ''.join(traceback.format_tb(tb))
             out_queue.put((item, ret))
     except Exception, err:
-        print '****ERROR in worker****\n%r\n********' % err
+        if logger:
+            logger.exception('Error in worker')
     finally:
+        in_queue.join_thread()
         in_queue.close()
+        out_queue.join_thread()
         out_queue.close()
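Note: for reference, the multiprocessing documentation defines these two shutdown calls in the other order: Queue.join_thread() can only be used after Queue.close(), since it joins the background feeder thread that close() tells to finish. A standalone sketch of the documented sequence:

    import multiprocessing

    q = multiprocessing.Queue()
    q.put('payload')
    q.get()          # drain so the feeder thread has nothing left to send
    q.close()        # no more puts; the feeder flushes buffered items
    q.join_thread()  # block until the feeder thread has exited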
From 4903e578210419e9c486b67ed1e394de0219246b Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Thu, 1 Mar 2012 12:44:31 -0600
Subject: [PATCH 07/10] split log common tests out

---
 slogging/log_common.py                   |   3 -
 test_slogging/unit/test_log_common.py    | 215 +++++++++++++++++++++++
 test_slogging/unit/test_log_processor.py | 138 ---------------
 3 files changed, 215 insertions(+), 141 deletions(-)
 create mode 100644 test_slogging/unit/test_log_common.py

diff --git a/slogging/log_common.py b/slogging/log_common.py
index c2f037a..66aecd2 100644
--- a/slogging/log_common.py
+++ b/slogging/log_common.py
@@ -250,7 +250,6 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
         in_queue.put(x)
     for _junk in range(worker_count):
         in_queue.put(None)  # tell the worker to end
-    in_queue.join_thread()
     in_queue.close()
     while True:
         try:
@@ -299,7 +298,5 @@ def collate_worker(processor_klass, processor_args, processor_method, in_queue,
         if logger:
             logger.exception('Error in worker')
     finally:
-        in_queue.join_thread()
         in_queue.close()
-        out_queue.join_thread()
         out_queue.close()
diff --git a/test_slogging/unit/test_log_common.py b/test_slogging/unit/test_log_common.py
new file mode 100644
index 0000000..6803112
--- /dev/null
+++ b/test_slogging/unit/test_log_common.py
@@ -0,0 +1,215 @@
+import unittest
+import Queue
+from slogging import log_common
+
+from slogging import log_processor
+
+
+class DumbLogger(object):
+    def __getattr__(self, n):
+        return self.foo
+
+    def foo(self, *a, **kw):
+        pass
+
+
+class DumbInternalProxy(object):
+    def __init__(self, code=200, timeout=False, bad_compressed=False):
+        self.code = code
+        self.timeout = timeout
+        self.bad_compressed = bad_compressed
+
+    def get_container_list(self, account, container, marker=None,
+                           end_marker=None):
+        n = '2010/03/14/13/obj1'
+        if marker is None or n > marker:
+            if end_marker:
+                if n <= end_marker:
+                    return [{'name': n}]
+                else:
+                    return []
+            return [{'name': n}]
+        return []
+
+    def get_object(self, account, container, object_name):
+        if object_name.endswith('.gz'):
+            if self.bad_compressed:
+                # invalid compressed data
+                def data():
+                    yield '\xff\xff\xff\xff\xff\xff\xff'
+            else:
+                # 'obj\ndata', compressed with gzip -9
+                def data():
+                    yield '\x1f\x8b\x08'
+                    yield '\x08"\xd79L'
+                    yield '\x02\x03te'
+                    yield 'st\x00\xcbO'
+                    yield '\xca\xe2JI,I'
+                    yield '\xe4\x02\x00O\xff'
+                    yield '\xa3Y\t\x00\x00\x00'
+        else:
+            def data():
+                yield 'obj\n'
+                if self.timeout:
+                    raise ChunkReadTimeout
+                yield 'data'
+        return self.code, data()
+
+
+class TestLogProcessor(unittest.TestCase):
+
+    proxy_config = {'log-processor': {
+
+    }
+    }
+    access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
+        '09/Jul/2010/04/14/30 GET '\
+        '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
+        'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
+        '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
+
+    def test_collate_worker(self):
+        try:
+            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
+
+            def get_object_data(*a, **kw):
+                return [self.access_test_line]
+            orig_get_object_data = log_processor.LogProcessor.get_object_data
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'slogging.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            q_in = Queue.Queue()
+            q_in.close = lambda: None
+            q_out = Queue.Queue()
+            q_out.close = lambda: None
+            work_request = ('access', 'a', 'c', 'o')
+            q_in.put(work_request)
+            q_in.put(None)
+            processor_klass = log_processor.LogProcessor
+            log_common.collate_worker(processor_klass, processor_args,
+                                      'process_one_file', q_in, q_out,
+                                      DumbLogger())
+            item, ret = q_out.get()
+            self.assertEquals(item, work_request)
+            expected = {('acct', '2010', '07', '09', '04'):
+                        {('public', 'object', 'GET', '2xx'): 1,
+                         ('public', 'bytes_out'): 95,
+                         'marker_query': 0,
+                         'format_query': 1,
+                         'delimiter_query': 0,
+                         'path_query': 0,
+                         ('public', 'bytes_in'): 6,
+                         'prefix_query': 0}}
+            self.assertEquals(ret, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_collate_worker_error(self):
+        def get_object_data(*a, **kw):
+            raise Exception()
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'slogging.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            q_in = Queue.Queue()
+            q_in.close = lambda: None
+            q_out = Queue.Queue()
+            q_out.close = lambda: None
+            work_request = ('access', 'a', 'c', 'o')
+            q_in.put(work_request)
+            q_in.put(None)
+            processor_klass = log_processor.LogProcessor
+            log_common.collate_worker(processor_klass, processor_args,
+                                      'process_one_file', q_in, q_out,
+                                      DumbLogger())
+            item, ret = q_out.get()
+            self.assertEquals(item, work_request)
+            # these only work for Py2.7+
+            #self.assertIsInstance(ret, log_common.BadFileDownload)
+            self.assertTrue(isinstance(ret, Exception))
+        finally:
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_multiprocess_collate(self):
+        try:
+            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
+
+            def get_object_data(*a, **kw):
+                return [self.access_test_line]
+            orig_get_object_data = log_processor.LogProcessor.get_object_data
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'slogging.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            item = ('access', 'a', 'c', 'o')
+            logs_to_process = [item]
+            processor_klass = log_processor.LogProcessor
+            results = log_processor.multiprocess_collate(processor_klass,
+                                                         processor_args,
+                                                         'process_one_file',
+                                                         logs_to_process,
+                                                         1,
+                                                         DumbLogger())
+            results = list(results)
+            expected = [(item, {('acct', '2010', '07', '09', '04'):
+                        {('public', 'object', 'GET', '2xx'): 1,
+                         ('public', 'bytes_out'): 95,
+                         'marker_query': 0,
+                         'format_query': 1,
+                         'delimiter_query': 0,
+                         'path_query': 0,
+                         ('public', 'bytes_in'): 6,
+                         'prefix_query': 0}})]
+            self.assertEquals(results, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_multiprocess_collate_errors(self):
+        def get_object_data(*a, **kw):
+            raise log_common.BadFileDownload()
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'slogging.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            item = ('access', 'a', 'c', 'o')
+            logs_to_process = [item]
+            processor_klass = log_processor.LogProcessor
+            results = log_common.multiprocess_collate(processor_klass,
+                                                      processor_args,
+                                                      'process_one_file',
+                                                      logs_to_process,
+                                                      1,
+                                                      DumbLogger())
+            results = list(results)
+            expected = []
+            self.assertEquals(results, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
diff --git a/test_slogging/unit/test_log_processor.py b/test_slogging/unit/test_log_processor.py
index 43d4ddf..7a07b6a 100644
--- a/test_slogging/unit/test_log_processor.py
+++ b/test_slogging/unit/test_log_processor.py
@@ -325,144 +325,6 @@ use = egg:swift#proxy
             #self.assertIsInstance(k, str)
             self.assertTrue(isinstance(k, str), type(k))
-
-    def test_collate_worker(self):
-        try:
-            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
-
-            def get_object_data(*a, **kw):
-                return [self.access_test_line]
-            orig_get_object_data = log_processor.LogProcessor.get_object_data
-            log_processor.LogProcessor.get_object_data = get_object_data
-            proxy_config = self.proxy_config.copy()
-            proxy_config.update({
-                'log-processor-access': {
-                    'source_filename_format': '%Y%m%d%H*',
-                    'class_path':
-                        'slogging.access_processor.AccessLogProcessor'
-                }})
-            processor_args = (proxy_config, DumbLogger())
-            q_in = Queue.Queue()
-            q_out = Queue.Queue()
-            work_request = ('access', 'a', 'c', 'o')
-            q_in.put(work_request)
-            q_in.put(None)
-            processor_klass = log_processor.LogProcessor
-            log_common.collate_worker(processor_klass, processor_args,
-                                      'process_one_file', q_in, q_out)
-            item, ret = q_out.get()
-            self.assertEquals(item, work_request)
-            expected = {('acct', '2010', '07', '09', '04'):
-                        {('public', 'object', 'GET', '2xx'): 1,
-                         ('public', 'bytes_out'): 95,
-                         'marker_query': 0,
-                         'format_query': 1,
-                         'delimiter_query': 0,
-                         'path_query': 0,
-                         ('public', 'bytes_in'): 6,
-                         'prefix_query': 0}}
-            self.assertEquals(ret, expected)
-        finally:
-            log_processor.LogProcessor._internal_proxy = None
-            log_processor.LogProcessor.get_object_data = orig_get_object_data
-
-    def test_collate_worker_error(self):
-        def get_object_data(*a, **kw):
-            raise Exception()
-        orig_get_object_data = log_processor.LogProcessor.get_object_data
-        try:
-            log_processor.LogProcessor.get_object_data = get_object_data
-            proxy_config = self.proxy_config.copy()
-            proxy_config.update({
-                'log-processor-access': {
-                    'source_filename_format': '%Y%m%d%H*',
-                    'class_path':
-                        'slogging.access_processor.AccessLogProcessor'
-                }})
-            processor_args = (proxy_config, DumbLogger())
-            q_in = Queue.Queue()
-            q_out = Queue.Queue()
-            work_request = ('access', 'a', 'c', 'o')
-            q_in.put(work_request)
-            q_in.put(None)
-            processor_klass = log_processor.LogProcessor
-            log_common.collate_worker(processor_klass, processor_args,
-                                      'process_one_file', q_in, q_out)
-            item, ret = q_out.get()
-            self.assertEquals(item, work_request)
-            # these only work for Py2.7+
-            #self.assertIsInstance(ret, log_common.BadFileDownload)
-            self.assertTrue(isinstance(ret, Exception))
-        finally:
-            log_processor.LogProcessor.get_object_data = orig_get_object_data
-
-    def test_multiprocess_collate(self):
-        try:
-            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
-
-            def get_object_data(*a, **kw):
-                return [self.access_test_line]
-            orig_get_object_data = log_processor.LogProcessor.get_object_data
-            log_processor.LogProcessor.get_object_data = get_object_data
-            proxy_config = self.proxy_config.copy()
-            proxy_config.update({
-                'log-processor-access': {
-                    'source_filename_format': '%Y%m%d%H*',
-                    'class_path':
-                        'slogging.access_processor.AccessLogProcessor'
-                }})
-            processor_args = (proxy_config, DumbLogger())
-            item = ('access', 'a', 'c', 'o')
-            logs_to_process = [item]
-            processor_klass = log_processor.LogProcessor
-            results = log_processor.multiprocess_collate(processor_klass,
-                                                         processor_args,
-                                                         'process_one_file',
-                                                         logs_to_process,
-                                                         1)
-            results = list(results)
-            expected = [(item, {('acct', '2010', '07', '09', '04'):
-                        {('public', 'object', 'GET', '2xx'): 1,
-                         ('public', 'bytes_out'): 95,
-                         'marker_query': 0,
-                         'format_query': 1,
-                         'delimiter_query': 0,
-                         'path_query': 0,
-                         ('public', 'bytes_in'): 6,
-                         'prefix_query': 0}})]
-            self.assertEquals(results, expected)
-        finally:
-            log_processor.LogProcessor._internal_proxy = None
-            log_processor.LogProcessor.get_object_data = orig_get_object_data
-
-    def test_multiprocess_collate_errors(self):
-        def get_object_data(*a, **kw):
-            raise log_common.BadFileDownload()
-        orig_get_object_data = log_processor.LogProcessor.get_object_data
-        try:
-            log_processor.LogProcessor.get_object_data = get_object_data
-            proxy_config = self.proxy_config.copy()
-            proxy_config.update({
-                'log-processor-access': {
-                    'source_filename_format': '%Y%m%d%H*',
-                    'class_path':
-                        'slogging.access_processor.AccessLogProcessor'
-                }})
-            processor_args = (proxy_config, DumbLogger())
-            item = ('access', 'a', 'c', 'o')
-            logs_to_process = [item]
-            processor_klass = log_processor.LogProcessor
-            results = log_common.multiprocess_collate(processor_klass,
-                                                      processor_args,
-                                                      'process_one_file',
-                                                      logs_to_process,
-                                                      1)
-            results = list(results)
-            expected = []
-            self.assertEquals(results, expected)
-        finally:
-            log_processor.LogProcessor._internal_proxy = None
-            log_processor.LogProcessor.get_object_data = orig_get_object_data
 
 
 class TestLogProcessorDaemon(unittest.TestCase):
 
{'log-processor': { - - } + proxy_config = {'log-processor': {}, + 'swift_account': 'foo' } access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ '09/Jul/2010/04/14/30 GET '\ From 6bd1c93d931feeefaa8bf575269923c87266e9ca Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Tue, 27 Mar 2012 15:27:55 -0500 Subject: [PATCH 09/10] silly conf read fix --- slogging/log_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slogging/log_common.py b/slogging/log_common.py index 66aecd2..56a90c4 100644 --- a/slogging/log_common.py +++ b/slogging/log_common.py @@ -59,7 +59,7 @@ class LogProcessorCommon(object): self._internal_proxy = None self.lookback_hours = int(conf.get('lookback_hours', '120')) self.lookback_window = int(conf.get('lookback_window', - str(self.lookback_hours))) + self.lookback_hours)) self.log_processor_account = conf['swift_account'] self.log_processor_container = conf.get('container_name', 'simple_billing_data') From 7ed5a0d23ae16bbd2410f0b9e82d87779ca0c823 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Tue, 27 Mar 2012 15:52:57 -0500 Subject: [PATCH 10/10] access log delivery now uses the common utilities in log_common --- slogging/access_log_delivery.py | 95 ++++++------------- .../unit/test_access_log_delivery.py | 9 +- 2 files changed, 34 insertions(+), 70 deletions(-) diff --git a/slogging/access_log_delivery.py b/slogging/access_log_delivery.py index 2b8384c..c9e8d95 100644 --- a/slogging/access_log_delivery.py +++ b/slogging/access_log_delivery.py @@ -85,6 +85,23 @@ class AccessLogDelivery(LogProcessorCommon): self.file_buffer = FileBuffer(buffer_limit, logger) self.hidden_ips = [x.strip() for x in conf.get('hidden_ips', '').split(',') if x.strip()] + self.source_account = conf['log_source_account'] + self.source_container = conf.get('log_source_container_name', + 'log_data') + + def get_logs_to_process(self, already_processed_files): + lookback_start, lookback_end = self.calculate_lookback() + logs_to_process = self.log_processor.get_container_listing( + self.source_account, + self.source_container, + lookback_start, + lookback_end, + already_processed_files) + logs_to_process = [(self.source_account, self.source_container, x) + for x in logs_to_process] + self.logger.info(_('loaded %d files to process') % + len(logs_to_process)) + return logs_to_process def process_one_file(self, account, container, object_name): files_to_upload = set() @@ -220,18 +237,11 @@ class AccessLogDeliveryDaemon(Daemon): super(AccessLogDeliveryDaemon, self).__init__(c) self.logger = get_logger(c, log_route='access-log-delivery') self.log_processor = AccessLogDelivery(c, self.logger) - self.lookback_hours = int(c.get('lookback_hours', '120')) - self.lookback_window = int(c.get('lookback_window', - str(self.lookback_hours))) self.log_delivery_account = c['swift_account'] self.log_delivery_container = c.get('container_name', 'access_log_delivery_data') - self.source_account = c['log_source_account'] - self.source_container = c.get('log_source_container_name', 'log_data') self.target_container = c.get('target_container', '.ACCESS_LOGS') self.frequency = int(c.get('frequency', '3600')) - self.processed_files_object_name = c.get('processed_files_object_name', - 'processed_files.pickle.gz') self.worker_count = int(c.get('worker_count', '1')) self.working_dir = c.get('working_dir', '/tmp/swift') if self.working_dir.endswith('/'): @@ -240,65 +250,21 @@ class AccessLogDeliveryDaemon(Daemon): def run_once(self, *a, **kw): self.logger.info(_("Beginning log 
processing")) start = time.time() - if self.lookback_hours == 0: - lookback_start = None - lookback_end = None - else: - delta_hours = datetime.timedelta(hours=self.lookback_hours) - lookback_start = datetime.datetime.now() - delta_hours - lookback_start = lookback_start.strftime('%Y%m%d%H') - if self.lookback_window == 0: - lookback_end = None - else: - delta_window = datetime.timedelta(hours=self.lookback_window) - lookback_end = datetime.datetime.now() - \ - delta_hours + \ - delta_window - lookback_end = lookback_end.strftime('%Y%m%d%H') - self.logger.debug('lookback_start: %s' % lookback_start) - self.logger.debug('lookback_end: %s' % lookback_end) - try: - # Note: this file (or data set) will grow without bound. - # In practice, if it becomes a problem (say, after many months of - # running), one could manually prune the file to remove older - # entries. Automatically pruning on each run could be dangerous. - # There is not a good way to determine when an old entry should be - # pruned (lookback_hours could be set to anything and could change) - processed_files_stream = self.log_processor.get_object_data( - self.log_delivery_account, - self.log_delivery_container, - self.processed_files_object_name, - compressed=True) - buf = '\n'.join(x for x in processed_files_stream) - if buf: - already_processed_files = cPickle.loads(buf) - else: - already_processed_files = set() - except BadFileDownload, err: - if err.status_code == 404: - already_processed_files = set() - else: - self.logger.error(_('Access log delivery unable to load list ' - 'of already processed log files')) - return - self.logger.debug(_('found %d processed files') % \ - len(already_processed_files)) - logs_to_process = self.log_processor.get_container_listing( - self.source_account, - self.source_container, - lookback_start, - lookback_end, - already_processed_files) - self.logger.info(_('loaded %d files to process') % - len(logs_to_process)) + already_processed_files = \ + self.log_processor.load_already_processed_files() + lookback_hours = kw.get('lookback_hours') + if lookback_hours: + self.log_processor.lookback_hours = lookback_hours + lookback_window = kw.get('lookback_window') + if lookback_window: + self.log_processor.lookback_window = lookback_window + logs_to_process = \ + self.log_processor.get_logs_to_process(already_processed_files) if not logs_to_process: self.logger.info(_("Log processing done (%0.2f minutes)") % ((time.time() - start) / 60)) return - logs_to_process = [(self.source_account, self.source_container, x) - for x in logs_to_process] - # map processor_args = (self.conf, self.logger) results = multiprocess_collate(AccessLogDelivery, processor_args, @@ -332,12 +298,7 @@ class AccessLogDeliveryDaemon(Daemon): filename, account)) # cleanup - s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) - f = cStringIO.StringIO(s) - success = self.log_processor.internal_proxy.upload_file(f, - self.log_delivery_account, - self.log_delivery_container, - self.processed_files_object_name) + success = self.log_processor.save_processed_files(processed_files) if not success: self.logger.error('Error uploading updated processed files log') self.logger.info(_("Log processing done (%0.2f minutes)") % diff --git a/test_slogging/unit/test_access_log_delivery.py b/test_slogging/unit/test_access_log_delivery.py index c05340a..aca9b18 100644 --- a/test_slogging/unit/test_access_log_delivery.py +++ b/test_slogging/unit/test_access_log_delivery.py @@ -63,7 +63,8 @@ class FakeMemcache(object): class 
diff --git a/test_slogging/unit/test_access_log_delivery.py b/test_slogging/unit/test_access_log_delivery.py
index c05340a..aca9b18 100644
--- a/test_slogging/unit/test_access_log_delivery.py
+++ b/test_slogging/unit/test_access_log_delivery.py
@@ -63,7 +63,8 @@ class FakeMemcache(object):
 
 class TestAccessLogDelivery(unittest.TestCase):
 
-    conf = {'swift_account': 'foo'}
+    conf = {'swift_account': 'foo',
+            'log_source_account': 'bar'}
 
     def test_log_line_parser_query_args(self):
         p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
@@ -84,7 +85,8 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(res, expected)
 
     def test_log_line_parser_hidden_ip(self):
-        conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo'}
+        conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo',
+                'log_source_account': 'bar'}
         p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
         log_line = [str(x) for x in range(18)]
         log_line[1] = 'proxy-server'
@@ -204,7 +206,8 @@ class TestAccessLogDelivery(unittest.TestCase):
 
     def test_process_one_file(self):
         with temptree([]) as t:
-            conf = {'working_dir': t, 'swift_account': 'foo'}
+            conf = {'working_dir': t, 'swift_account': 'foo',
+                    'log_source_account': 'bar'}
             p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
 
             def my_get_object_data(*a, **kw):
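Note: with the refactor in the final patch, a single pass of the daemon can be driven with explicit lookback overrides via run_once() keyword arguments. A usage sketch, assuming an installed slogging/swift environment; the account names and hour values below are placeholders, not from the patch, and a real deployment would carry the rest of its usual conf:

    from slogging.access_log_delivery import AccessLogDeliveryDaemon

    conf = {'swift_account': 'AUTH_logs',          # hypothetical
            'log_source_account': 'AUTH_source'}   # hypothetical
    daemon = AccessLogDeliveryDaemon(conf)
    # overrides flow to the shared LogProcessorCommon lookback settings
    daemon.run_once(lookback_hours=24, lookback_window=6)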