Evgeny Antyshev d291e7f1e7 Keepalives for gerrit connections
Keepalive is off by default (keepalive=0), which means connections may hang
forever when the network is unreliable.
This change applies the same change as proposed for Zuul:
https://review.openstack.org/238988

Support in the Puppet modules is added in a dependent review.

Change-Id: Ifaf6aa28ad65305592839c1bd2ca90157a32ab3c
2016-01-18 10:05:08 +00:00

221 lines
6.9 KiB
Python

# Copyright 2011 OpenStack, LLC.
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2015 Pure Storage, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import json
import time
from six.moves import queue as Queue
import paramiko
import logging
import pprint
class GerritWatcher(threading.Thread):
    """Thread that forwards ``gerrit stream-events`` output to a Gerrit object.

    The thread opens an SSH connection, runs ``gerrit stream-events`` and
    passes every JSON event it reads to ``gerrit.addEvent()``.  When the
    stream ends or fails, the connection is closed and, after a short
    delay, a new one is opened.  ``run()`` never returns.
    """

    log = logging.getLogger("gerrit.GerritWatcher")

    # Seconds to wait after a stream ends or fails before reconnecting.
    RECONNECT_DELAY = 5

    def __init__(self, gerrit, username, hostname, port=29418, keyfile=None,
                 keepalive=0):
        """Store the connection parameters; no connection is made here.

        :param gerrit: object whose ``addEvent(data)`` receives each event.
        :param username: SSH user name.
        :param hostname: SSH host name.
        :param port: SSH port.
        :param keyfile: private key file passed to paramiko as
            ``key_filename``.
        :param keepalive: value passed to the transport's
            ``set_keepalive()``.
        """
        threading.Thread.__init__(self)
        self.username = username
        self.keyfile = keyfile
        self.hostname = hostname
        self.port = port
        self.gerrit = gerrit
        self.keepalive = keepalive

    def _read(self, fd):
        """Read one line from *fd* and hand the decoded event to gerrit.

        :returns: ``True`` when an event was delivered, ``False`` when
            *fd* is at end of file.
        :raises ValueError: if the line is not valid JSON.
        """
        line = fd.readline()
        if not line:
            # An empty read means the stream was closed.  Report it
            # instead of letting json.loads('') fail with a misleading
            # parse error.
            return False
        data = json.loads(line)
        self.log.debug("Received data from Gerrit event stream: \n%s", data)
        self.gerrit.addEvent(data)
        return True

    def _listen(self, stdout, stderr):
        """Deliver events from *stdout* until it reaches end of file."""
        while self._read(stdout):
            pass

    def _run(self):
        """Run a single connect/stream cycle, then wait before returning.

        Errors are logged, never raised, so the thread keeps running.
        The SSH client is always closed so that repeated reconnects do
        not accumulate open transports.
        """
        client = None
        try:
            client = paramiko.SSHClient()
            client.load_system_host_keys()
            client.set_missing_host_key_policy(paramiko.WarningPolicy())
            client.connect(self.hostname,
                           username=self.username,
                           port=self.port,
                           key_filename=self.keyfile)
            transport = client.get_transport()
            transport.set_keepalive(self.keepalive)

            stdin, stdout, stderr = client.exec_command("gerrit stream-events")

            self._listen(stdout, stderr)

            # The stream reached EOF; find out whether the command failed.
            ret = stdout.channel.recv_exit_status()
            self.log.debug("SSH exit status: %s" % ret)

            if ret:
                raise Exception("Gerrit error executing stream-events")
        except Exception:
            self.log.exception("Exception on ssh event stream:")
        finally:
            if client is not None:
                try:
                    client.close()
                except Exception:
                    # A failure to close must not kill the watcher thread.
                    self.log.exception("Exception closing ssh event stream:")
        # Back off before run() reconnects, whether the stream ended
        # cleanly or with an error, to avoid a tight reconnect loop.
        time.sleep(self.RECONNECT_DELAY)

    def run(self):
        """Thread entry point: keep the event stream alive forever."""
        while True:
            self._run()
class Gerrit(object):
    """Client for a Gerrit server's SSH command interface.

    Commands (``gerrit review``, ``gerrit query``, ``gerrit ls-members``)
    run over a lazily opened paramiko connection.  ``startWatching()``
    additionally starts a ``GerritWatcher`` thread that fills an event
    queue, which is consumed through ``getEvent()`` / ``eventDone()``.
    """

    log = logging.getLogger("gerrit.Gerrit")

    def __init__(self, hostname, username, port=29418, keyfile=None,
                 keepalive=0):
        """Store the connection parameters; no connection is made here.

        :param hostname: SSH host name.
        :param username: SSH user name.
        :param port: SSH port.
        :param keyfile: private key file passed to paramiko as
            ``key_filename``.
        :param keepalive: value passed to the transport's
            ``set_keepalive()``.
        """
        self.username = username
        self.hostname = hostname
        self.port = port
        self.keyfile = keyfile
        self.watcher_thread = None
        self.event_queue = None
        self.client = None
        self.keepalive = keepalive

    def startWatching(self):
        """Create the event queue and start the daemon watcher thread."""
        self.event_queue = Queue.Queue()
        self.watcher_thread = GerritWatcher(
            self,
            self.username,
            self.hostname,
            self.port,
            keyfile=self.keyfile,
            keepalive=self.keepalive)
        self.watcher_thread.daemon = True
        self.watcher_thread.start()

    def addEvent(self, data):
        """Put *data* on the event queue."""
        return self.event_queue.put(data)

    def getEvent(self):
        """Return the next queued event, blocking until one is available."""
        return self.event_queue.get()

    def eventDone(self):
        """Mark the event last taken with ``getEvent()`` as processed."""
        self.event_queue.task_done()

    def review(self, project, change, message, action=None):
        """Run ``gerrit review`` for *change* and return its stderr output.

        :param project: value for ``--project``.
        :param change: change identifier appended to the command.
        :param message: review message; omitted from the command if falsy.
        :param action: mapping of option name to value.  A value of
            ``True`` emits ``--<name>``; any other value emits
            ``--label <name>=<value>``.
        :raises Exception: if the command exits with a non-zero status.
        """
        cmd = 'gerrit review --project %s' % project
        if message:
            # NOTE(review): the message is placed between double quotes
            # without escaping, so a message containing '"' yields a
            # malformed command.  Confirm Gerrit's quoting rules before
            # changing the format.
            cmd += ' --message "%s"' % message
        # A None default avoids sharing one dict between all calls.
        for k, v in (action or {}).items():
            if v is True:
                cmd += ' --%s' % k
            else:
                cmd += ' --label %s=%s' % (k, v)
        cmd += ' %s' % change
        out, err = self._ssh(cmd)
        return err

    def query(self, query):
        """Run a detailed ``gerrit query`` and return the first result.

        :returns: the decoded first JSON line of the output, or ``False``
            if there is no output or the decoded value is falsy.
        :raises Exception: if the command exits with a non-zero status.
        """
        args = '--all-approvals --comments --commit-message'
        args += ' --current-patch-set --dependencies --files'
        args += ' --patch-sets --submit-records'
        cmd = 'gerrit query --format json %s %s' % (
            args, query)
        out, _ = self._ssh(cmd)
        if not out:
            return False
        # Only the first line is used; split() on a non-empty string
        # always yields at least one element.
        data = json.loads(out.split('\n')[0])
        if not data:
            return False
        self.log.debug("Received data from Gerrit query: \n%s" %
                       (pprint.pformat(data)))
        return data

    def _simpleQueryChunk(self, query):
        """Run one ``gerrit query`` and return its result rows.

        Only output lines containing ``sortKey`` are decoded.

        :returns: list of decoded rows, or ``False`` if there are none.
        """
        args = '--current-patch-set'
        cmd = 'gerrit query --format json %s %s' % (
            args, query)
        out, _ = self._ssh(cmd)
        if not out:
            return False
        data = [json.loads(line) for line in out.split('\n')
                if "sortKey" in line]
        if not data:
            return False
        self.log.debug("Received data from Gerrit query: \n%s" %
                       (pprint.pformat(data)))
        return data

    def simpleQuery(self, query):
        """Return all rows matching *query*, following result pages.

        :returns: list of decoded rows (possibly empty).
        :raises Exception: if a command exits with a non-zero status.
        """
        # gerrit returns 500 results by default, so implement paging
        # for large projects like nova
        alldata = []
        chunk = self._simpleQueryChunk(query)
        while chunk:
            alldata.extend(chunk)
            # Resume after the last row of the previous page.
            sortkey = "resume_sortkey:'%s'" % chunk[-1]["sortKey"]
            chunk = self._simpleQueryChunk("%s %s" % (query, sortkey))
        return alldata

    def lsMembers(self, group):
        """Run ``gerrit ls-members`` for *group*.

        :returns: the output split on newlines (a trailing newline in
            the output leaves a final empty string), or ``False`` if
            there is no output.
        :raises Exception: if the command exits with a non-zero status.
        """
        cmd = "gerrit ls-members '%s'" % group
        out, _ = self._ssh(cmd)
        if not out:
            return False
        return out.split('\n')

    @staticmethod
    def _decode(data):
        """Return *data* as text, decoding UTF-8 bytes when necessary.

        paramiko hands back bytes from ``read()``; on Python 3 those
        cannot be split on ``'\\n'`` or passed to string formatting
        sensibly.  On Python 2 ``bytes`` is ``str`` and the value is
        returned untouched.
        """
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode('utf-8')
        return data

    def _open(self):
        """Open a new SSH connection, closing any previous one first."""
        stale = self.client
        if stale is not None:
            # Release the old transport so reconnects do not leak sockets.
            self.client = None
            try:
                stale.close()
            except Exception:
                self.log.debug("Ignoring error closing stale SSH client",
                               exc_info=True)
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.WarningPolicy())
        client.connect(self.hostname,
                       username=self.username,
                       port=self.port,
                       key_filename=self.keyfile)
        transport = client.get_transport()
        transport.set_keepalive(self.keepalive)
        self.client = client

    def _ssh(self, command):
        """Run *command* over SSH and return ``(stdout, stderr)`` as text.

        The connection is opened on first use.  If starting the command
        fails, the connection is reopened and the command is tried once
        more.

        :raises Exception: if the command exits with a non-zero status.
        """
        if not self.client:
            self._open()

        self.log.debug("SSH command:\n%s" % command)
        try:
            stdin, stdout, stderr = self.client.exec_command(command)
        except Exception:
            # The cached connection is probably dead; reconnect and
            # retry once, letting a second failure propagate.
            self._open()
            stdin, stdout, stderr = self.client.exec_command(command)

        out = self._decode(stdout.read())
        self.log.debug("SSH received stdout:\n%s" % out)

        ret = stdout.channel.recv_exit_status()
        self.log.debug("SSH exit status: %s" % ret)

        err = self._decode(stderr.read())
        self.log.debug("SSH received stderr:\n%s" % err)
        if ret:
            raise Exception("Gerrit error executing %s" % command)
        return (out, err)