Merge pull request #38 from notmyname/fix_hangs

refactored child worker error handling to fix hangs
John Dickinson 2012-03-28 20:58:20 -07:00
commit fd7e8e8860
6 changed files with 389 additions and 263 deletions
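The core of the fix is visible in the slogging/log_common.py hunks below: a child worker no longer pushes the raw exception object back through the results queue. Unpickling an arbitrary exception in the parent can fail (for example when the exception class has a custom constructor with required arguments), which is one way the parent could end up waiting forever for results. Instead, the child formats the traceback into a plain string on a picklable WorkerError and the parent just logs it. The following is a minimal, self-contained sketch of that pattern, independent of slogging; do_work and the queue wiring here are illustrative only.

import multiprocessing
import sys
import traceback

class WorkerError(Exception):
    def __init__(self):
        self.tb_str = ''  # always present, even if never filled in

def do_work(item):
    # stand-in for the real per-item processing; always fails here
    raise ValueError('boom: %r' % (item,))

def worker(in_queue, out_queue):
    # child process: never let an exception escape; wrap the traceback
    # text in a picklable WorkerError and hand it back to the parent
    while True:
        item = in_queue.get()
        if item is None:
            break
        try:
            ret = do_work(item)
        except Exception:
            _err_type, _err, tb = sys.exc_info()
            ret = WorkerError()
            ret.tb_str = ''.join(traceback.format_tb(tb))
        out_queue.put((item, ret))

if __name__ == '__main__':
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(in_q, out_q))
    p.start()
    in_q.put('some-log-object')
    in_q.put(None)
    item, ret = out_q.get()
    if isinstance(ret, WorkerError):
        # parent process: log the child's traceback text instead of trying
        # to re-raise or unpickle the original exception
        print(ret.tb_str)
    p.join()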

slogging/access_log_delivery.py

@@ -27,10 +27,11 @@ import random
import errno
from swift.common.daemon import Daemon
from swift.common.utils import get_logger, TRUE_VALUES, split_path, lock_file
from swift.common.utils import get_logger, TRUE_VALUES, split_path
from swift.common.exceptions import LockTimeout, ChunkReadTimeout
from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
BadFileDownload
from slogging.file_buffer import FileBuffer
month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
@@ -70,44 +71,6 @@ def memoize(func):
return wrapped
class FileBuffer(object):
def __init__(self, limit, logger):
self.buffers = collections.defaultdict(list)
self.limit = limit
self.logger = logger
self.total_size = 0
def write(self, filename, data):
self.buffers[filename].append(data)
self.total_size += len(data)
if self.total_size >= self.limit:
self.flush()
def flush(self):
while self.buffers:
filename_list = self.buffers.keys()
for filename in filename_list:
out = '\n'.join(self.buffers[filename]) + '\n'
mid_dirs = os.path.dirname(filename)
try:
os.makedirs(mid_dirs)
except OSError, err:
if err.errno == errno.EEXIST:
pass
else:
raise
try:
with lock_file(filename, append=True, unlink=False) as f:
f.write(out)
except LockTimeout:
# couldn't write, we'll try again later
self.logger.debug(_('Timeout writing to %s' % filename))
else:
del self.buffers[filename]
self.total_size = 0
class AccessLogDelivery(LogProcessorCommon):
def __init__(self, conf, logger):
@@ -122,6 +85,23 @@ class AccessLogDelivery(LogProcessorCommon):
self.file_buffer = FileBuffer(buffer_limit, logger)
self.hidden_ips = [x.strip() for x in
conf.get('hidden_ips', '').split(',') if x.strip()]
self.source_account = conf['log_source_account']
self.source_container = conf.get('log_source_container_name',
'log_data')
def get_logs_to_process(self, already_processed_files):
lookback_start, lookback_end = self.calculate_lookback()
logs_to_process = self.log_processor.get_container_listing(
self.source_account,
self.source_container,
lookback_start,
lookback_end,
already_processed_files)
logs_to_process = [(self.source_account, self.source_container, x)
for x in logs_to_process]
self.logger.info(_('loaded %d files to process') %
len(logs_to_process))
return logs_to_process
def process_one_file(self, account, container, object_name):
files_to_upload = set()
@@ -257,18 +237,11 @@ class AccessLogDeliveryDaemon(Daemon):
super(AccessLogDeliveryDaemon, self).__init__(c)
self.logger = get_logger(c, log_route='access-log-delivery')
self.log_processor = AccessLogDelivery(c, self.logger)
self.lookback_hours = int(c.get('lookback_hours', '120'))
self.lookback_window = int(c.get('lookback_window',
str(self.lookback_hours)))
self.log_delivery_account = c['swift_account']
self.log_delivery_container = c.get('container_name',
'access_log_delivery_data')
self.source_account = c['log_source_account']
self.source_container = c.get('log_source_container_name', 'log_data')
self.target_container = c.get('target_container', '.ACCESS_LOGS')
self.frequency = int(c.get('frequency', '3600'))
self.processed_files_object_name = c.get('processed_files_object_name',
'processed_files.pickle.gz')
self.worker_count = int(c.get('worker_count', '1'))
self.working_dir = c.get('working_dir', '/tmp/swift')
if self.working_dir.endswith('/'):
@@ -277,65 +250,21 @@ class AccessLogDeliveryDaemon(Daemon):
def run_once(self, *a, **kw):
self.logger.info(_("Beginning log processing"))
start = time.time()
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = datetime.timedelta(hours=self.lookback_hours)
lookback_start = datetime.datetime.now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = datetime.timedelta(hours=self.lookback_window)
lookback_end = datetime.datetime.now() - \
delta_hours + \
delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
# running), one could manually prune the file to remove older
# entries. Automatically pruning on each run could be dangerous.
# There is not a good way to determine when an old entry should be
# pruned (lookback_hours could be set to anything and could change)
processed_files_stream = self.log_processor.get_object_data(
self.log_delivery_account,
self.log_delivery_container,
self.processed_files_object_name,
compressed=True)
buf = '\n'.join(x for x in processed_files_stream)
if buf:
already_processed_files = cPickle.loads(buf)
else:
already_processed_files = set()
except BadFileDownload, err:
if err.status_code == 404:
already_processed_files = set()
else:
self.logger.error(_('Access log delivery unable to load list '
'of already processed log files'))
return
self.logger.debug(_('found %d processed files') % \
len(already_processed_files))
logs_to_process = self.log_processor.get_container_listing(
self.source_account,
self.source_container,
lookback_start,
lookback_end,
already_processed_files)
self.logger.info(_('loaded %d files to process') %
len(logs_to_process))
already_processed_files = \
self.log_processor.load_already_processed_files()
lookback_hours = kw.get('lookback_hours')
if lookback_hours:
self.log_processor.lookback_hours = lookback_hours
lookback_window = kw.get('lookback_window')
if lookback_window:
self.log_processor.lookback_window = lookback_window
logs_to_process = \
self.log_processor.get_logs_to_process(already_processed_files)
if not logs_to_process:
self.logger.info(_("Log processing done (%0.2f minutes)") %
((time.time() - start) / 60))
return
logs_to_process = [(self.source_account, self.source_container, x)
for x in logs_to_process]
# map
processor_args = (self.conf, self.logger)
results = multiprocess_collate(AccessLogDelivery, processor_args,
@@ -369,12 +298,7 @@ class AccessLogDeliveryDaemon(Daemon):
filename, account))
# cleanup
s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
success = self.log_processor.internal_proxy.upload_file(f,
self.log_delivery_account,
self.log_delivery_container,
self.processed_files_object_name)
success = self.log_processor.save_processed_files(processed_files)
if not success:
self.logger.error('Error uploading updated processed files log')
self.logger.info(_("Log processing done (%0.2f minutes)") %

slogging/file_buffer.py (new file, 41 lines)

@@ -0,0 +1,41 @@
import os
import collections
from swift.common.utils import lock_file
class FileBuffer(object):
def __init__(self, limit, logger):
self.buffers = collections.defaultdict(list)
self.limit = limit
self.logger = logger
self.total_size = 0
def write(self, filename, data):
self.buffers[filename].append(data)
self.total_size += len(data)
if self.total_size >= self.limit:
self.flush()
def flush(self):
while self.buffers:
filename_list = self.buffers.keys()
for filename in filename_list:
out = '\n'.join(self.buffers[filename]) + '\n'
mid_dirs = os.path.dirname(filename)
try:
os.makedirs(mid_dirs)
except OSError, err:
if err.errno == errno.EEXIST:
pass
else:
raise
try:
with lock_file(filename, append=True, unlink=False) as f:
f.write(out)
except LockTimeout:
# couldn't write, we'll try again later
self.logger.debug(_('Timeout writing to %s' % filename))
else:
del self.buffers[filename]
self.total_size = 0
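For orientation, a small usage sketch of the buffer extracted above; the byte limit, path, and logger are illustrative values only. Note that, as shown, flush() also relies on errno, LockTimeout, and the _() translation helper from the swift/slogging environment, beyond the three imports at the top of the new file.

import logging

from slogging.file_buffer import FileBuffer

# any object with a debug() method works as the logger here
logger = logging.getLogger('file-buffer-demo')

# 64 KiB limit and the target path are made-up example values
buf = FileBuffer(64 * 1024, logger)
path = '/tmp/swift/acct/cont/2010/03/14/13/obj1.log'
buf.write(path, 'first line')
buf.write(path, 'second line')
# write() flushes on its own once the buffered bytes reach the limit;
# a final explicit flush() makes sure nothing is left in memory
buf.flush()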

slogging/log_common.py

@@ -15,7 +15,7 @@
import multiprocessing
import Queue
import datetime
from datetime import datetime, timedelta
import zlib
import time
from paste.deploy import appconfig
@@ -23,6 +23,8 @@ from contextlib import contextmanager
import os
import errno
import fcntl
import sys
import traceback
from eventlet import sleep
@@ -37,6 +39,12 @@ class BadFileDownload(Exception):
self.status_code = status_code
class WorkerError(Exception):
def __init__(self):
self.tb_str = '' # ensure that there is always something here
class LogProcessorCommon(object):
def __init__(self, conf, logger, log_route='log-processor'):
@@ -49,6 +57,15 @@ class LogProcessorCommon(object):
if s.strip()])
self.conf = conf
self._internal_proxy = None
self.lookback_hours = int(conf.get('lookback_hours', '120'))
self.lookback_window = int(conf.get('lookback_window',
self.lookback_hours))
self.log_processor_account = conf['swift_account']
self.log_processor_container = conf.get('container_name',
'simple_billing_data')
self.processed_files_object_name = \
conf.get('processed_files_object_name',
'processed_files.pickle.gz')
@property
def internal_proxy(self):
@@ -71,6 +88,60 @@ class LogProcessorCommon(object):
retries=3)
return self._internal_proxy
def calculate_lookback(self):
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = timedelta(hours=self.lookback_hours)
lookback_start = datetime.now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = timedelta(hours=self.lookback_window)
lookback_end = datetime.now() - delta_hours + delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
return (lookback_start, lookback_end)
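As a worked example of the arithmetic above (values assumed for illustration): with lookback_hours = 120 and lookback_window = 24, the listing starts 120 hours back and ends 96 hours back, i.e. a 24-hour slice.

from datetime import datetime, timedelta

now = datetime(2012, 3, 28, 20, 0)   # assumed current time
delta_hours = timedelta(hours=120)    # lookback_hours
delta_window = timedelta(hours=24)    # lookback_window
lookback_start = (now - delta_hours).strftime('%Y%m%d%H')
lookback_end = (now - delta_hours + delta_window).strftime('%Y%m%d%H')
# lookback_start == '2012032320', lookback_end == '2012032420'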
def load_already_processed_files(self):
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
# running), one could manually prune the file to remove older
# entries. Automatically pruning on each run could be dangerous.
# There is not a good way to determine when an old entry should be
# pruned (lookback_hours could be set to anything and could change)
processed_files_stream = self.get_object_data(
self.log_processor_account,
self.log_processor_container,
self.processed_files_object_name,
compressed=True)
buf = '\n'.join(x for x in processed_files_stream)
if buf:
already_processed_files = cPickle.loads(buf)
else:
already_processed_files = set()
except BadFileDownload, err:
if err.status_code == 404:
already_processed_files = set()
else:
self.logger.error(_('Simple billing unable to load list '
'of already processed log files'))
return
self.logger.debug(_('found %d processed files') % \
len(already_processed_files))
return already_processed_files
def save_processed_files(self, processed_files_data):
s = cPickle.dumps(processed_files_data, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
return self.internal_proxy.upload_file(f, self.log_processor_account,
self.log_processor_container,
self.processed_files_object_name)
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
@@ -171,7 +242,8 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
processor_args,
processor_method,
in_queue,
out_queue))
out_queue,
logger))
p.start()
results.append(p)
for x in items_to_process:
@@ -188,9 +260,9 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
if logger:
logger.exception('error reading from out queue')
else:
if isinstance(data, Exception):
if isinstance(data, WorkerError):
if logger:
logger.exception(data)
logger.error(data.tb_str)
else:
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
@@ -199,7 +271,7 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
def collate_worker(processor_klass, processor_args, processor_method, in_queue,
out_queue):
out_queue, logger=None):
'''worker process for multiprocess_collate'''
try:
p = processor_klass(*processor_args)
@@ -214,11 +286,17 @@ def collate_worker(processor_klass, processor_args, processor_method, in_queue,
return
try:
ret = method(*item)
except Exception, err:
ret = err
except:
err_type, err, tb = sys.exc_info()
# Use err_type since unpickling err in the parent process
# will fail if it has a custom constructor with required
# parameters.
ret = WorkerError()
ret.tb_str = ''.join(traceback.format_tb(tb))
out_queue.put((item, ret))
except Exception, err:
print '****ERROR in worker****\n%r\n********' % err
if logger:
logger.exception('Error in worker')
finally:
in_queue.close()
out_queue.close()
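For reference, a hedged sketch of how the reworked collate API is consumed, assuming a configured slogging/Swift environment; the config shape and the trailing positional arguments (worker count, then the new logger argument) mirror the tests further down, while account, container, and object names are made up. Items whose child process raised are reported on the parent's logger via WorkerError.tb_str and simply never appear in the yielded results.

import logging

from slogging import log_common
from slogging.log_processor import LogProcessor

# config shape mirrors the tests below; names are illustrative only
conf = {'swift_account': 'AUTH_test',
        'log-processor': {},
        'log-processor-access': {
            'source_filename_format': '%Y%m%d%H*',
            'class_path': 'slogging.access_processor.AccessLogProcessor'}}
logger = logging.getLogger('collate-demo')

items = [('access', 'AUTH_test', 'log_data', '2010031413/obj1')]
results = log_common.multiprocess_collate(
    LogProcessor,        # processor class, instantiated once per child
    (conf, logger),      # arguments for the processor constructor
    'process_one_file',  # method invoked as method(*item) in each child
    items,               # iterable of work tuples fed to the in queue
    1,                   # worker process count
    logger)              # receives queue errors and WorkerError.tb_str text
for item, data in results:
    # items whose child raised are logged and skipped, not yielded
    print(item, data)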


@@ -63,8 +63,11 @@ class FakeMemcache(object):
class TestAccessLogDelivery(unittest.TestCase):
conf = {'swift_account': 'foo',
'log_source_account': 'bar'}
def test_log_line_parser_query_args(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
@@ -82,7 +85,8 @@ class TestAccessLogDelivery(unittest.TestCase):
self.assertEquals(res, expected)
def test_log_line_parser_hidden_ip(self):
conf = {'hidden_ips': '1.2.3.4'}
conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo',
'log_source_account': 'bar'}
p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
@@ -104,7 +108,7 @@ class TestAccessLogDelivery(unittest.TestCase):
self.assertEquals(res['client_ip'], expected)
def test_log_line_parser_field_count(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
# too few fields
log_line = [str(x) for x in range(17)]
log_line[1] = 'proxy-server'
@@ -148,7 +152,7 @@ class TestAccessLogDelivery(unittest.TestCase):
self.assertEquals(res, expected)
def test_make_clf_from_parts(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
@@ -160,7 +164,7 @@ class TestAccessLogDelivery(unittest.TestCase):
self.assertEquals(clf, expect)
def test_convert_log_line(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
@@ -174,7 +178,7 @@ class TestAccessLogDelivery(unittest.TestCase):
self.assertEquals(res, expected)
def test_get_container_save_log_flag(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
def my_get_metadata_true(*a, **kw):
return {p.metadata_key: 'yes'}
@@ -202,7 +206,8 @@ class TestAccessLogDelivery(unittest.TestCase):
def test_process_one_file(self):
with temptree([]) as t:
conf = {'working_dir': t}
conf = {'working_dir': t, 'swift_account': 'foo',
'log_source_account': 'bar'}
p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
def my_get_object_data(*a, **kw):


@@ -0,0 +1,214 @@
import unittest
import Queue
from slogging import log_common
from slogging import log_processor
class DumbLogger(object):
def __getattr__(self, n):
return self.foo
def foo(self, *a, **kw):
pass
class DumbInternalProxy(object):
def __init__(self, code=200, timeout=False, bad_compressed=False):
self.code = code
self.timeout = timeout
self.bad_compressed = bad_compressed
def get_container_list(self, account, container, marker=None,
end_marker=None):
n = '2010/03/14/13/obj1'
if marker is None or n > marker:
if end_marker:
if n <= end_marker:
return [{'name': n}]
else:
return []
return [{'name': n}]
return []
def get_object(self, account, container, object_name):
if object_name.endswith('.gz'):
if self.bad_compressed:
# invalid compressed data
def data():
yield '\xff\xff\xff\xff\xff\xff\xff'
else:
# 'obj\ndata', compressed with gzip -9
def data():
yield '\x1f\x8b\x08'
yield '\x08"\xd79L'
yield '\x02\x03te'
yield 'st\x00\xcbO'
yield '\xca\xe2JI,I'
yield '\xe4\x02\x00O\xff'
yield '\xa3Y\t\x00\x00\x00'
else:
def data():
yield 'obj\n'
if self.timeout:
raise ChunkReadTimeout
yield 'data'
return self.code, data()
class TestLogProcessor(unittest.TestCase):
proxy_config = {'log-processor': {},
'swift_account': 'foo'
}
access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
'09/Jul/2010/04/14/30 GET '\
'/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
'6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
def test_collate_worker(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a, **kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_in.close = lambda: None
q_out = Queue.Queue()
q_out.close = lambda: None
work_request = ('access', 'a', 'c', 'o')
q_in.put(work_request)
q_in.put(None)
processor_klass = log_processor.LogProcessor
log_common.collate_worker(processor_klass, processor_args,
'process_one_file', q_in, q_out,
DumbLogger())
item, ret = q_out.get()
self.assertEquals(item, work_request)
expected = {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}}
self.assertEquals(ret, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_collate_worker_error(self):
def get_object_data(*a, **kw):
raise Exception()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_in.close = lambda: None
q_out = Queue.Queue()
q_out.close = lambda: None
work_request = ('access', 'a', 'c', 'o')
q_in.put(work_request)
q_in.put(None)
processor_klass = log_processor.LogProcessor
log_common.collate_worker(processor_klass, processor_args,
'process_one_file', q_in, q_out,
DumbLogger())
item, ret = q_out.get()
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_common.BadFileDownload)
self.assertTrue(isinstance(ret, Exception))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a, **kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a', 'c', 'o')
logs_to_process = [item]
processor_klass = log_processor.LogProcessor
results = log_processor.multiprocess_collate(processor_klass,
processor_args,
'process_one_file',
logs_to_process,
1,
DumbLogger())
results = list(results)
expected = [(item, {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}})]
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate_errors(self):
def get_object_data(*a, **kw):
raise log_common.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a', 'c', 'o')
logs_to_process = [item]
processor_klass = log_processor.LogProcessor
results = log_common.multiprocess_collate(processor_klass,
processor_args,
'process_one_file',
logs_to_process,
1,
DumbLogger())
results = list(results)
expected = []
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data


@@ -92,8 +92,8 @@ class TestLogProcessor(unittest.TestCase):
'6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
stats_test_line = 'account,1,2,3'
proxy_config = {'log-processor': {
},
'swift_account': 'foo'
}
def test_lazy_load_internal_proxy(self):
@@ -105,7 +105,8 @@ use = egg:swift#proxy
with tmpfile(dummy_proxy_config) as proxy_config_file:
conf = {'log-processor': {
'proxy_server_conf': proxy_config_file,
},
'swift_account': 'foo'
}
p = log_processor.LogProcessor(conf, DumbLogger())
self.assert_(isinstance(p._internal_proxy,
@@ -116,7 +117,8 @@ use = egg:swift#proxy
# test with empty config variable
conf = {'log-processor': {
'proxy_server_conf': '',
},
'swift_account': 'foo'
}
q = log_processor.LogProcessor(conf, DumbLogger())
self.assert_(isinstance(q._internal_proxy,
@@ -323,144 +325,6 @@ use = egg:swift#proxy
#self.assertIsInstance(k, str)
self.assertTrue(isinstance(k, str), type(k))
def test_collate_worker(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a, **kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a', 'c', 'o')
q_in.put(work_request)
q_in.put(None)
processor_klass = log_processor.LogProcessor
log_common.collate_worker(processor_klass, processor_args,
'process_one_file', q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
expected = {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}}
self.assertEquals(ret, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_collate_worker_error(self):
def get_object_data(*a, **kw):
raise Exception()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a', 'c', 'o')
q_in.put(work_request)
q_in.put(None)
processor_klass = log_processor.LogProcessor
log_common.collate_worker(processor_klass, processor_args,
'process_one_file', q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_common.BadFileDownload)
self.assertTrue(isinstance(ret, Exception))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a, **kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a', 'c', 'o')
logs_to_process = [item]
processor_klass = log_processor.LogProcessor
results = log_processor.multiprocess_collate(processor_klass,
processor_args,
'process_one_file',
logs_to_process,
1)
results = list(results)
expected = [(item, {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}})]
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate_errors(self):
def get_object_data(*a, **kw):
raise log_common.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format': '%Y%m%d%H*',
'class_path':
'slogging.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a', 'c', 'o')
logs_to_process = [item]
processor_klass = log_processor.LogProcessor
results = log_common.multiprocess_collate(processor_klass,
processor_args,
'process_one_file',
logs_to_process,
1)
results = list(results)
expected = []
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
class TestLogProcessorDaemon(unittest.TestCase):