Merge "Python persister batches data points with common name"
This commit is contained in:
commit
a326e04ef0
@ -212,7 +212,8 @@ class AbstractPersister(threading.Thread):
|
||||
self._database_batch_size = kafka_conf.database_batch_size
|
||||
self._kafka_topic = kafka_conf.topic
|
||||
|
||||
self._json_body = []
|
||||
self._message_count = 0
|
||||
self._data_points = {}
|
||||
self._last_flush = datetime.now()
|
||||
self._last_partition_check = datetime.now()
|
||||
|
||||
@ -222,12 +223,19 @@ class AbstractPersister(threading.Thread):
|
||||
|
||||
def _flush(self, partitions):
|
||||
|
||||
if self._json_body:
|
||||
self._influxdb_client.write_points(self._json_body)
|
||||
if self._data_points:
|
||||
try:
|
||||
self._influxdb_client.write_points(self._data_points.values())
|
||||
except Exception:
|
||||
log.exception("Error writing to influxdb: {}"
|
||||
.format(self._data_points.values()))
|
||||
raise
|
||||
|
||||
self._consumer.commit(partitions=partitions)
|
||||
LOG.info("Processed {} messages from topic '{}'".format(
|
||||
len(self._json_body), self._kafka_topic))
|
||||
self._json_body = []
|
||||
self._message_count, self._kafka_topic))
|
||||
self._data_points = {}
|
||||
self._message_count = 0
|
||||
self._last_flush = datetime.now()
|
||||
|
||||
def _is_time_for_repartition_check(self):
|
||||
@ -238,7 +246,6 @@ class AbstractPersister(threading.Thread):
|
||||
self._partition_interval_recheck_secs)
|
||||
|
||||
def _process_messages(self, partitions):
|
||||
|
||||
while 1:
|
||||
|
||||
if self._is_time_for_repartition_check():
|
||||
@ -252,7 +259,17 @@ class AbstractPersister(threading.Thread):
|
||||
|
||||
try:
|
||||
|
||||
self._json_body.append(self.process_message(message))
|
||||
data_point = self.process_message(message)
|
||||
|
||||
key = data_point['name']
|
||||
|
||||
if key in self._data_points:
|
||||
points = data_point['points']
|
||||
self._data_points[key]['points'].extend(points)
|
||||
else:
|
||||
self._data_points[key] = data_point
|
||||
|
||||
self._message_count += 1
|
||||
|
||||
if self._is_time_for_repartition_check():
|
||||
return
|
||||
@ -261,7 +278,7 @@ class AbstractPersister(threading.Thread):
|
||||
LOG.exception('Error processing message. Message is '
|
||||
'being dropped. {}'.format(message))
|
||||
|
||||
if len(self._json_body) >= self._database_batch_size:
|
||||
if self._message_count >= self._database_batch_size:
|
||||
self._flush(partitions)
|
||||
|
||||
def _get_set_partitioner(self):
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user