From 7ed5a0d23ae16bbd2410f0b9e82d87779ca0c823 Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Tue, 27 Mar 2012 15:52:57 -0500
Subject: [PATCH] access log delivery now uses the common utilities in
 log_common

---
 slogging/access_log_delivery.py      | 95 ++++++-------
 .../unit/test_access_log_delivery.py |  9 +-
 2 files changed, 34 insertions(+), 70 deletions(-)

diff --git a/slogging/access_log_delivery.py b/slogging/access_log_delivery.py
index 2b8384c..c9e8d95 100644
--- a/slogging/access_log_delivery.py
+++ b/slogging/access_log_delivery.py
@@ -85,6 +85,23 @@ class AccessLogDelivery(LogProcessorCommon):
         self.file_buffer = FileBuffer(buffer_limit, logger)
         self.hidden_ips = [x.strip() for x in
                            conf.get('hidden_ips', '').split(',') if x.strip()]
+        self.source_account = conf['log_source_account']
+        self.source_container = conf.get('log_source_container_name',
+                                         'log_data')
+
+    def get_logs_to_process(self, already_processed_files):
+        lookback_start, lookback_end = self.calculate_lookback()
+        logs_to_process = self.get_container_listing(
+            self.source_account,
+            self.source_container,
+            lookback_start,
+            lookback_end,
+            already_processed_files)
+        logs_to_process = [(self.source_account, self.source_container, x)
+                           for x in logs_to_process]
+        self.logger.info(_('loaded %d files to process') %
+                         len(logs_to_process))
+        return logs_to_process

     def process_one_file(self, account, container, object_name):
         files_to_upload = set()
@@ -220,18 +237,11 @@ class AccessLogDeliveryDaemon(Daemon):
         super(AccessLogDeliveryDaemon, self).__init__(c)
         self.logger = get_logger(c, log_route='access-log-delivery')
         self.log_processor = AccessLogDelivery(c, self.logger)
-        self.lookback_hours = int(c.get('lookback_hours', '120'))
-        self.lookback_window = int(c.get('lookback_window',
-                                         str(self.lookback_hours)))
         self.log_delivery_account = c['swift_account']
         self.log_delivery_container = c.get('container_name',
                                             'access_log_delivery_data')
-        self.source_account = c['log_source_account']
-        self.source_container = c.get('log_source_container_name', 'log_data')
         self.target_container = c.get('target_container', '.ACCESS_LOGS')
         self.frequency = int(c.get('frequency', '3600'))
-        self.processed_files_object_name = c.get('processed_files_object_name',
-            'processed_files.pickle.gz')
         self.worker_count = int(c.get('worker_count', '1'))
         self.working_dir = c.get('working_dir', '/tmp/swift')
         if self.working_dir.endswith('/'):
@@ -240,65 +250,21 @@ class AccessLogDeliveryDaemon(Daemon):
     def run_once(self, *a, **kw):
         self.logger.info(_("Beginning log processing"))
         start = time.time()
-        if self.lookback_hours == 0:
-            lookback_start = None
-            lookback_end = None
-        else:
-            delta_hours = datetime.timedelta(hours=self.lookback_hours)
-            lookback_start = datetime.datetime.now() - delta_hours
-            lookback_start = lookback_start.strftime('%Y%m%d%H')
-            if self.lookback_window == 0:
-                lookback_end = None
-            else:
-                delta_window = datetime.timedelta(hours=self.lookback_window)
-                lookback_end = datetime.datetime.now() - \
-                               delta_hours + \
-                               delta_window
-                lookback_end = lookback_end.strftime('%Y%m%d%H')
-        self.logger.debug('lookback_start: %s' % lookback_start)
-        self.logger.debug('lookback_end: %s' % lookback_end)
-        try:
-            # Note: this file (or data set) will grow without bound.
-            # In practice, if it becomes a problem (say, after many months of
-            # running), one could manually prune the file to remove older
-            # entries. Automatically pruning on each run could be dangerous.
-            # There is not a good way to determine when an old entry should be
-            # pruned (lookback_hours could be set to anything and could change)
-            processed_files_stream = self.log_processor.get_object_data(
-                self.log_delivery_account,
-                self.log_delivery_container,
-                self.processed_files_object_name,
-                compressed=True)
-            buf = '\n'.join(x for x in processed_files_stream)
-            if buf:
-                already_processed_files = cPickle.loads(buf)
-            else:
-                already_processed_files = set()
-        except BadFileDownload, err:
-            if err.status_code == 404:
-                already_processed_files = set()
-            else:
-                self.logger.error(_('Access log delivery unable to load list '
-                                    'of already processed log files'))
-                return
-        self.logger.debug(_('found %d processed files') % \
-                          len(already_processed_files))
-        logs_to_process = self.log_processor.get_container_listing(
-            self.source_account,
-            self.source_container,
-            lookback_start,
-            lookback_end,
-            already_processed_files)
-        self.logger.info(_('loaded %d files to process') %
-                         len(logs_to_process))
+        already_processed_files = \
+            self.log_processor.load_already_processed_files()
+        lookback_hours = kw.get('lookback_hours')
+        if lookback_hours:
+            self.log_processor.lookback_hours = lookback_hours
+        lookback_window = kw.get('lookback_window')
+        if lookback_window:
+            self.log_processor.lookback_window = lookback_window
+        logs_to_process = \
+            self.log_processor.get_logs_to_process(already_processed_files)
         if not logs_to_process:
             self.logger.info(_("Log processing done (%0.2f minutes)") %
                              ((time.time() - start) / 60))
             return
-        logs_to_process = [(self.source_account, self.source_container, x)
-                           for x in logs_to_process]
-        # map
         processor_args = (self.conf, self.logger)
         results = multiprocess_collate(AccessLogDelivery, processor_args,
                                        'process_one_file', logs_to_process,
                                        self.worker_count)
@@ -332,12 +298,7 @@ class AccessLogDeliveryDaemon(Daemon):
                                               filename, account))

         # cleanup
-        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
-        f = cStringIO.StringIO(s)
-        success = self.log_processor.internal_proxy.upload_file(f,
-            self.log_delivery_account,
-            self.log_delivery_container,
-            self.processed_files_object_name)
+        success = self.log_processor.save_processed_files(processed_files)
         if not success:
             self.logger.error('Error uploading updated processed files log')
         self.logger.info(_("Log processing done (%0.2f minutes)") %
diff --git a/test_slogging/unit/test_access_log_delivery.py b/test_slogging/unit/test_access_log_delivery.py
index c05340a..aca9b18 100644
--- a/test_slogging/unit/test_access_log_delivery.py
+++ b/test_slogging/unit/test_access_log_delivery.py
@@ -63,7 +63,8 @@ class FakeMemcache(object):


 class TestAccessLogDelivery(unittest.TestCase):
-    conf = {'swift_account': 'foo'}
+    conf = {'swift_account': 'foo',
+            'log_source_account': 'bar'}

     def test_log_line_parser_query_args(self):
         p = access_log_delivery.AccessLogDelivery(self.conf, DumbLogger())
@@ -84,7 +85,8 @@ class TestAccessLogDelivery(unittest.TestCase):
         self.assertEquals(res, expected)

     def test_log_line_parser_hidden_ip(self):
-        conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo'}
+        conf = {'hidden_ips': '1.2.3.4', 'swift_account': 'foo',
+                'log_source_account': 'bar'}
         p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
         log_line = [str(x) for x in range(18)]
         log_line[1] = 'proxy-server'
@@ -204,7 +206,8 @@ class TestAccessLogDelivery(unittest.TestCase):

     def test_process_one_file(self):
         with temptree([]) as t:
-            conf = {'working_dir': t, 'swift_account': 'foo'}
+            conf = {'working_dir': t, 'swift_account': 'foo',
+                    'log_source_account': 'bar'}
             p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())

             def my_get_object_data(*a, **kw):
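
Note: the new code above leans on three LogProcessorCommon helpers from
slogging/log_common.py (calculate_lookback, load_already_processed_files,
save_processed_files) that this diff does not show. The following is a
minimal sketch of what they do, reconstructed from the run_once() logic
deleted above; the __init__ shown and the log_account / log_container
attribute names are illustrative assumptions, not the module's confirmed
API, and get_object_data / internal_proxy are the same members the patch
already uses. Where the old daemon logged and returned on a non-404
download failure, the sketch simply re-raises and leaves handling to the
caller.

    # Minimal sketch, not the actual slogging/log_common.py. Reconstructed
    # from the run_once() logic removed in this patch; get_object_data()
    # and internal_proxy are assumed to be provided elsewhere on the class.
    import datetime

    import cPickle
    import cStringIO


    class BadFileDownload(Exception):
        # slogging raises this from get_object_data(); status_code carries
        # the HTTP status of the failed download.
        def __init__(self, status_code=None):
            Exception.__init__(self)
            self.status_code = status_code


    class LogProcessorCommon(object):

        def __init__(self, conf, logger):
            # Defaults taken from the config handling deleted from the
            # daemon's __init__ above. Attribute names are illustrative.
            self.logger = logger
            self.lookback_hours = int(conf.get('lookback_hours', '120'))
            self.lookback_window = int(conf.get('lookback_window',
                                                str(self.lookback_hours)))
            self.log_account = conf['swift_account']
            self.log_container = conf.get('container_name',
                                          'access_log_delivery_data')
            self.processed_files_object_name = \
                conf.get('processed_files_object_name',
                         'processed_files.pickle.gz')

        def calculate_lookback(self):
            # Returns (lookback_start, lookback_end) as 'YYYYMMDDHH'
            # strings; None leaves that end of the listing range open.
            if self.lookback_hours == 0:
                return None, None
            delta_hours = datetime.timedelta(hours=self.lookback_hours)
            lookback_start = (datetime.datetime.now() -
                              delta_hours).strftime('%Y%m%d%H')
            if self.lookback_window == 0:
                lookback_end = None
            else:
                delta_window = datetime.timedelta(
                    hours=self.lookback_window)
                lookback_end = (datetime.datetime.now() - delta_hours +
                                delta_window).strftime('%Y%m%d%H')
            return lookback_start, lookback_end

        def load_already_processed_files(self):
            # Download and unpickle the processed-files set; a 404 just
            # means no run has completed yet, so start from an empty set.
            try:
                stream = self.get_object_data(
                    self.log_account,
                    self.log_container,
                    self.processed_files_object_name,
                    compressed=True)
                buf = '\n'.join(x for x in stream)
                if buf:
                    return cPickle.loads(buf)
                return set()
            except BadFileDownload, err:
                if err.status_code == 404:
                    return set()
                raise

        def save_processed_files(self, processed_files_data):
            # Pickle the set and upload it over the previous object,
            # returning the success flag that run_once() checks.
            s = cPickle.dumps(processed_files_data,
                              cPickle.HIGHEST_PROTOCOL)
            f = cStringIO.StringIO(s)
            return self.internal_proxy.upload_file(
                f, self.log_account, self.log_container,
                self.processed_files_object_name)

Keeping lookback_hours and lookback_window as plain attributes on the
processor is what lets run_once() override them per invocation via **kw,
as the new daemon code does, instead of rereading the config.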