Send request and response data to Influxdb gzip encoded

Depends on https://review.openstack.org/#/c/171285/

Change-Id: Ic85591ac6dfa10992e96ba974eb9b6fd4911a0d9
This commit is contained in:
Deklan Dieterly 2015-04-06 15:47:28 -06:00
parent f51ef85404
commit 0f233cd76b

View File

@ -22,18 +22,29 @@ import com.google.inject.Inject;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import monasca.persister.configuration.PersisterConfig;
@ -59,7 +70,6 @@ public class InfluxV9RepoWriter {
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public InfluxV9RepoWriter(final PersisterConfig config) {
@ -77,7 +87,44 @@ public class InfluxV9RepoWriter {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(config.getInfluxDBConfiguration().getMaxHttpConnections());
this.httpClient = HttpClients.custom().setConnectionManager(cm).build();
if (config.getInfluxDBConfiguration().getGzip()) {
this.httpClient =
HttpClients.custom().setConnectionManager(cm)
.addInterceptorFirst(new HttpRequestInterceptor() {
public void process(final HttpRequest request, final HttpContext context)
throws HttpException, IOException {
if (!request.containsHeader("Accept-Encoding")) {
request.addHeader("Accept-Encoding", "gzip");
}
}
}).addInterceptorFirst(new HttpResponseInterceptor() {
public void process(final HttpResponse response, final HttpContext context)
throws HttpException, IOException {
HttpEntity entity = response.getEntity();
if (entity != null) {
Header ceheader = entity.getContentEncoding();
if (ceheader != null) {
HeaderElement[] codecs = ceheader.getElements();
for (int i = 0; i < codecs.length; i++) {
if (codecs[i].getName().equalsIgnoreCase("gzip")) {
response.setEntity(new GzipDecompressingEntity(response.getEntity()));
return;
}
}
}
}
}
}).build();
} else {
this.httpClient = HttpClients.custom().setConnectionManager(cm).build();
}
}
@ -85,20 +132,39 @@ public class InfluxV9RepoWriter {
HttpPost request = new HttpPost(this.influxUrl);
request.addHeader("content-type", "application/json");
request.addHeader("Content-Type", "application/json");
request.addHeader("Authorization", this.baseAuthHeader);
InfluxWrite
influxWrite =
new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry,
new HashMap());
StringEntity params = new StringEntity(this.objectMapper.writeValueAsString(influxWrite));
request.setEntity(params);
String json = this.objectMapper.writeValueAsString(influxWrite);
if (this.config.getInfluxDBConfiguration().getGzip()) {
HttpEntity
requestEntity =
EntityBuilder.create().setText(json).setContentType(ContentType.APPLICATION_JSON)
.gzipCompress().build();
request.setEntity(requestEntity);
request.addHeader("Content-Encoding", "gzip");
} else {
StringEntity stringEntity = new StringEntity(json);
request.setEntity(stringEntity);
}
try {
logger.debug("Writing {} points to influxdb database {} at {}",
influxPointArry.length, this.influxName, this.influxUrl);
logger.debug("Writing {} points to influxdb database {} at {}", influxPointArry.length,
this.influxName, this.influxUrl);
HttpResponse response = this.httpClient.execute(request);
@ -106,17 +172,18 @@ public class InfluxV9RepoWriter {
if (rc != HttpStatus.SC_OK) {
HttpEntity entity = response.getEntity();
String responseString = EntityUtils.toString(entity, "UTF-8");
logger.error("Failed to write data to influx database {} at {}: {}",
this.influxName, this.influxUrl, String.valueOf(rc));
HttpEntity responseEntity = response.getEntity();
String responseString = EntityUtils.toString(responseEntity, "UTF-8");
logger.error("Failed to write data to influx database {} at {}: {}", this.influxName,
this.influxUrl, String.valueOf(rc));
logger.error("Http response: {}", responseString);
throw new Exception(rc + ":" + responseString);
}
logger.debug("Successfully wrote {} points to influx database {} at {}",
influxPointArry.length, this.influxName, this.influxUrl);
logger
.debug("Successfully wrote {} points to influx database {} at {}", influxPointArry.length,
this.influxName, this.influxUrl);
} finally {