Make review processing compatible with Gerrit 2.9+

The review processor uses the resume_sortkey query parameter for paging
and to store the last processed change request. Starting with Gerrit
2.9 this parameter is removed, and that version provides no way to do
paging. Starting with 2.10, Gerrit supports paging with the --start param.

The code is made to be compatible with the current Gerrit (2.8.x) and
with modern versions. The only compatible way is to use the 'age' query
param for paging.

NOTE: this patch introduces incompatible changes to the data stored
in runtime storage. A full re-load is required.

Change-Id: I315dbfdea35f3980dd088be46dd12cb22660ed82
Closes-Bug: #1503267
This commit is contained in:
Ilya Shakhat 2015-10-06 17:43:14 +03:00
parent 2d4d2fc610
commit 4912190783
3 changed files with 177 additions and 76 deletions

View File

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import psutil
import six
import time
from stackalytics.processor import bps
from stackalytics.processor import config
@ -93,6 +94,7 @@ def _process_reviews(record_iterator, ci_map, module, branch):
def _process_repo(repo, runtime_storage_inst, record_processor_inst,
rcs_inst):
uri = repo['uri']
quoted_uri = six.moves.urllib.parse.quote_plus(uri)
LOG.info('Processing repo uri: %s', uri)
LOG.debug('Processing blueprints for repo uri: %s', uri)
@ -129,8 +131,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
for branch in branches:
LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch)
vcs_key = 'vcs:' + str(six.moves.urllib.parse.quote_plus(uri) +
':' + branch)
vcs_key = 'vcs:%s:%s' % (quoted_uri, branch)
last_id = runtime_storage_inst.get_by_key(vcs_key)
commit_iterator = vcs_inst.log(branch, last_id)
@ -145,11 +146,11 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch)
rcs_key = 'rcs:' + str(six.moves.urllib.parse.quote_plus(uri) +
':' + branch)
last_id = runtime_storage_inst.get_by_key(rcs_key)
rcs_key = 'rcs:%s:%s' % (quoted_uri, branch)
last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
current_retrieval_time = int(time.time())
review_iterator = rcs_inst.log(repo, branch, last_id,
review_iterator = rcs_inst.log(repo, branch, last_retrieval_time,
grab_comments=('ci' in repo))
review_iterator_typed = _record_typer(review_iterator, 'review')
@ -162,8 +163,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
runtime_storage_inst.set_records(processed_review_iterator,
utils.merge_records)
last_id = rcs_inst.get_last_id(repo, branch)
runtime_storage_inst.set_by_key(rcs_key, last_id)
runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):

View File

@ -18,7 +18,7 @@ import re
from oslo_log import log as logging
import paramiko
import time
LOG = logging.getLogger(__name__)
@ -38,12 +38,10 @@ class Rcs(object):
def get_project_list(self):
pass
def log(self, repo, branch, last_id):
def log(self, repo, branch, last_retrieval_time, status=None,
grab_comments=False):
return []
def get_last_id(self, repo, branch):
return -1
def close(self):
pass
@ -89,17 +87,15 @@ class Gerrit(Rcs):
LOG.exception(e)
return False
def _get_cmd(self, project_organization, module, branch, sort_key=None,
is_open=False, limit=PAGE_LIMIT, grab_comments=False):
def _get_cmd(self, project_organization, module, branch, age=0,
status=None, limit=PAGE_LIMIT, grab_comments=False):
cmd = ('gerrit query --all-approvals --patch-sets --format JSON '
'project:\'%(ogn)s/%(module)s\' branch:%(branch)s '
'limit:%(limit)s' %
'limit:%(limit)s age:%(age)ss' %
{'ogn': project_organization, 'module': module,
'branch': branch, 'limit': limit})
if is_open:
cmd += ' is:open'
if sort_key:
cmd += ' resume_sortkey:%016x' % sort_key
'branch': branch, 'limit': limit, 'age': age})
if status:
cmd += ' status:%s' % status
if grab_comments:
cmd += ' --comments'
return cmd
@ -123,37 +119,47 @@ class Gerrit(Rcs):
return False
def _poll_reviews(self, project_organization, module, branch,
start_id=0, last_id=0, is_open=False,
grab_comments=False):
sort_key = start_id
last_id = last_id or 0
last_retrieval_time, status=None, grab_comments=False):
age = 0
proceed = True
while True:
cmd = self._get_cmd(project_organization, module, branch, sort_key,
is_open, grab_comments=grab_comments)
# the algorithm retrieves reviews by age; the next page starts at the
# age of the oldest review seen so far; it is possible that the oldest
# review will be included in the subsequent result (as the age is
# computed from the local machine's clock, but evaluated remotely), so we
# need to track all ids and ignore those we've already seen
processed = set()
while proceed:
cmd = self._get_cmd(project_organization, module, branch,
age=age, status=status,
grab_comments=grab_comments)
LOG.debug('Executing command: %s', cmd)
exec_result = self._exec_command(cmd)
if not exec_result:
break
stdin, stdout, stderr = exec_result
proceed = False
proceed = False # assume there are no more reviews available
for line in stdout:
review = json.loads(line)
if 'sortKey' in review:
sort_key = int(review['sortKey'], 16)
if sort_key <= last_id:
if 'number' in review: # choose reviews not summary
if review['number'] in processed:
continue # already seen that
last_updated = int(review['lastUpdated'])
if last_updated < last_retrieval_time: # too old
proceed = False
break
proceed = True
proceed = True # have at least one review, can dig deeper
age = max(age, int(time.time()) - last_updated)
processed.add(review['number'])
review['module'] = module
yield review
if not proceed:
break
def get_project_list(self):
exec_result = self._exec_command('gerrit ls-projects')
if not exec_result:
@ -163,48 +169,16 @@ class Gerrit(Rcs):
return result
def log(self, repo, branch, last_id, grab_comments=False):
# poll new reviews from the top down to last_id
LOG.debug('Poll new reviews for module: %s', repo['module'])
for review in self._poll_reviews(repo['organization'],
repo['module'], branch,
last_id=last_id,
grab_comments=grab_comments):
def log(self, repo, branch, last_retrieval_time, status=None,
grab_comments=False):
# poll reviews down from top between last_r_t and current_r_t
LOG.debug('Poll reviews for module: %s', repo['module'])
for review in self._poll_reviews(
repo['organization'], repo['module'], branch,
last_retrieval_time, status=status,
grab_comments=grab_comments):
yield review
# poll open reviews from last_id down to bottom
LOG.debug('Poll open reviews for module: %s', repo['module'])
start_id = None
if last_id:
start_id = last_id + 1 # include the last review into query
for review in self._poll_reviews(repo['organization'],
repo['module'], branch,
start_id=start_id, is_open=True,
grab_comments=grab_comments):
yield review
def get_last_id(self, repo, branch):
LOG.debug('Get last id for module: %s', repo['module'])
cmd = self._get_cmd(repo['organization'], repo['module'],
branch, limit=1)
LOG.debug('Executing command: %s', cmd)
exec_result = self._exec_command(cmd)
if not exec_result:
return None
stdin, stdout, stderr = exec_result
last_id = None
for line in stdout:
review = json.loads(line)
if 'sortKey' in review:
last_id = int(review['sortKey'], 16)
break
LOG.debug('Module %(module)s last id is %(id)s',
{'module': repo['module'], 'id': last_id})
return last_id
def close(self):
self.client.close()

View File

@ -0,0 +1,127 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
import testtools
from stackalytics.processor import rcs
# One JSON-encoded line describing a single review, shaped like a row of
# `gerrit query --format JSON` output as consumed by the tests below.
REVIEW_ONE = json.dumps({
    "project": "openstack/nova",
    "branch": "master",
    "topic": "bug/1494374",
    "id": "Id741dfc769c02a5544691a7db49a7dbff6b11376",
    "number": "229382",
    "subject": "method is nearly 400 LOC and should be broken up",
    "createdOn": 1443613948,
    "lastUpdated": 1444222222,
    "sortKey": "0038481b00038006",
    "open": True,
    "status": "NEW",
})

# Trailing summary ("stats") line; it has no "number" key, so it is not a
# review record.
REVIEW_END_LINE = json.dumps({
    "type": "stats",
    "rowCount": 2,
    "runTimeMilliseconds": 13,
})
class TestRcs(testtools.TestCase):
    """Tests for ``rcs.Gerrit`` with the paramiko SSH client mocked out."""

    @mock.patch('paramiko.SSHClient')
    def test_setup(self, mock_client_cons):
        """setup() returns True and connects with the given credentials."""
        ssh_client = mock.Mock()
        ssh_client.connect = mock.Mock()
        mock_client_cons.return_value = ssh_client

        gerrit = rcs.Gerrit('gerrit://review.openstack.org')
        setup_result = gerrit.setup(username='user', key_filename='key')

        self.assertEqual(True, setup_result)
        ssh_client.connect.assert_called_once_with(
            'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key',
            username='user')

    @mock.patch('paramiko.SSHClient')
    def test_setup_error(self, mock_client_cons):
        """setup() returns False when the SSH connect call raises."""
        ssh_client = mock.Mock()
        ssh_client.connect = mock.Mock(side_effect=Exception)
        mock_client_cons.return_value = ssh_client

        gerrit = rcs.Gerrit('gerrit://review.openstack.org')
        setup_result = gerrit.setup(username='user', key_filename='key')

        self.assertEqual(False, setup_result)
        ssh_client.connect.assert_called_once_with(
            'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key',
            username='user')

    @mock.patch('paramiko.SSHClient')
    @mock.patch('time.time')
    def test_log(self, mock_time, mock_client_cons):
        """log() yields the fresh review, then asks for the next page by age.

        The second query's age is the mocked current time minus the
        review's lastUpdated value (1444333333 - 1444222222 = 111111).
        """
        pages = [
            # one review and summary
            ('', [REVIEW_ONE, REVIEW_END_LINE], ''),
            # only summary = no more reviews
            ('', [REVIEW_END_LINE], ''),
        ]
        ssh_client = mock.Mock()
        ssh_client.exec_command = mock.Mock(side_effect=pages)
        mock_client_cons.return_value = ssh_client

        gerrit = rcs.Gerrit('uri')
        repo = {'organization': 'openstack', 'module': 'nova'}
        branch = 'master'
        last_retrieval_time = 1444000000
        mock_time.return_value = 1444333333

        records = list(gerrit.log(repo, branch, last_retrieval_time))

        self.assertEqual(1, len(records))
        self.assertEqual('229382', records[0]['number'])

        first_page_call = mock.call(
            'gerrit query --all-approvals --patch-sets '
            '--format JSON project:\'openstack/nova\' branch:master '
            'limit:100 age:0s')
        second_page_call = mock.call(
            'gerrit query --all-approvals --patch-sets '
            '--format JSON project:\'openstack/nova\' branch:master '
            'limit:100 age:111111s')
        ssh_client.exec_command.assert_has_calls(
            [first_page_call, second_page_call])

    @mock.patch('paramiko.SSHClient')
    def test_log_old_reviews(self, mock_client_cons):
        """log() yields nothing when the only review predates the cut-off.

        Also checks that ``status`` and ``grab_comments`` show up in the
        issued query.
        """
        pages = [
            # one review and summary
            ('', [REVIEW_ONE, REVIEW_END_LINE], ''),
            # only summary = no more reviews
            ('', [REVIEW_END_LINE], ''),
        ]
        ssh_client = mock.Mock()
        ssh_client.exec_command = mock.Mock(side_effect=pages)
        mock_client_cons.return_value = ssh_client

        gerrit = rcs.Gerrit('uri')
        repo = {'organization': 'openstack', 'module': 'nova'}
        branch = 'master'
        # later than REVIEW_ONE's lastUpdated, so the review is too old
        last_retrieval_time = 1445000000

        records = list(gerrit.log(repo, branch, last_retrieval_time,
                                  status='merged', grab_comments=True))

        self.assertEqual(0, len(records))

        only_call = mock.call(
            'gerrit query --all-approvals --patch-sets '
            '--format JSON project:\'openstack/nova\' branch:master '
            'limit:100 age:0s status:merged --comments')
        ssh_client.exec_command.assert_has_calls([only_call])