diff --git a/ciwatch/events.py b/ciwatch/events.py index a748cd1..26c270f 100644 --- a/ciwatch/events.py +++ b/ciwatch/events.py @@ -14,14 +14,13 @@ from datetime import datetime import json -import paramiko import re -import time from ciwatch.config import Config from ciwatch import db from ciwatch.log import logger from ciwatch import models +from zuul.lib.gerrit import Gerrit def _process_project_name(project_name): @@ -70,49 +69,7 @@ def _store_event(event, datadir): return event -class GerritEventStream(object): - def __init__(self, cfg): - - logger.debug('Connecting to %(host)s:%(port)d as ' - '%(user)s using %(key)s', - {'user': cfg.AccountInfo.gerrit_username, - 'key': cfg.AccountInfo.gerrit_ssh_key, - 'host': cfg.AccountInfo.gerrit_host, - 'port': int(cfg.AccountInfo.gerrit_port)}) - - self.ssh = paramiko.SSHClient() - self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - connected = False - while not connected: - try: - self.ssh.connect(cfg.AccountInfo.gerrit_host, - int(cfg.AccountInfo.gerrit_port), - cfg.AccountInfo.gerrit_username, - key_filename=cfg.AccountInfo.gerrit_ssh_key) - connected = True - except paramiko.SSHException as e: - logger.error('%s', e) - logger.warn('Gerrit may be down, will pause and retry...') - time.sleep(10) - - self.stdin, self.stdout, self.stderr =\ - self.ssh.exec_command("gerrit stream-events") - - def __iter__(self): - return self - - def next(self): - return self.stdout.readline() - - -def parse_json_event(event, projects): - try: - event = json.loads(event) - except Exception as ex: - logger.error('Failed json.loads on event: %s', event) - logger.exception(ex) - return None +def parse_event(event, projects): if _is_valid(event, projects): _process_event(event) logger.info('Parsed valid event: %s', event) @@ -162,17 +119,18 @@ def add_event_to_db(event, commit_=True): def main(): config = Config() db.create_projects() # This will make sure the database has projects in it + gerrit = Gerrit( + hostname=config.cfg.AccountInfo.gerrit_host, + username=config.cfg.AccountInfo.gerrit_username, + port=int(config.cfg.AccountInfo.gerrit_port), + keyfile=config.cfg.AccountInfo.gerrit_ssh_key + ) + gerrit.startWatching() while True: - try: - events = GerritEventStream(config.cfg) - except paramiko.SSHException as ex: - logger.exception('Error connecting to Gerrit: %s', ex) - time.sleep(60) - for event in events: - event = parse_json_event(event, config.get_projects()) - if event is not None: - _store_event(event, config.DATA_DIR) - + event = gerrit.getEvent()[1] + parsed_event = parse_event(event, config.get_projects()) + if parsed_event is not None: + _store_event(parsed_event, config.DATA_DIR) if __name__ == '__main__': main() diff --git a/ciwatch/populate.py b/ciwatch/populate.py index 3bf5a90..6e8cd54 100644 --- a/ciwatch/populate.py +++ b/ciwatch/populate.py @@ -12,21 +12,29 @@ # License for the specific language governing permissions and limitations # under the License. +import json import os from ciwatch.config import Config from ciwatch import db from ciwatch.events import add_event_to_db -from ciwatch.events import parse_json_event +from ciwatch.events import parse_event +from ciwatch.log import logger def get_data(datafile, projects): data = [] with open(datafile) as file_: for line in file_: - event = parse_json_event(line, projects) - if event is not None: - data.append(event) + try: + event = json.loads(line) + except Exception as ex: + logger.error('Failed json.loads on event: %s', event) + logger.exception(ex) + continue + parsed_event = parse_event(event, projects) + if parsed_event is not None: + data.append(parsed_event) return data diff --git a/requirements.txt b/requirements.txt index a37220b..839b96b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ flask>=0.10 sqlalchemy>=1.0 iniparse>=0.4 paramiko>=1.15 +zuul==2.1.0