Query projects in batches

Gerrit supports up to 10 queries in a single request.  Do the project
sync queries in batches of 10 to reduce network traffic.

Change-Id: I9aecbb91c1898d436fc54d21d1910badaddfef3b
This commit is contained in:
James E. Blair 2014-09-02 19:38:03 -07:00
parent c664c4e039
commit 040814221f

View File

@ -84,6 +84,7 @@ class Task(object):
self.priority = priority
self.succeeded = None
self.event = threading.Event()
self.tasks = []
def complete(self, success):
self.succeeded = success
@ -177,43 +178,56 @@ class SyncSubscribedProjectsTask(Task):
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
for p in session.getProjects(subscribed=True):
sync.submitTask(SyncProjectTask(p.key, self.priority))
keys = [p.key for p in session.getProjects(subscribed=True)]
for i in range(0, len(keys), 10):
t = SyncProjectTask(keys[i:i+10], self.priority)
self.tasks.append(t)
sync.submitTask(t)
class SyncProjectTask(Task):
_closed_statuses = ['MERGED', 'ABANDONED']
def __init__(self, project_key, priority=NORMAL_PRIORITY):
def __init__(self, project_keys, priority=NORMAL_PRIORITY):
super(SyncProjectTask, self).__init__(priority)
self.project_key = project_key
if type(project_keys) == int:
project_keys = [project_keys]
self.project_keys = project_keys
def __repr__(self):
return '<SyncProjectTask %s>' % (self.project_key,)
return '<SyncProjectTask %s>' % (self.project_keys,)
def run(self, sync):
app = sync.app
now = datetime.datetime.utcnow()
queries = []
with app.db.getSession() as session:
project = session.getProject(self.project_key)
query = 'project:%s' % project.name
if project.updated:
# Allow 4 seconds for request time, etc.
query += ' -age:%ss' % (int(math.ceil((now-project.updated).total_seconds())) + 4,)
else:
query += ' status:open'
for project_key in self.project_keys:
project = session.getProject(project_key)
query = 'q=project:%s' % project.name
if project.updated:
# Allow 4 seconds for request time, etc.
query += ' -age:%ss' % (int(math.ceil((now-project.updated).total_seconds())) + 4,)
else:
query += ' status:open'
queries.append(query)
changes = []
sortkey = ''
while True:
done = False
while not done:
query = '&'.join(queries)
# We don't actually want to limit to 500, but that's the server-side default, and
# if we don't specify this, we won't get a _more_changes flag.
q = 'changes/?n=500%s&q=%s' % (sortkey, query,)
q = 'changes/?n=500%s&%s' % (sortkey, query)
self.log.debug('Query: %s ' % (q,))
batch = sync.get(q)
changes += batch
if batch and '_more_changes' in batch[-1]:
sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
else:
break
responses = sync.get(q)
if len(queries) == 1:
responses = [responses]
done = True
for batch in responses:
changes += batch
if batch and '_more_changes' in batch[-1]:
sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
done = False
with app.db.getSession() as session:
for c in changes:
# For now, just sync open changes or changes already
@ -222,7 +236,8 @@ class SyncProjectTask(Task):
if change or (c['status'] not in self._closed_statuses):
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
self.log.debug("Change %s update %s" % (c['id'], c['updated']))
sync.submitTask(SetProjectUpdatedTask(self.project_key, now, priority=self.priority))
for key in self.project_keys:
sync.submitTask(SetProjectUpdatedTask(key, now, priority=self.priority))
class SetProjectUpdatedTask(Task):
def __init__(self, project_key, updated, priority=NORMAL_PRIORITY):
@ -262,7 +277,6 @@ class SyncChangeByNumberTask(Task):
def __init__(self, number, priority=NORMAL_PRIORITY):
super(SyncChangeByNumberTask, self).__init__(priority)
self.number = number
self.tasks = []
def __repr__(self):
return '<SyncChangeByNumberTask %s>' % (self.number,)
@ -850,11 +864,8 @@ class Sync(object):
self.log.debug('Received: %s' % (r.text,))
def syncSubscribedProjects(self):
keys = []
with self.app.db.getSession() as session:
for p in session.getProjects(subscribed=True):
keys.append(p.key)
for key in keys:
t = SyncProjectTask(key, LOW_PRIORITY)
self.submitTask(t)
t.wait()
task = SyncSubscribedProjectsTask(LOW_PRIORITY)
self.submitTask(task)
task.wait()
for subtask in task.tasks:
subtask.wait()