From 110b7c37d4e1faf39f4f328ac9d0730db6c7400d Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Sat, 11 Apr 2015 20:44:14 -0400
Subject: [PATCH] Batch sync change by commit tasks

We can end up with a large number of sequential sync change by
commit tasks which result in a series of simple gerrit queries
each with their own HTTP request. Instead, combine them into
batches of 100 commits that are queried in one request.

Change-Id: I2437019c47758134d85399b8183a9c6d70fd45b1
---
Reviewer note: line structure and indentation were reconstructed from a
whitespace-collapsed copy of this patch, and docstrings were added.  The
blob hashes on the "index" line are those of the original patch and do
not describe the amended post-image.

 gertty/sync.py | 61 ++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 52 insertions(+), 9 deletions(-)

diff --git a/gertty/sync.py b/gertty/sync.py
index 0ee573c..be4f85f 100644
--- a/gertty/sync.py
+++ b/gertty/sync.py
@@ -85,6 +85,21 @@ class MultiQueue(object):
         finally:
             self.condition.release()
 
+    def find(self, klass, priority):
+        """Return the items queued at priority that are instances of klass.
+
+        The queue's condition is held while the queue is scanned.
+        """
+        results = []
+        self.condition.acquire()
+        try:
+            for item in self.queues[priority]:
+                if isinstance(item, klass):
+                    results.append(item)
+        finally:
+            self.condition.release()
+        return results
+
 
 class UpdateEvent(object):
     def updateRelatedChanges(self, session, change):
@@ -440,27 +455,43 @@ class SetSyncQueryUpdatedTask(Task):
             sync_query = session.getSyncQueryByName(self.query_name)
             sync_query.updated = self.updated
 
-class SyncChangeByCommitTask(Task):
-    def __init__(self, commit, priority=NORMAL_PRIORITY):
-        super(SyncChangeByCommitTask, self).__init__(priority)
-        self.commit = commit
+class SyncChangesByCommitsTask(Task):
+    """Look up the changes for a batch of commits with a single query."""
+
+    def __init__(self, commits, priority=NORMAL_PRIORITY):
+        super(SyncChangesByCommitsTask, self).__init__(priority)
+        self.commits = commits
 
     def __repr__(self):
-        return '<SyncChangeByCommitTask %s>' % (self.commit,)
+        return '<SyncChangesByCommitsTask %s>' % (self.commits,)
 
     def __eq__(self, other):
         if (other.__class__ == self.__class__ and
-            other.commit == self.commit):
+            other.commits == self.commits):
             return True
         return False
 
     def run(self, sync):
-        query = 'commit:%s' % self.commit
+        query = ' OR '.join(['commit:%s' % x for x in self.commits])
         changes = sync.get('changes/?q=%s' % query)
         self.log.debug('Query: %s ' % (query,))
         for c in changes:
             sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
-            self.log.debug("Sync change %s for its commit %s" % (c['id'], self.commit))
+            self.log.debug("Sync change %s for its commit" % (c['id'],))
+
+    def addCommit(self, commit):
+        """Add commit to this batch if there is room for it.
+
+        Returns True if the commit is now (or was already) part of
+        this task, and False if the batch is full.
+        """
+        if commit in self.commits:
+            return True
+        # 100 should be under the URL length limit
+        if len(self.commits) >= 100:
+            return False
+        self.commits.append(commit)
+        return True
 
 class SyncChangeByNumberTask(Task):
     def __init__(self, number, priority=NORMAL_PRIORITY):
@@ -592,7 +623,7 @@ class SyncChangeTask(Task):
                 if revision.parent not in parent_commits:
                     parent_revision = session.getRevisionByCommit(revision.parent)
                     if not parent_revision and change.status not in CLOSED_STATUSES:
-                        sync.submitTask(SyncChangeByCommitTask(revision.parent, self.priority))
+                        sync._syncChangeByCommit(revision.parent, self.priority)
                         self.log.debug("Change %s revision %s needs parent commit %s synced" %
                                        (change.id, remote_revision['_number'], revision.parent))
                     parent_commits.add(revision.parent)
@@ -1264,3 +1295,15 @@ class Sync(object):
         if task.wait():
             for subtask in task.tasks:
                 subtask.wait()
+
+    def _syncChangeByCommit(self, commit, priority):
+        """Sync the change for commit, batching it with queued tasks."""
+        # Accumulate sync change by commit tasks because they often
+        # come in batches. This method assumes it is being called
+        # from within the run queue already and therefore does not
+        # need to worry about locking the queue.
+        for task in self.queue.find(SyncChangesByCommitsTask, priority):
+            if task.addCommit(commit):
+                return
+        task = SyncChangesByCommitsTask([commit], priority)
+        self.submitTask(task)