Fix off-by-one in sync counter
The sync counter up to this point has not counted the currently running sync task. Keep track of that in a thread-safe way so the counter is accurate and the user can actually know if the system is fully synced. Change-Id: I22a1c5d15c46f3e1785c439e72f8a9e5503d6935
This commit is contained in:
parent
9ae126da1e
commit
714a8c8986
@ -56,12 +56,17 @@ class MultiQueue(object):
|
||||
for key in priorities:
|
||||
self.queues[key] = collections.deque()
|
||||
self.condition = threading.Condition()
|
||||
self.incomplete = []
|
||||
|
||||
def qsize(self):
|
||||
count = 0
|
||||
for queue in self.queues.values():
|
||||
count += len(queue)
|
||||
return count
|
||||
self.condition.acquire()
|
||||
try:
|
||||
for queue in self.queues.values():
|
||||
count += len(queue)
|
||||
return count + len(self.incomplete)
|
||||
finally:
|
||||
self.condition.release()
|
||||
|
||||
def put(self, item, priority):
|
||||
added = False
|
||||
@ -82,6 +87,7 @@ class MultiQueue(object):
|
||||
for queue in self.queues.values():
|
||||
try:
|
||||
ret = queue.popleft()
|
||||
self.incomplete.append(ret)
|
||||
return ret
|
||||
except IndexError:
|
||||
pass
|
||||
@ -100,6 +106,14 @@ class MultiQueue(object):
|
||||
self.condition.release()
|
||||
return results
|
||||
|
||||
def complete(self, item):
|
||||
self.condition.acquire()
|
||||
try:
|
||||
if item in self.incomplete:
|
||||
self.incomplete.remove(item)
|
||||
finally:
|
||||
self.condition.release()
|
||||
|
||||
|
||||
class UpdateEvent(object):
|
||||
def updateRelatedChanges(self, session, change):
|
||||
@ -1335,6 +1349,7 @@ class Sync(object):
|
||||
try:
|
||||
task.run(self)
|
||||
task.complete(True)
|
||||
self.queue.complete(task)
|
||||
except requests.ConnectionError as e:
|
||||
self.log.warning("Offline due to: %s" % (e,))
|
||||
if not self.offline:
|
||||
@ -1347,6 +1362,7 @@ class Sync(object):
|
||||
return task
|
||||
except Exception:
|
||||
task.complete(False)
|
||||
self.queue.complete(task)
|
||||
self.log.exception('Exception running task %s' % (task,))
|
||||
self.app.status.update(error=True, refresh=False)
|
||||
self.offline = False
|
||||
|
Loading…
x
Reference in New Issue
Block a user