diff --git a/slogging/access_log_delivery.py b/slogging/access_log_delivery.py index 2ea66c1..c9e8d95 100644 --- a/slogging/access_log_delivery.py +++ b/slogging/access_log_delivery.py @@ -27,10 +27,11 @@ import random import errno from swift.common.daemon import Daemon -from swift.common.utils import get_logger, TRUE_VALUES, split_path, lock_file +from swift.common.utils import get_logger, TRUE_VALUES, split_path from swift.common.exceptions import LockTimeout, ChunkReadTimeout from slogging.log_common import LogProcessorCommon, multiprocess_collate, \ BadFileDownload +from slogging.file_buffer import FileBuffer month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split() @@ -70,44 +71,6 @@ def memoize(func): return wrapped -class FileBuffer(object): - - def __init__(self, limit, logger): - self.buffers = collections.defaultdict(list) - self.limit = limit - self.logger = logger - self.total_size = 0 - - def write(self, filename, data): - self.buffers[filename].append(data) - self.total_size += len(data) - if self.total_size >= self.limit: - self.flush() - - def flush(self): - while self.buffers: - filename_list = self.buffers.keys() - for filename in filename_list: - out = '\n'.join(self.buffers[filename]) + '\n' - mid_dirs = os.path.dirname(filename) - try: - os.makedirs(mid_dirs) - except OSError, err: - if err.errno == errno.EEXIST: - pass - else: - raise - try: - with lock_file(filename, append=True, unlink=False) as f: - f.write(out) - except LockTimeout: - # couldn't write, we'll try again later - self.logger.debug(_('Timeout writing to %s' % filename)) - else: - del self.buffers[filename] - self.total_size = 0 - - class AccessLogDelivery(LogProcessorCommon): def __init__(self, conf, logger): @@ -122,6 +85,23 @@ class AccessLogDelivery(LogProcessorCommon): self.file_buffer = FileBuffer(buffer_limit, logger) self.hidden_ips = [x.strip() for x in conf.get('hidden_ips', '').split(',') if x.strip()] + self.source_account = conf['log_source_account'] + self.source_container = conf.get('log_source_container_name', + 'log_data') + + def get_logs_to_process(self, already_processed_files): + lookback_start, lookback_end = self.calculate_lookback() + logs_to_process = self.get_container_listing( + self.source_account, + self.source_container, + lookback_start, + lookback_end, + already_processed_files) + logs_to_process = [(self.source_account, self.source_container, x) + for x in logs_to_process] + self.logger.info(_('loaded %d files to process') % + len(logs_to_process)) + return logs_to_process def process_one_file(self, account, container, object_name): files_to_upload = set() @@ -257,18 +237,11 @@ class AccessLogDeliveryDaemon(Daemon): super(AccessLogDeliveryDaemon, self).__init__(c) self.logger = get_logger(c, log_route='access-log-delivery') self.log_processor = AccessLogDelivery(c, self.logger) - self.lookback_hours = int(c.get('lookback_hours', '120')) - self.lookback_window = int(c.get('lookback_window', - str(self.lookback_hours))) self.log_delivery_account = c['swift_account'] self.log_delivery_container = c.get('container_name', 'access_log_delivery_data') - self.source_account = c['log_source_account'] - self.source_container = c.get('log_source_container_name', 'log_data') self.target_container = c.get('target_container', '.ACCESS_LOGS') self.frequency = int(c.get('frequency', '3600')) - self.processed_files_object_name = c.get('processed_files_object_name', - 'processed_files.pickle.gz') self.worker_count = int(c.get('worker_count', '1'))
self.working_dir = c.get('working_dir', '/tmp/swift') if self.working_dir.endswith('/'): @@ -277,65 +250,21 @@ class AccessLogDeliveryDaemon(Daemon): def run_once(self, *a, **kw): self.logger.info(_("Beginning log processing")) start = time.time() - if self.lookback_hours == 0: - lookback_start = None - lookback_end = None - else: - delta_hours = datetime.timedelta(hours=self.lookback_hours) - lookback_start = datetime.datetime.now() - delta_hours - lookback_start = lookback_start.strftime('%Y%m%d%H') - if self.lookback_window == 0: - lookback_end = None - else: - delta_window = datetime.timedelta(hours=self.lookback_window) - lookback_end = datetime.datetime.now() - \ - delta_hours + \ - delta_window - lookback_end = lookback_end.strftime('%Y%m%d%H') - self.logger.debug('lookback_start: %s' % lookback_start) - self.logger.debug('lookback_end: %s' % lookback_end) - try: - # Note: this file (or data set) will grow without bound. - # In practice, if it becomes a problem (say, after many months of - # running), one could manually prune the file to remove older - # entries. Automatically pruning on each run could be dangerous. - # There is not a good way to determine when an old entry should be - # pruned (lookback_hours could be set to anything and could change) - processed_files_stream = self.log_processor.get_object_data( - self.log_delivery_account, - self.log_delivery_container, - self.processed_files_object_name, - compressed=True) - buf = '\n'.join(x for x in processed_files_stream) - if buf: - already_processed_files = cPickle.loads(buf) - else: - already_processed_files = set() - except BadFileDownload, err: - if err.status_code == 404: - already_processed_files = set() - else: - self.logger.error(_('Access log delivery unable to load list ' - 'of already processed log files')) - return - self.logger.debug(_('found %d processed files') % \ - len(already_processed_files)) - logs_to_process = self.log_processor.get_container_listing( - self.source_account, - self.source_container, - lookback_start, - lookback_end, - already_processed_files) - self.logger.info(_('loaded %d files to process') % - len(logs_to_process)) + already_processed_files = \ + self.log_processor.load_already_processed_files() + lookback_hours = kw.get('lookback_hours') + if lookback_hours: + self.log_processor.lookback_hours = lookback_hours + lookback_window = kw.get('lookback_window') + if lookback_window: + self.log_processor.lookback_window = lookback_window + logs_to_process = \ + self.log_processor.get_logs_to_process(already_processed_files) if not logs_to_process: self.logger.info(_("Log processing done (%0.2f minutes)") % ((time.time() - start) / 60)) return - logs_to_process = [(self.source_account, self.source_container, x) - for x in logs_to_process] - # map processor_args = (self.conf, self.logger) results = multiprocess_collate(AccessLogDelivery, processor_args, @@ -369,12 +298,7 @@ class AccessLogDeliveryDaemon(Daemon): filename, account)) # cleanup - s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) - f = cStringIO.StringIO(s) - success = self.log_processor.internal_proxy.upload_file(f, - self.log_delivery_account, - self.log_delivery_container, - self.processed_files_object_name) + success = self.log_processor.save_processed_files(processed_files) if not success: self.logger.error('Error uploading updated processed files log') self.logger.info(_("Log processing done (%0.2f minutes)") % diff --git a/slogging/file_buffer.py b/slogging/file_buffer.py new file mode 100644 index 
0000000..381de7c --- /dev/null +++ b/slogging/file_buffer.py @@ -0,0 +1,43 @@ +import os +import errno +import collections +from swift.common.utils import lock_file +from swift.common.exceptions import LockTimeout + + +class FileBuffer(object): + + def __init__(self, limit, logger): + self.buffers = collections.defaultdict(list) + self.limit = limit + self.logger = logger + self.total_size = 0 + + def write(self, filename, data): + self.buffers[filename].append(data) + self.total_size += len(data) + if self.total_size >= self.limit: + self.flush() + + def flush(self): + while self.buffers: + filename_list = self.buffers.keys() + for filename in filename_list: + out = '\n'.join(self.buffers[filename]) + '\n' + mid_dirs = os.path.dirname(filename) + try: + os.makedirs(mid_dirs) + except OSError, err: + if err.errno == errno.EEXIST: + pass + else: + raise + try: + with lock_file(filename, append=True, unlink=False) as f: + f.write(out) + except LockTimeout: + # couldn't write, we'll try again later + self.logger.debug(_('Timeout writing to %s' % filename)) + else: + del self.buffers[filename] + self.total_size = 0 diff --git a/slogging/log_common.py b/slogging/log_common.py index 310cf8e..56a90c4 100644 --- a/slogging/log_common.py +++ b/slogging/log_common.py @@ -15,7 +15,7 @@ import multiprocessing import Queue -import datetime +from datetime import datetime, timedelta import zlib import time from paste.deploy import appconfig @@ -23,6 +23,8 @@ from contextlib import contextmanager import os import errno import fcntl +import sys +import traceback from eventlet import sleep @@ -37,6 +39,12 @@ class BadFileDownload(Exception): self.status_code = status_code +class WorkerError(Exception): + + def __init__(self): + self.tb_str = '' # ensure that there is always something here + + class LogProcessorCommon(object): def __init__(self, conf, logger, log_route='log-processor'): @@ -49,6 +57,15 @@ class LogProcessorCommon(object): if s.strip()]) self.conf = conf self._internal_proxy = None + self.lookback_hours = int(conf.get('lookback_hours', '120')) + self.lookback_window = int(conf.get('lookback_window', + self.lookback_hours)) + self.log_processor_account = conf['swift_account'] + self.log_processor_container = conf.get('container_name', + 'simple_billing_data') + self.processed_files_object_name = \ + conf.get('processed_files_object_name', + 'processed_files.pickle.gz') @property def internal_proxy(self): @@ -71,6 +88,60 @@ class LogProcessorCommon(object): retries=3) return self._internal_proxy + + def calculate_lookback(self): + if self.lookback_hours == 0: + lookback_start = None + lookback_end = None + else: + delta_hours = timedelta(hours=self.lookback_hours) + lookback_start = datetime.now() - delta_hours + lookback_start = lookback_start.strftime('%Y%m%d%H') + if self.lookback_window == 0: + lookback_end = None + else: + delta_window = timedelta(hours=self.lookback_window) + lookback_end = datetime.now() - delta_hours + delta_window + lookback_end = lookback_end.strftime('%Y%m%d%H') + self.logger.debug('lookback_start: %s' % lookback_start) + self.logger.debug('lookback_end: %s' % lookback_end) + return (lookback_start, lookback_end) + + def load_already_processed_files(self): + try: + # Note: this file (or data set) will grow without bound. + # In practice, if it becomes a problem (say, after many months of + # running), one could manually prune the file to remove older + # entries. Automatically pruning on each run could be dangerous.
+ # There is not a good way to determine when an old entry should be + # pruned (lookback_hours could be set to anything and could change) + processed_files_stream = self.get_object_data( + self.log_processor_account, + self.log_processor_container, + self.processed_files_object_name, + compressed=True) + buf = '\n'.join(x for x in processed_files_stream) + if buf: + already_processed_files = cPickle.loads(buf) + else: + already_processed_files = set() + except BadFileDownload, err: + if err.status_code == 404: + already_processed_files = set() + else: + self.logger.error(_('Simple billing unable to load list ' + 'of already processed log files')) + return + self.logger.debug(_('found %d processed files') % \ + len(already_processed_files)) + return already_processed_files + + def save_processed_files(self, processed_files_data): + s = cPickle.dumps(processed_files_data, cPickle.HIGHEST_PROTOCOL) + f = cStringIO.StringIO(s) + return self.internal_proxy.upload_file(f, self.log_processor_account, + self.log_processor_container, + self.processed_files_object_name) + def get_object_data(self, swift_account, container_name, object_name, compressed=False): '''reads an object and yields its lines''' @@ -171,7 +242,8 @@ def multiprocess_collate(processor_klass, processor_args, processor_method, processor_args, processor_method, in_queue, - out_queue)) + out_queue, + logger)) p.start() results.append(p) for x in items_to_process: @@ -188,9 +260,9 @@ if logger: logger.exception('error reading from out queue') else: - if isinstance(data, Exception): + if isinstance(data, WorkerError): if logger: - logger.exception(data) + logger.error(data.tb_str) else: yield item, data if not any(r.is_alive() for r in results) and out_queue.empty(): @@ -199,7 +271,7 @@ def collate_worker(processor_klass, processor_args, processor_method, in_queue, - out_queue): + out_queue, logger=None): '''worker process for multiprocess_collate''' try: p = processor_klass(*processor_args) @@ -214,11 +286,17 @@ return try: ret = method(*item) - except Exception, err: - ret = err + except: + err_type, err, tb = sys.exc_info() + # Use a WorkerError rather than err itself, since unpickling err + # in the parent process will fail if it has a custom constructor + # with required parameters.
+ ret = WorkerError() + ret.tb_str = ''.join(traceback.format_tb(tb)) out_queue.put((item, ret)) except Exception, err: - print '****ERROR in worker****\n%r\n********' % err + if logger: + logger.exception('Error in worker') finally: in_queue.close() out_queue.close() diff --git a/test_slogging/unit/test_access_log_delivery.py b/test_slogging/unit/test_access_log_delivery.py index 129c480..aca9b18 100644 --- a/test_slogging/unit/test_access_log_delivery.py +++ b/test_slogging/unit/test_access_log_delivery.py @@ -63,8 +63,11 @@ class FakeMemcache(object): class TestAccessLogDelivery(unittest.TestCase): + conf = {'swift_account': 'foo', + 'log_source_account': 'bar'} + def test_log_line_parser_query_args(self): - p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger()) log_line = [str(x) for x in range(18)] log_line[1] = 'proxy-server' log_line[4] = '1/Jan/3/4/5/6' @@ -82,7 +85,8 @@ class TestAccessLogDelivery(unittest.TestCase): self.assertEquals(res, expected) def test_log_line_parser_hidden_ip(self): - conf = {'hidden_ips': '1.2.3.4'} + conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo', + 'log_source_account': 'bar'} p = access_log_delivery.AccessLogDelivery(conf, DumbLogger()) log_line = [str(x) for x in range(18)] log_line[1] = 'proxy-server' @@ -104,7 +108,7 @@ class TestAccessLogDelivery(unittest.TestCase): self.assertEquals(res['client_ip'], expected) def test_log_line_parser_field_count(self): - p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger()) # too few fields log_line = [str(x) for x in range(17)] log_line[1] = 'proxy-server' @@ -148,7 +152,7 @@ class TestAccessLogDelivery(unittest.TestCase): self.assertEquals(res, expected) def test_make_clf_from_parts(self): - p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger()) log_line = [str(x) for x in range(18)] log_line[1] = 'proxy-server' log_line[4] = '1/Jan/3/4/5/6' @@ -160,7 +164,7 @@ class TestAccessLogDelivery(unittest.TestCase): self.assertEquals(clf, expect) def test_convert_log_line(self): - p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger()) log_line = [str(x) for x in range(18)] log_line[1] = 'proxy-server' log_line[4] = '1/Jan/3/4/5/6' @@ -174,7 +178,7 @@ class TestAccessLogDelivery(unittest.TestCase): self.assertEquals(res, expected) def test_get_container_save_log_flag(self): - p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger()) def my_get_metadata_true(*a, **kw): return {p.metadata_key: 'yes'} @@ -202,7 +206,8 @@ class TestAccessLogDelivery(unittest.TestCase): def test_process_one_file(self): with temptree([]) as t: - conf = {'working_dir': t} + conf = {'working_dir': t, 'swift_account': 'foo', + 'log_source_account': 'bar'} p = access_log_delivery.AccessLogDelivery(conf, DumbLogger()) def my_get_object_data(*a, **kw): diff --git a/test_slogging/unit/test_log_common.py b/test_slogging/unit/test_log_common.py new file mode 100644 index 0000000..ff9985f --- /dev/null +++ b/test_slogging/unit/test_log_common.py @@ -0,0 +1,214 @@ +import unittest +import Queue +from slogging import log_common + +from slogging import log_processor + + +class DumbLogger(object): + def __getattr__(self, n): + return self.foo + + def foo(self, *a, **kw): + pass + + 
+class DumbInternalProxy(object): + def __init__(self, code=200, timeout=False, bad_compressed=False): + self.code = code + self.timeout = timeout + self.bad_compressed = bad_compressed + + def get_container_list(self, account, container, marker=None, + end_marker=None): + n = '2010/03/14/13/obj1' + if marker is None or n > marker: + if end_marker: + if n <= end_marker: + return [{'name': n}] + else: + return [] + return [{'name': n}] + return [] + + def get_object(self, account, container, object_name): + if object_name.endswith('.gz'): + if self.bad_compressed: + # invalid compressed data + def data(): + yield '\xff\xff\xff\xff\xff\xff\xff' + else: + # 'obj\ndata', compressed with gzip -9 + def data(): + yield '\x1f\x8b\x08' + yield '\x08"\xd79L' + yield '\x02\x03te' + yield 'st\x00\xcbO' + yield '\xca\xe2JI,I' + yield '\xe4\x02\x00O\xff' + yield '\xa3Y\t\x00\x00\x00' + else: + def data(): + yield 'obj\n' + if self.timeout: + raise ChunkReadTimeout + yield 'data' + return self.code, data() + + +class TestLogProcessor(unittest.TestCase): + + proxy_config = {'log-processor': {}, + 'swift_account': 'foo' + } + access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ + '09/Jul/2010/04/14/30 GET '\ + '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ + 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\ + '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' + + def test_collate_worker(self): + try: + log_processor.LogProcessor._internal_proxy = DumbInternalProxy() + + def get_object_data(*a, **kw): + return [self.access_test_line] + orig_get_object_data = log_processor.LogProcessor.get_object_data + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + q_in = Queue.Queue() + q_in.close = lambda: None + q_out = Queue.Queue() + q_out.close = lambda: None + work_request = ('access', 'a', 'c', 'o') + q_in.put(work_request) + q_in.put(None) + processor_klass = log_processor.LogProcessor + log_common.collate_worker(processor_klass, processor_args, + 'process_one_file', q_in, q_out, + DumbLogger()) + item, ret = q_out.get() + self.assertEquals(item, work_request) + expected = {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}} + self.assertEquals(ret, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = orig_get_object_data + + def test_collate_worker_error(self): + def get_object_data(*a, **kw): + raise Exception() + orig_get_object_data = log_processor.LogProcessor.get_object_data + try: + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + q_in = Queue.Queue() + q_in.close = lambda: None + q_out = Queue.Queue() + q_out.close = lambda: None + work_request = ('access', 'a', 'c', 'o') + q_in.put(work_request) + q_in.put(None) + processor_klass = log_processor.LogProcessor + 
log_common.collate_worker(processor_klass, processor_args, + 'process_one_file', q_in, q_out, + DumbLogger()) + item, ret = q_out.get() + self.assertEquals(item, work_request) + # these only work for Py2.7+ + #self.assertIsInstance(ret, log_common.BadFileDownload) + self.assertTrue(isinstance(ret, Exception)) + finally: + log_processor.LogProcessor.get_object_data = orig_get_object_data + + def test_multiprocess_collate(self): + try: + log_processor.LogProcessor._internal_proxy = DumbInternalProxy() + + def get_object_data(*a, **kw): + return [self.access_test_line] + orig_get_object_data = log_processor.LogProcessor.get_object_data + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + item = ('access', 'a', 'c', 'o') + logs_to_process = [item] + processor_klass = log_processor.LogProcessor + results = log_processor.multiprocess_collate(processor_klass, + processor_args, + 'process_one_file', + logs_to_process, + 1, + DumbLogger()) + results = list(results) + expected = [(item, {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}})] + self.assertEquals(results, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = orig_get_object_data + + def test_multiprocess_collate_errors(self): + def get_object_data(*a, **kw): + raise log_common.BadFileDownload() + orig_get_object_data = log_processor.LogProcessor.get_object_data + try: + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + item = ('access', 'a', 'c', 'o') + logs_to_process = [item] + processor_klass = log_processor.LogProcessor + results = log_common.multiprocess_collate(processor_klass, + processor_args, + 'process_one_file', + logs_to_process, + 1, + DumbLogger()) + results = list(results) + expected = [] + self.assertEquals(results, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = orig_get_object_data diff --git a/test_slogging/unit/test_log_processor.py b/test_slogging/unit/test_log_processor.py index 06c03df..7a07b6a 100644 --- a/test_slogging/unit/test_log_processor.py +++ b/test_slogging/unit/test_log_processor.py @@ -92,8 +92,8 @@ class TestLogProcessor(unittest.TestCase): '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' stats_test_line = 'account,1,2,3' proxy_config = {'log-processor': { - - } + }, + 'swift_account': 'foo' } def test_lazy_load_internal_proxy(self): @@ -105,7 +105,8 @@ use = egg:swift#proxy with tmpfile(dummy_proxy_config) as proxy_config_file: conf = {'log-processor': { 'proxy_server_conf': proxy_config_file, - } + }, + 'swift_account': 'foo' } p = log_processor.LogProcessor(conf, DumbLogger()) self.assert_(isinstance(p._internal_proxy, @@ -116,7 +117,8 @@ use = egg:swift#proxy # test with empty config variable conf = {'log-processor': { 'proxy_server_conf': 
'', - } + }, + 'swift_account': 'foo' } q = log_processor.LogProcessor(conf, DumbLogger()) self.assert_(isinstance(q._internal_proxy, @@ -323,144 +325,6 @@ use = egg:swift#proxy #self.assertIsInstance(k, str) self.assertTrue(isinstance(k, str), type(k)) - def test_collate_worker(self): - try: - log_processor.LogProcessor._internal_proxy = DumbInternalProxy() - - def get_object_data(*a, **kw): - return [self.access_test_line] - orig_get_object_data = log_processor.LogProcessor.get_object_data - log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - q_in = Queue.Queue() - q_out = Queue.Queue() - work_request = ('access', 'a', 'c', 'o') - q_in.put(work_request) - q_in.put(None) - processor_klass = log_processor.LogProcessor - log_common.collate_worker(processor_klass, processor_args, - 'process_one_file', q_in, q_out) - item, ret = q_out.get() - self.assertEquals(item, work_request) - expected = {('acct', '2010', '07', '09', '04'): - {('public', 'object', 'GET', '2xx'): 1, - ('public', 'bytes_out'): 95, - 'marker_query': 0, - 'format_query': 1, - 'delimiter_query': 0, - 'path_query': 0, - ('public', 'bytes_in'): 6, - 'prefix_query': 0}} - self.assertEquals(ret, expected) - finally: - log_processor.LogProcessor._internal_proxy = None - log_processor.LogProcessor.get_object_data = orig_get_object_data - - def test_collate_worker_error(self): - def get_object_data(*a, **kw): - raise Exception() - orig_get_object_data = log_processor.LogProcessor.get_object_data - try: - log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - q_in = Queue.Queue() - q_out = Queue.Queue() - work_request = ('access', 'a', 'c', 'o') - q_in.put(work_request) - q_in.put(None) - processor_klass = log_processor.LogProcessor - log_common.collate_worker(processor_klass, processor_args, - 'process_one_file', q_in, q_out) - item, ret = q_out.get() - self.assertEquals(item, work_request) - # these only work for Py2.7+ - #self.assertIsInstance(ret, log_common.BadFileDownload) - self.assertTrue(isinstance(ret, Exception)) - finally: - log_processor.LogProcessor.get_object_data = orig_get_object_data - - def test_multiprocess_collate(self): - try: - log_processor.LogProcessor._internal_proxy = DumbInternalProxy() - - def get_object_data(*a, **kw): - return [self.access_test_line] - orig_get_object_data = log_processor.LogProcessor.get_object_data - log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - item = ('access', 'a', 'c', 'o') - logs_to_process = [item] - processor_klass = log_processor.LogProcessor - results = log_processor.multiprocess_collate(processor_klass, - processor_args, - 'process_one_file', - logs_to_process, - 1) - results = list(results) - expected = [(item, {('acct', '2010', '07', '09', '04'): - {('public', 'object', 'GET', '2xx'): 1, - 
('public', 'bytes_out'): 95, - 'marker_query': 0, - 'format_query': 1, - 'delimiter_query': 0, - 'path_query': 0, - ('public', 'bytes_in'): 6, - 'prefix_query': 0}})] - self.assertEquals(results, expected) - finally: - log_processor.LogProcessor._internal_proxy = None - log_processor.LogProcessor.get_object_data = orig_get_object_data - - def test_multiprocess_collate_errors(self): - def get_object_data(*a, **kw): - raise log_common.BadFileDownload() - orig_get_object_data = log_processor.LogProcessor.get_object_data - try: - log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - item = ('access', 'a', 'c', 'o') - logs_to_process = [item] - processor_klass = log_processor.LogProcessor - results = log_common.multiprocess_collate(processor_klass, - processor_args, - 'process_one_file', - logs_to_process, - 1) - results = list(results) - expected = [] - self.assertEquals(results, expected) - finally: - log_processor.LogProcessor._internal_proxy = None - log_processor.LogProcessor.get_object_data = orig_get_object_data - class TestLogProcessorDaemon(unittest.TestCase):