diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py
index 40a2f01..0bc92c9 100644
--- a/files/log-gearman-worker.py
+++ b/files/log-gearman-worker.py
@@ -15,24 +15,21 @@
 # under the License.
 
 import argparse
-import cStringIO
 import daemon
 import gear
-import gzip
 import json
 import logging
 import os
 import Queue
 import re
+import requests
 import select
 import socket
 import subprocess
 import sys
 import threading
 import time
-import urllib2
 import yaml
-import zlib
 
 import paho.mqtt.publish as publish
 
@@ -215,13 +212,13 @@ class LogRetriever(threading.Thread):
 
     def _handle_event(self):
         fields = {}
-        log_lines = None
+        num_log_lines = 0
         source_url = ''
+        http_session = None
         job = self.gearman_worker.getJob()
         try:
             arguments = json.loads(job.arguments.decode('utf-8'))
             source_url = arguments['source_url']
-            retry = arguments['retry']
             event = arguments['event']
             logging.debug("Handling event: " + json.dumps(event))
             fields = event.get('fields') or event.get('@fields')
@@ -229,7 +226,7 @@ class LogRetriever(threading.Thread):
             if fields['build_status'] != 'ABORTED':
                 # Handle events ignoring aborted builds. These builds are
                 # discarded by zuul.
-                log_lines = self._retrieve_log(source_url, retry)
+                file_obj, http_session = self._open_log_file_url(source_url)
 
                 try:
                     all_filters = []
@@ -238,12 +235,10 @@ class LogRetriever(threading.Thread):
                         all_filters.append(f.create(fields))
                     filters = all_filters
 
-                    logging.debug("Pushing " + str(len(log_lines)) +
-                                  " log lines.")
                     base_event = {}
                     base_event.update(fields)
                     base_event["tags"] = tags
-                    for line in log_lines:
+                    for line in self._retrieve_log_line(file_obj):
                         keep_line = True
                         out_event = base_event.copy()
                         out_event["message"] = line
@@ -261,12 +256,18 @@ class LogRetriever(threading.Thread):
                         filters = new_filters
                         if keep_line:
                             self.logq.put(out_event)
+                        num_log_lines += 1
+
+                    logging.debug("Pushed " + str(num_log_lines) +
+                                  " log lines.")
                 finally:
                     for f in all_filters:
                         f.close()
+                    if http_session:
+                        http_session.close()
             job.sendWorkComplete()
             # Only send mqtt events for log files we processed.
-            if self.mqtt and log_lines:
+            if self.mqtt and num_log_lines:
                 msg = json.dumps({
                     'build_uuid': fields.get('build_uuid'),
                     'source_url': source_url,
@@ -290,13 +291,27 @@ class LogRetriever(threading.Thread):
                                          'retrieve_logs',
                                          fields.get('build_queue'))
 
-    def _retrieve_log(self, source_url, retry):
-        encoding = 'raw'
-        raw_buf = b''
+    def _retrieve_log_line(self, file_obj, chunk_size=4096):
+        if not file_obj:
+            return
+        # Response.iter_lines automatically decodes 'gzip' and 'deflate'
+        # encodings.
+        # https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
+        for line in file_obj.iter_lines(chunk_size, decode_unicode=True):
+            yield line
+
+    def _open_log_file_url(self, source_url):
+        file_obj = None
         try:
-            encoding, raw_buf = self._get_log_data(source_url, retry)
-        except urllib2.HTTPError as e:
-            if e.code == 404:
+            logging.debug("Retrieving: " + source_url)
+            # Use a session to persist the HTTP connection across requests
+            # while downloading chunks of the log file.
+            session = requests.Session()
+            session.headers = {'Accept-encoding': 'deflate, gzip'}
+            file_obj = session.get(source_url, stream=True)
+            file_obj.raise_for_status()
+        except requests.HTTPError as e:
+            if e.response.status_code == 404:
                 logging.info("Unable to retrieve %s: HTTP error 404" %
                              source_url)
             else:
@@ -304,41 +319,11 @@ class LogRetriever(threading.Thread):
         except Exception:
             # Silently drop fatal errors when retrieving logs.
             # TODO (clarkb): Handle these errors.
-            # Perhaps simply add a log message to raw_buf?
-            logging.exception("Unable to get log data.")
-        if encoding == 'gzip':
-            logging.debug("Decompressing gzipped source file.")
-            raw_strIO = cStringIO.StringIO(raw_buf)
-            f = gzip.GzipFile(fileobj=raw_strIO)
-            buf = f.read().decode('utf-8')
-            raw_strIO.close()
-            f.close()
-        elif encoding == 'deflate':
-            logging.debug("Decompressing deflate compressed source file.")
-            buf = zlib.decompress(raw_buf).decode('utf-8')
-        else:
-            logging.debug("Decoding raw source file.")
-            buf = raw_buf.decode('utf-8')
-        return buf.splitlines()
-
-    def _get_log_data(self, source_url, retry):
-        encoding = 'raw'
-        try:
-            logging.debug("Retrieving: " + source_url)
-            req = urllib2.Request(source_url)
-            req.add_header('Accept-encoding', 'deflate, gzip')
-            r = urllib2.urlopen(req)
-        except:
+            # Perhaps simply add a log message to file_obj?
             logging.exception("Unable to retrieve source file.")
             raise
-        if ('gzip' in r.info().get('Content-Type', '') or
-            'gzip' in r.info().get('Content-Encoding', '')):
-            encoding = 'gzip'
-        elif 'deflate' in r.info().get('Content-Encoding', ''):
-            encoding = 'deflate'
-        raw_buf = r.read()
-        return encoding, raw_buf
+        return file_obj, session
 
 
 class StdOutLogProcessor(object):
diff --git a/manifests/init.pp b/manifests/init.pp
index 42f95ae..90227fb 100644
--- a/manifests/init.pp
+++ b/manifests/init.pp
@@ -70,6 +70,11 @@ class log_processor (
     provider => openstack_pip,
     require  => Class['pip'],
   }
+  package { 'requests':
+    ensure   => latest,
+    provider => openstack_pip,
+    require  => Class['pip'],
+  }
 
   if ! defined(Package['statsd']) {
     package { 'statsd':
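
For reference, the streaming pattern the new _open_log_file_url() and
_retrieve_log_line() methods rely on can be exercised on its own. The sketch
below assumes only the requests library; the helper name and URL are
illustrative and not part of the patch:

    import requests

    def stream_log_lines(source_url, chunk_size=4096):
        # Mirror the patch: a Session keeps one HTTP connection alive while
        # the body is consumed in chunks rather than read into memory at once.
        session = requests.Session()
        session.headers = {'Accept-encoding': 'deflate, gzip'}
        response = session.get(source_url, stream=True)
        response.raise_for_status()
        try:
            # iter_lines() transparently inflates gzip/deflate bodies and,
            # with decode_unicode=True, yields text lines instead of bytes.
            for line in response.iter_lines(chunk_size, decode_unicode=True):
                yield line
        finally:
            session.close()

Iterating this generator keeps memory bounded by the chunk size, which is the
point of the change: the old _retrieve_log()/_get_log_data() pair buffered the
entire response, decompressed it, and only then split it into lines.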