access log delivery now uses the common utilities in log_common
parent 6bd1c93d93
commit 7ed5a0d23a
@@ -85,6 +85,23 @@ class AccessLogDelivery(LogProcessorCommon):
         self.file_buffer = FileBuffer(buffer_limit, logger)
         self.hidden_ips = [x.strip() for x in
                            conf.get('hidden_ips', '').split(',') if x.strip()]
+        self.source_account = conf['log_source_account']
+        self.source_container = conf.get('log_source_container_name',
+                                         'log_data')
+
+    def get_logs_to_process(self, already_processed_files):
+        lookback_start, lookback_end = self.calculate_lookback()
+        logs_to_process = self.log_processor.get_container_listing(
+            self.source_account,
+            self.source_container,
+            lookback_start,
+            lookback_end,
+            already_processed_files)
+        logs_to_process = [(self.source_account, self.source_container, x)
+                           for x in logs_to_process]
+        self.logger.info(_('loaded %d files to process') %
+                         len(logs_to_process))
+        return logs_to_process
+
     def process_one_file(self, account, container, object_name):
         files_to_upload = set()
@@ -220,18 +237,11 @@ class AccessLogDeliveryDaemon(Daemon):
         super(AccessLogDeliveryDaemon, self).__init__(c)
         self.logger = get_logger(c, log_route='access-log-delivery')
         self.log_processor = AccessLogDelivery(c, self.logger)
-        self.lookback_hours = int(c.get('lookback_hours', '120'))
-        self.lookback_window = int(c.get('lookback_window',
-                                         str(self.lookback_hours)))
         self.log_delivery_account = c['swift_account']
         self.log_delivery_container = c.get('container_name',
                                             'access_log_delivery_data')
-        self.source_account = c['log_source_account']
-        self.source_container = c.get('log_source_container_name', 'log_data')
         self.target_container = c.get('target_container', '.ACCESS_LOGS')
         self.frequency = int(c.get('frequency', '3600'))
-        self.processed_files_object_name = c.get('processed_files_object_name',
-                                                 'processed_files.pickle.gz')
         self.worker_count = int(c.get('worker_count', '1'))
         self.working_dir = c.get('working_dir', '/tmp/swift')
         if self.working_dir.endswith('/'):
@@ -240,65 +250,21 @@ class AccessLogDeliveryDaemon(Daemon):
     def run_once(self, *a, **kw):
         self.logger.info(_("Beginning log processing"))
         start = time.time()
-        if self.lookback_hours == 0:
-            lookback_start = None
-            lookback_end = None
-        else:
-            delta_hours = datetime.timedelta(hours=self.lookback_hours)
-            lookback_start = datetime.datetime.now() - delta_hours
-            lookback_start = lookback_start.strftime('%Y%m%d%H')
-            if self.lookback_window == 0:
-                lookback_end = None
-            else:
-                delta_window = datetime.timedelta(hours=self.lookback_window)
-                lookback_end = datetime.datetime.now() - \
-                               delta_hours + \
-                               delta_window
-                lookback_end = lookback_end.strftime('%Y%m%d%H')
-        self.logger.debug('lookback_start: %s' % lookback_start)
-        self.logger.debug('lookback_end: %s' % lookback_end)
-        try:
-            # Note: this file (or data set) will grow without bound.
-            # In practice, if it becomes a problem (say, after many months of
-            # running), one could manually prune the file to remove older
-            # entries. Automatically pruning on each run could be dangerous.
-            # There is not a good way to determine when an old entry should be
-            # pruned (lookback_hours could be set to anything and could change)
-            processed_files_stream = self.log_processor.get_object_data(
-                self.log_delivery_account,
-                self.log_delivery_container,
-                self.processed_files_object_name,
-                compressed=True)
-            buf = '\n'.join(x for x in processed_files_stream)
-            if buf:
-                already_processed_files = cPickle.loads(buf)
-            else:
-                already_processed_files = set()
-        except BadFileDownload, err:
-            if err.status_code == 404:
-                already_processed_files = set()
-            else:
-                self.logger.error(_('Access log delivery unable to load list '
-                                    'of already processed log files'))
-                return
-        self.logger.debug(_('found %d processed files') % \
-                          len(already_processed_files))
-        logs_to_process = self.log_processor.get_container_listing(
-            self.source_account,
-            self.source_container,
-            lookback_start,
-            lookback_end,
-            already_processed_files)
-        self.logger.info(_('loaded %d files to process') %
-                         len(logs_to_process))
+        already_processed_files = \
+            self.log_processor.load_already_processed_files()
+        lookback_hours = kw.get('lookback_hours')
+        if lookback_hours:
+            self.log_processor.lookback_hours = lookback_hours
+        lookback_window = kw.get('lookback_window')
+        if lookback_window:
+            self.log_processor.lookback_window = lookback_window
+        logs_to_process = \
+            self.log_processor.get_logs_to_process(already_processed_files)
         if not logs_to_process:
             self.logger.info(_("Log processing done (%0.2f minutes)") %
                              ((time.time() - start) / 60))
             return
-
-        logs_to_process = [(self.source_account, self.source_container, x)
-                           for x in logs_to_process]
 
         # map
         processor_args = (self.conf, self.logger)
         results = multiprocess_collate(AccessLogDelivery, processor_args,
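
Note: the lookback computation deleted above now happens inside the common utilities; get_logs_to_process() in the first hunk calls self.calculate_lookback(), which this diff does not show. Below is only a sketch of that helper: the method name comes from the call site, and the body is reconstructed from the inline code removed here, so the real log_common implementation may differ in detail.

import datetime

def calculate_lookback(self):
    # Sketch reconstructed from the code removed from run_once() above.
    if self.lookback_hours == 0:
        # no lookback configured: consider all available log files
        lookback_start = None
        lookback_end = None
    else:
        delta_hours = datetime.timedelta(hours=self.lookback_hours)
        lookback_start = datetime.datetime.now() - delta_hours
        # listings are bounded by hour-granularity markers, e.g. '2011033115'
        lookback_start = lookback_start.strftime('%Y%m%d%H')
        if self.lookback_window == 0:
            # open-ended window: everything newer than lookback_start
            lookback_end = None
        else:
            delta_window = datetime.timedelta(hours=self.lookback_window)
            lookback_end = datetime.datetime.now() - \
                           delta_hours + delta_window
            lookback_end = lookback_end.strftime('%Y%m%d%H')
    return lookback_start, lookback_end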
@@ -332,12 +298,7 @@ class AccessLogDeliveryDaemon(Daemon):
                                 filename, account))
 
         # cleanup
-        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
-        f = cStringIO.StringIO(s)
-        success = self.log_processor.internal_proxy.upload_file(f,
-            self.log_delivery_account,
-            self.log_delivery_container,
-            self.processed_files_object_name)
+        success = self.log_processor.save_processed_files(processed_files)
         if not success:
             self.logger.error('Error uploading updated processed files log')
         self.logger.info(_("Log processing done (%0.2f minutes)") %
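
Note: the processed-files bookkeeping moved the same way. run_once() used to stream the pickle down itself (get_object_data plus cPickle.loads) and push it back with internal_proxy.upload_file; both halves are now behind load_already_processed_files() and save_processed_files() on the common class. A sketch of the pair, reconstructed from the code removed in this diff; the attribute names for the bookkeeping location (log_processor_account, log_processor_container, processed_files_object_name) are placeholders, since log_common itself is not part of this commit.

import cPickle
import cStringIO

def load_already_processed_files(self):
    # Sketch only: mirrors the try/except block removed from run_once().
    try:
        stream = self.get_object_data(self.log_processor_account,
                                      self.log_processor_container,
                                      self.processed_files_object_name,
                                      compressed=True)
        buf = '\n'.join(x for x in stream)
        return cPickle.loads(buf) if buf else set()
    except BadFileDownload, err:
        if err.status_code == 404:
            return set()  # nothing saved yet: first run
        raise  # the old inline code logged an error and aborted here

def save_processed_files(self, processed_files):
    # Mirrors the deleted "# cleanup" block: pickle, wrap, upload.
    s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
    f = cStringIO.StringIO(s)
    return self.internal_proxy.upload_file(f, self.log_processor_account,
                                           self.log_processor_container,
                                           self.processed_files_object_name)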
@@ -63,7 +63,8 @@ class FakeMemcache(object):
 
 class TestAccessLogDelivery(unittest.TestCase):
 
-    conf = {'swift_account': 'foo'}
+    conf = {'swift_account': 'foo',
+            'log_source_account': 'bar'}
 
     def test_log_line_parser_query_args(self):
         p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
@@ -84,7 +85,8 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(res, expected)
 
     def test_log_line_parser_hidden_ip(self):
-        conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo'}
+        conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo',
+                'log_source_account': 'bar'}
         p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
         log_line = [str(x) for x in range(18)]
         log_line[1] = 'proxy-server'
@@ -204,7 +206,8 @@ class TestAccessLogDelivery(unittest.TestCase):
 
     def test_process_one_file(self):
         with temptree([]) as t:
-            conf = {'working_dir': t, 'swift_account': 'foo'}
+            conf = {'working_dir': t, 'swift_account': 'foo',
+                    'log_source_account': 'bar'}
            p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
 
             def my_get_object_data(*a, **kw):
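
Note: the test changes in this file are purely mechanical. Because AccessLogDelivery.__init__ now reads conf['log_source_account'] unconditionally (first hunk), every conf dict the tests build grows that key. A minimal conf for constructing the class now looks like this (account names are the tests' dummies):

conf = {'swift_account': 'foo',       # account holding delivery bookkeeping
        'log_source_account': 'bar'}  # account the raw access logs live in
p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())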