Skip to content

Commit

Permalink
feat: adding configuration to provide exclusion for failOnError with …
Browse files Browse the repository at this point in the history
…http post processor (#21)

* feat: adding configuration to provide exclusion for failOnError with http post processor

* fix(dagger-core): renamed exclusion function and added printStackTrace

* fix: move failOnErrorsExclusionSet out-side of response handler

* fix: set failOnErrorsExclusionSet from open

---------

Co-authored-by: Mayank Rai <[email protected]>
  • Loading branch information
mayankrai09 and mayankrai09 authored Aug 16, 2023
1 parent b799435 commit 11fc8e1
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.core.processors.external.AsyncConnector;
import com.gotocompany.dagger.core.processors.external.ExternalMetricConfig;
import io.netty.util.internal.StringUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

Expand All @@ -20,6 +22,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;

Expand All @@ -31,6 +40,7 @@ public class HttpAsyncConnector extends AsyncConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpAsyncConnector.class.getName());
private AsyncHttpClient httpClient;
private HttpSourceConfig httpSourceConfig;
private Set<Integer> failOnErrorsExclusionSet;

/**
* Instantiates a new Http async connector with specified http client.
Expand Down Expand Up @@ -80,6 +90,12 @@ protected void createClient() {
}
}

@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
setFailOnErrorsExclusionSet(httpSourceConfig.getExcludeFailOnErrorsCodeRange());
}

@Override
public void close() throws Exception {
httpClient.close();
Expand All @@ -102,9 +118,8 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
return;
}

BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues, endpointVariablesValues);
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getFailOnErrorsExclusionSet(), getMeterStatsManager(),
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
httpResponseHandler.startTimer();
request.execute(httpResponseHandler);
Expand All @@ -114,4 +129,19 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
}

}

protected Set<Integer> getFailOnErrorsExclusionSet() {
return failOnErrorsExclusionSet;
}

private void setFailOnErrorsExclusionSet(String excludeFailOnErrorsCodeRange) {
failOnErrorsExclusionSet = new HashSet<Integer>();
if (!StringUtil.isNullOrEmpty(excludeFailOnErrorsCodeRange)) {
String[] ranges = excludeFailOnErrorsCodeRange.split(",");
Arrays.stream(ranges).forEach(range -> {
List<Integer> rangeList = Arrays.stream(range.split("-")).map(Integer::parseInt).collect(Collectors.toList());
IntStream.rangeClosed(rangeList.get(0), rangeList.get(rangeList.size() - 1)).forEach(statusCode -> failOnErrorsExclusionSet.add(statusCode));
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package com.gotocompany.dagger.core.processors.external.http;

import com.google.protobuf.Descriptors;
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandler;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandlerFactory;
import com.gotocompany.dagger.core.exception.HttpFailureException;
import com.gotocompany.dagger.core.metrics.aspects.ExternalSourceAspects;
import com.gotocompany.dagger.core.metrics.reporters.ErrorReporter;
import com.gotocompany.dagger.core.processors.ColumnNameManager;
import com.gotocompany.dagger.core.processors.common.OutputMapping;
import com.gotocompany.dagger.core.processors.common.PostResponseTelemetry;
import com.gotocompany.dagger.core.processors.common.RowManager;
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandler;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandlerFactory;
import com.google.protobuf.Descriptors;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Collections;
import java.util.Set;
import java.util.regex.Pattern;

/**
Expand All @@ -39,6 +39,7 @@ public class HttpResponseHandler extends AsyncCompletionHandler<Object> {
private Descriptors.Descriptor descriptor;
private ResultFuture<Row> resultFuture;
private HttpSourceConfig httpSourceConfig;
private Set<Integer> failOnErrorsExclusionSet;
private MeterStatsManager meterStatsManager;
private Instant startTime;
private ErrorReporter errorReporter;
Expand All @@ -48,20 +49,22 @@ public class HttpResponseHandler extends AsyncCompletionHandler<Object> {
/**
* Instantiates a new Http response handler.
*
* @param httpSourceConfig the http source config
* @param meterStatsManager the meter stats manager
* @param rowManager the row manager
* @param columnNameManager the column name manager
* @param descriptor the descriptor
* @param resultFuture the result future
* @param errorReporter the error reporter
* @param postResponseTelemetry the post response telemetry
* @param httpSourceConfig the http source config
* @param failOnErrorsExclusionSet the fail on error exclusion set
* @param meterStatsManager the meter stats manager
* @param rowManager the row manager
* @param columnNameManager the column name manager
* @param descriptor the descriptor
* @param resultFuture the result future
* @param errorReporter the error reporter
* @param postResponseTelemetry the post response telemetry
*/
public HttpResponseHandler(HttpSourceConfig httpSourceConfig, MeterStatsManager meterStatsManager, RowManager rowManager,
public HttpResponseHandler(HttpSourceConfig httpSourceConfig, Set<Integer> failOnErrorsExclusionSet, MeterStatsManager meterStatsManager, RowManager rowManager,
ColumnNameManager columnNameManager, Descriptors.Descriptor descriptor, ResultFuture<Row> resultFuture,
ErrorReporter errorReporter, PostResponseTelemetry postResponseTelemetry) {

this.httpSourceConfig = httpSourceConfig;
this.failOnErrorsExclusionSet = failOnErrorsExclusionSet;
this.meterStatsManager = meterStatsManager;
this.rowManager = rowManager;
this.columnNameManager = columnNameManager;
Expand All @@ -86,15 +89,16 @@ public Object onCompleted(Response response) {
successHandler(response);
} else {
postResponseTelemetry.validateResponseCode(meterStatsManager, statusCode);
failureHandler("Received status code : " + statusCode);
failureHandler("Received status code : " + statusCode, statusCode);
}
return response;
}

@Override
public void onThrowable(Throwable t) {
t.printStackTrace();
meterStatsManager.markEvent(ExternalSourceAspects.OTHER_ERRORS);
failureHandler(t.getMessage());
failureHandler(t.getMessage(), 0);
}

private void successHandler(Response response) {
Expand Down Expand Up @@ -123,19 +127,27 @@ private void successHandler(Response response) {
* Failure handler.
*
* @param logMessage the log message
* @param statusCode the status code
*/
public void failureHandler(String logMessage) {
public void failureHandler(String logMessage, Integer statusCode) {
postResponseTelemetry.sendFailureTelemetry(meterStatsManager, startTime);
LOGGER.error(logMessage);
Exception httpFailureException = new HttpFailureException(logMessage);
if (httpSourceConfig.isFailOnErrors()) {
if (shouldFailOnError(statusCode)) {
reportAndThrowError(httpFailureException);
} else {
errorReporter.reportNonFatalException(httpFailureException);
}
resultFuture.complete(Collections.singleton(rowManager.getAll()));
}

private boolean shouldFailOnError(Integer statusCode) {
if (httpSourceConfig.isFailOnErrors() && (statusCode == 0 || !failOnErrorsExclusionSet.contains(statusCode))) {
return true;
}
return false;
}

private void setField(String key, Object value, int fieldIndex) {
if (!httpSourceConfig.isRetainResponseType() || httpSourceConfig.hasType()) {
setFieldUsingType(key, value, fieldIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
private String excludeFailOnErrorsCodeRange;
@SerializedName(value = "type", alternate = {"Type", "TYPE"})
private String type;
private String capacity;
Expand All @@ -49,14 +50,15 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param excludeFailOnErrorsCodeRange the exclude fail on errors code range
* @param type the type
* @param capacity the capacity
* @param headers the static headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param retainResponseType the retain response type
*/
public HttpSourceConfig(String endpoint, String endpointVariables, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
public HttpSourceConfig(String endpoint, String endpointVariables, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String excludeFailOnErrorsCodeRange, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
this.endpoint = endpoint;
this.endpointVariables = endpointVariables;
this.verb = verb;
Expand All @@ -67,6 +69,7 @@ public HttpSourceConfig(String endpoint, String endpointVariables, String verb,
this.streamTimeout = streamTimeout;
this.connectTimeout = connectTimeout;
this.failOnErrors = failOnErrors;
this.excludeFailOnErrorsCodeRange = excludeFailOnErrorsCodeRange;
this.type = type;
this.capacity = capacity;
this.headers = headers;
Expand Down Expand Up @@ -162,6 +165,16 @@ public boolean isFailOnErrors() {
return failOnErrors;
}

/**
* Gets failOnErrorsCodeRange Variable.
*
* @return the failOnErrorsCodeRange Variable
*/
public String getExcludeFailOnErrorsCodeRange() {
return excludeFailOnErrorsCodeRange;
}


@Override
public String getMetricId() {
return metricId;
Expand Down Expand Up @@ -245,11 +258,11 @@ public boolean equals(Object o) {
return false;
}
HttpSourceConfig that = (HttpSourceConfig) o;
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
return failOnErrors == that.failOnErrors && excludeFailOnErrorsCodeRange == that.excludeFailOnErrorsCodeRange && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
}

@Override
public int hashCode() {
return Objects.hash(endpoint, endpointVariables, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
return Objects.hash(endpoint, endpointVariables, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, excludeFailOnErrorsCodeRange, type, capacity, headers, outputMapping, metricId, retainResponseType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void shouldReturnHttpExternalSourceConfig() {
outputMapping = new OutputMapping("$.data.tensor.values[0]");
outputMappings.put("surge_factor", outputMapping);

HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "", "post", null, null, null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "", "post", null, null, null, null, "5000", "5000", true, null, null, null, headerMap, outputMappings, null, false);

assertEquals(httpSourceConfig, defaultPostProcessorConfig.getExternalSource().getHttpConfig().get(0));
}
Expand Down Expand Up @@ -120,7 +120,7 @@ public void shouldBeEmptyWhenNoneOfTheConfigsExist() {
@Test
public void shouldNotBeEmptyWhenExternalSourceHasHttpConfigExist() {
ArrayList<HttpSourceConfig> http = new ArrayList<>();
http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, null, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
ArrayList<EsSourceConfig> es = new ArrayList<>();
ArrayList<PgSourceConfig> pg = new ArrayList<>();
ExternalSourceConfig externalSourceConfig = new ExternalSourceConfig(http, es, pg, new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setup() {
HashMap<String, OutputMapping> httpColumnNames = new HashMap<>();
httpColumnNames.put("http_field_1", new OutputMapping(""));
httpColumnNames.put("http_field_2", new OutputMapping(""));
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false);
HashMap<String, OutputMapping> esOutputMapping = new HashMap<>();
esOutputMapping.put("es_field_1", new OutputMapping(""));
EsSourceConfig esSourceConfig = new EsSourceConfig("host", "port", "", "", "endpointPattern",
Expand Down Expand Up @@ -132,7 +132,7 @@ public void shouldProcessWithRightConfiguration() {
outputMapping.put("order_id", new OutputMapping("path"));

List<HttpSourceConfig> httpSourceConfigs = new ArrayList<>();
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false);
httpSourceConfigs.add(httpSourceConfig);

List<EsSourceConfig> esSourceConfigs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void setUp() {
HashMap<String, OutputMapping> httpOutputMapping = new HashMap<>();
httpOutputMapping.put("http_field_1", new OutputMapping(""));
httpOutputMapping.put("http_field_2", new OutputMapping(""));
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, null, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false);
http = new ArrayList<>();
http.add(httpSourceConfig);
es = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ public void hasTypeShouldBeTrueWhenTypeIsPresent() {

@Test
public void hasTypeShouldBeFalseWhenTypeIsNull() {
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", null, "", false, "", null, "", new HashMap<>(), new HashMap<>(), metricId, false);
assertFalse(httpSourceConfig.hasType());
}

@Test
public void hasTypeShouldBeFalseWhenTypeIsEmpty() {
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", "", new HashMap<>(), new HashMap<>(), metricId, false);
assertFalse(httpSourceConfig.hasType());
}

Expand Down
Loading

0 comments on commit 11fc8e1

Please sign in to comment.