Use zuul gerrit event listener implementation
Current event listener implementation does not handle network problems well. E.g. if ssh stream connection is lost, it would not reconnect or recover on its own. Instead of fixing the implementation, use well-tested gerrit listener used by zuul. Explicitly specify version of zuul to be 2.1.0 to avoid accidental breakages due to changes in zuul lib. Downside is that we need to install zuul and its dependencies just to use gerrit listener. parse_json_event function had to be changed, because zuul gerrit event listener provides object, not json string. We still need to create event from json in populate_db.py, so that part of the function has been moved there. Closes-Bug: #1516820 Change-Id: I8aa7a18460b58998f6c378e9d9c0d783032eca21
This commit is contained in:
parent
822a7a3ac2
commit
56e474d725
@ -14,14 +14,13 @@
|
|||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import json
|
import json
|
||||||
import paramiko
|
|
||||||
import re
|
import re
|
||||||
import time
|
|
||||||
|
|
||||||
from ciwatch.config import Config
|
from ciwatch.config import Config
|
||||||
from ciwatch import db
|
from ciwatch import db
|
||||||
from ciwatch.log import logger
|
from ciwatch.log import logger
|
||||||
from ciwatch import models
|
from ciwatch import models
|
||||||
|
from zuul.lib.gerrit import Gerrit
|
||||||
|
|
||||||
|
|
||||||
def _process_project_name(project_name):
|
def _process_project_name(project_name):
|
||||||
@ -70,49 +69,7 @@ def _store_event(event, datadir):
|
|||||||
return event
|
return event
|
||||||
|
|
||||||
|
|
||||||
class GerritEventStream(object):
|
def parse_event(event, projects):
|
||||||
def __init__(self, cfg):
|
|
||||||
|
|
||||||
logger.debug('Connecting to %(host)s:%(port)d as '
|
|
||||||
'%(user)s using %(key)s',
|
|
||||||
{'user': cfg.AccountInfo.gerrit_username,
|
|
||||||
'key': cfg.AccountInfo.gerrit_ssh_key,
|
|
||||||
'host': cfg.AccountInfo.gerrit_host,
|
|
||||||
'port': int(cfg.AccountInfo.gerrit_port)})
|
|
||||||
|
|
||||||
self.ssh = paramiko.SSHClient()
|
|
||||||
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
||||||
|
|
||||||
connected = False
|
|
||||||
while not connected:
|
|
||||||
try:
|
|
||||||
self.ssh.connect(cfg.AccountInfo.gerrit_host,
|
|
||||||
int(cfg.AccountInfo.gerrit_port),
|
|
||||||
cfg.AccountInfo.gerrit_username,
|
|
||||||
key_filename=cfg.AccountInfo.gerrit_ssh_key)
|
|
||||||
connected = True
|
|
||||||
except paramiko.SSHException as e:
|
|
||||||
logger.error('%s', e)
|
|
||||||
logger.warn('Gerrit may be down, will pause and retry...')
|
|
||||||
time.sleep(10)
|
|
||||||
|
|
||||||
self.stdin, self.stdout, self.stderr =\
|
|
||||||
self.ssh.exec_command("gerrit stream-events")
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def next(self):
|
|
||||||
return self.stdout.readline()
|
|
||||||
|
|
||||||
|
|
||||||
def parse_json_event(event, projects):
|
|
||||||
try:
|
|
||||||
event = json.loads(event)
|
|
||||||
except Exception as ex:
|
|
||||||
logger.error('Failed json.loads on event: %s', event)
|
|
||||||
logger.exception(ex)
|
|
||||||
return None
|
|
||||||
if _is_valid(event, projects):
|
if _is_valid(event, projects):
|
||||||
_process_event(event)
|
_process_event(event)
|
||||||
logger.info('Parsed valid event: %s', event)
|
logger.info('Parsed valid event: %s', event)
|
||||||
@ -162,17 +119,18 @@ def add_event_to_db(event, commit_=True):
|
|||||||
def main():
|
def main():
|
||||||
config = Config()
|
config = Config()
|
||||||
db.create_projects() # This will make sure the database has projects in it
|
db.create_projects() # This will make sure the database has projects in it
|
||||||
|
gerrit = Gerrit(
|
||||||
|
hostname=config.cfg.AccountInfo.gerrit_host,
|
||||||
|
username=config.cfg.AccountInfo.gerrit_username,
|
||||||
|
port=int(config.cfg.AccountInfo.gerrit_port),
|
||||||
|
keyfile=config.cfg.AccountInfo.gerrit_ssh_key
|
||||||
|
)
|
||||||
|
gerrit.startWatching()
|
||||||
while True:
|
while True:
|
||||||
try:
|
event = gerrit.getEvent()[1]
|
||||||
events = GerritEventStream(config.cfg)
|
parsed_event = parse_event(event, config.get_projects())
|
||||||
except paramiko.SSHException as ex:
|
if parsed_event is not None:
|
||||||
logger.exception('Error connecting to Gerrit: %s', ex)
|
_store_event(parsed_event, config.DATA_DIR)
|
||||||
time.sleep(60)
|
|
||||||
for event in events:
|
|
||||||
event = parse_json_event(event, config.get_projects())
|
|
||||||
if event is not None:
|
|
||||||
_store_event(event, config.DATA_DIR)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
@ -12,21 +12,29 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from ciwatch.config import Config
|
from ciwatch.config import Config
|
||||||
from ciwatch import db
|
from ciwatch import db
|
||||||
from ciwatch.events import add_event_to_db
|
from ciwatch.events import add_event_to_db
|
||||||
from ciwatch.events import parse_json_event
|
from ciwatch.events import parse_event
|
||||||
|
from ciwatch.log import logger
|
||||||
|
|
||||||
|
|
||||||
def get_data(datafile, projects):
|
def get_data(datafile, projects):
|
||||||
data = []
|
data = []
|
||||||
with open(datafile) as file_:
|
with open(datafile) as file_:
|
||||||
for line in file_:
|
for line in file_:
|
||||||
event = parse_json_event(line, projects)
|
try:
|
||||||
if event is not None:
|
event = json.loads(line)
|
||||||
data.append(event)
|
except Exception as ex:
|
||||||
|
logger.error('Failed json.loads on event: %s', event)
|
||||||
|
logger.exception(ex)
|
||||||
|
continue
|
||||||
|
parsed_event = parse_event(event, projects)
|
||||||
|
if parsed_event is not None:
|
||||||
|
data.append(parsed_event)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,3 +7,4 @@ flask>=0.10
|
|||||||
sqlalchemy>=1.0
|
sqlalchemy>=1.0
|
||||||
iniparse>=0.4
|
iniparse>=0.4
|
||||||
paramiko>=1.15
|
paramiko>=1.15
|
||||||
|
zuul==2.1.0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user