diff --git a/java/src/main/java/monasca/persister/repository/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/InfluxV9RepoWriter.java index cc89ee99..ca37d102 100644 --- a/java/src/main/java/monasca/persister/repository/InfluxV9RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/InfluxV9RepoWriter.java @@ -22,18 +22,29 @@ import com.google.inject.Inject; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Base64; +import org.apache.http.Header; +import org.apache.http.HeaderElement; import org.apache.http.HttpEntity; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpRequestInterceptor; import org.apache.http.HttpResponse; +import org.apache.http.HttpResponseInterceptor; import org.apache.http.HttpStatus; +import org.apache.http.client.entity.EntityBuilder; +import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import monasca.persister.configuration.PersisterConfig; @@ -59,7 +70,6 @@ public class InfluxV9RepoWriter { private final ObjectMapper objectMapper = new ObjectMapper(); - @Inject public InfluxV9RepoWriter(final PersisterConfig config) { @@ -77,7 +87,44 @@ public class InfluxV9RepoWriter { PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); cm.setMaxTotal(config.getInfluxDBConfiguration().getMaxHttpConnections()); - this.httpClient = HttpClients.custom().setConnectionManager(cm).build(); + if (config.getInfluxDBConfiguration().getGzip()) { + + this.httpClient = + HttpClients.custom().setConnectionManager(cm) + .addInterceptorFirst(new HttpRequestInterceptor() { + + public void process(final HttpRequest request, final HttpContext context) + throws HttpException, IOException { + if (!request.containsHeader("Accept-Encoding")) { + request.addHeader("Accept-Encoding", "gzip"); + } + } + }).addInterceptorFirst(new HttpResponseInterceptor() { + + public void process(final HttpResponse response, final HttpContext context) + throws HttpException, IOException { + HttpEntity entity = response.getEntity(); + if (entity != null) { + Header ceheader = entity.getContentEncoding(); + if (ceheader != null) { + HeaderElement[] codecs = ceheader.getElements(); + for (int i = 0; i < codecs.length; i++) { + if (codecs[i].getName().equalsIgnoreCase("gzip")) { + response.setEntity(new GzipDecompressingEntity(response.getEntity())); + return; + } + } + } + } + } + + }).build(); + + } else { + + this.httpClient = HttpClients.custom().setConnectionManager(cm).build(); + + } } @@ -85,20 +132,39 @@ public class InfluxV9RepoWriter { HttpPost request = new HttpPost(this.influxUrl); - request.addHeader("content-type", "application/json"); + request.addHeader("Content-Type", "application/json"); request.addHeader("Authorization", this.baseAuthHeader); InfluxWrite influxWrite = new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry, new HashMap()); - StringEntity params = new StringEntity(this.objectMapper.writeValueAsString(influxWrite)); - request.setEntity(params); + + String json = this.objectMapper.writeValueAsString(influxWrite); + + if (this.config.getInfluxDBConfiguration().getGzip()) { + + HttpEntity + requestEntity = + EntityBuilder.create().setText(json).setContentType(ContentType.APPLICATION_JSON) + .gzipCompress().build(); + + request.setEntity(requestEntity); + + request.addHeader("Content-Encoding", "gzip"); + + } else { + + StringEntity stringEntity = new StringEntity(json); + + request.setEntity(stringEntity); + + } try { - logger.debug("Writing {} points to influxdb database {} at {}", - influxPointArry.length, this.influxName, this.influxUrl); + logger.debug("Writing {} points to influxdb database {} at {}", influxPointArry.length, + this.influxName, this.influxUrl); HttpResponse response = this.httpClient.execute(request); @@ -106,17 +172,18 @@ public class InfluxV9RepoWriter { if (rc != HttpStatus.SC_OK) { - HttpEntity entity = response.getEntity(); - String responseString = EntityUtils.toString(entity, "UTF-8"); - logger.error("Failed to write data to influx database {} at {}: {}", - this.influxName, this.influxUrl, String.valueOf(rc)); + HttpEntity responseEntity = response.getEntity(); + String responseString = EntityUtils.toString(responseEntity, "UTF-8"); + logger.error("Failed to write data to influx database {} at {}: {}", this.influxName, + this.influxUrl, String.valueOf(rc)); logger.error("Http response: {}", responseString); throw new Exception(rc + ":" + responseString); } - logger.debug("Successfully wrote {} points to influx database {} at {}", - influxPointArry.length, this.influxName, this.influxUrl); + logger + .debug("Successfully wrote {} points to influx database {} at {}", influxPointArry.length, + this.influxName, this.influxUrl); } finally {