Skip to content

Commit

Permalink
version_conflict_engine_exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Jun 30, 2022
1 parent 2637684 commit 40cd35c
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 8 deletions.
4 changes: 3 additions & 1 deletion checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@
<module name="BooleanExpressionComplexity"/>

<!-- See http://checkstyle.sourceforge.net/config_metrics.html#ClassFanOutComplexity -->
<module name="ClassFanOutComplexity"/>
<module name="ClassFanOutComplexity">
<property name="max" value="21"/>
</module>

<!-- See http://checkstyle.sourceforge.net/config_metrics.html#CyclomaticComplexity -->
<module name="CyclomaticComplexity"/>
Expand Down
8 changes: 8 additions & 0 deletions docs/opensearch-sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,11 @@ Data Conversion
* Default: fail
* Valid Values: [ignore, warn, fail]
* Importance: low

``behavior.on.version.conflict``
How to handle records that OpenSearch rejects due to document's version conflicts. It may happen when offsets were not committed or/and records have to be reprocessed. Valid options are 'ignore', 'warn', and 'fail'.

* Type: string
* Default: fail
* Valid Values: [ignore, warn, fail]
* Importance: low
110 changes: 105 additions & 5 deletions src/main/java/io/aiven/kafka/connect/opensearch/BulkProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.rest.RestStatus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -63,6 +64,7 @@ public class BulkProcessor {
private final int maxRetries;
private final long retryBackoffMs;
private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;
private final BehaviorOnVersionConflict behaviorOnVersionConflict;

private final Thread farmer;
private final ExecutorService executor;
Expand Down Expand Up @@ -90,12 +92,24 @@ public BulkProcessor(final Time time,
this.maxRetries = config.maxRetry();
this.retryBackoffMs = config.retryBackoffMs();
this.behaviorOnMalformedDoc = config.behaviorOnMalformedDoc();
this.behaviorOnVersionConflict = config.behaviorOnVersionConflict();

unsentRecords = new ArrayDeque<>(maxBufferedRecords);

final ThreadFactory threadFactory = makeThreadFactory();
farmer = threadFactory.newThread(farmerTask());
executor = Executors.newFixedThreadPool(config.maxInFlightRequests(), threadFactory);

if (!config.ignoreKey() && config.behaviorOnVersionConflict() == BehaviorOnVersionConflict.FAIL) {
LOGGER.warn("The {} is set to `false` which assumes external version and optimistic locking."
+ " You may consider changing the configuration property '{}' from '{}' to '{}' or '{}'"
+ " to deal with possible version conflicts.",
OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG,
OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG,
BehaviorOnMalformedDoc.FAIL,
BehaviorOnMalformedDoc.IGNORE,
BehaviorOnMalformedDoc.WARN);
}
}

private ThreadFactory makeThreadFactory() {
Expand Down Expand Up @@ -363,10 +377,13 @@ private BulkResponse execute() throws Exception {
if (!itemResponse.getFailure().isAborted()) {
if (responseContainsMalformedDocError(itemResponse)) {
handleMalformedDoc(itemResponse);
}
throw new RuntimeException(
"One of the item in the bulk response failed. Reason: "
} else if (responseContainsVersionConflict(itemResponse)) {
handleVersionConflict(itemResponse);
} else {
throw new RuntimeException(
"One of the item in the bulk response failed. Reason: "
+ itemResponse.getFailureMessage());
}
} else {
throw new ConnectException(
"One of the item in the bulk response aborted. Reason: "
Expand All @@ -382,6 +399,35 @@ private BulkResponse execute() throws Exception {
}, maxRetries, retryBackoffMs, RuntimeException.class);
}

private void handleVersionConflict(final BulkItemResponse bulkItemResponse) {
// if the elasticsearch request failed because of a version conflict,
// the behavior is configurable.
switch (behaviorOnVersionConflict) {
case IGNORE:
LOGGER.debug("Encountered a version conflict when executing batch {} of {}"
+ " records. Ignoring and will keep an existing record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
break;
case WARN:
LOGGER.warn("Encountered a version conflict when executing batch {} of {}"
+ " records. Ignoring and will keep an existing record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
break;
case FAIL:
default:
LOGGER.error("Encountered a version conflict when executing batch {} of {}"
+ " records. Error was {} (to ignore version conflicts you may consider"
+ " changing the configuration property '{}' from '{}' to '{}').",
batchId, batch.size(), bulkItemResponse.getFailureMessage(),
OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG,
BehaviorOnMalformedDoc.FAIL,
BehaviorOnMalformedDoc.IGNORE);
throw new ConnectException(
"One of the item in the bulk response failed. Reason: "
+ bulkItemResponse.getFailureMessage());
}
}

private void handleMalformedDoc(final BulkItemResponse bulkItemResponse) {
// if the elasticsearch request failed because of a malformed document,
// the behavior is configurable.
Expand All @@ -390,12 +436,12 @@ private void handleMalformedDoc(final BulkItemResponse bulkItemResponse) {
LOGGER.debug("Encountered an illegal document error when executing batch {} of {}"
+ " records. Ignoring and will not index record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
return;
break;
case WARN:
LOGGER.warn("Encountered an illegal document error when executing batch {} of {}"
+ " records. Ignoring and will not index record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
return;
break;
case FAIL:
default:
LOGGER.error("Encountered an illegal document error when executing batch {} of {}"
Expand All @@ -409,6 +455,11 @@ private void handleMalformedDoc(final BulkItemResponse bulkItemResponse) {
}
}
}

private boolean responseContainsVersionConflict(final BulkItemResponse bulkItemResponse) {
return bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT
|| bulkItemResponse.getFailureMessage().contains("version_conflict_engine_exception");
}

private boolean responseContainsMalformedDocError(final BulkItemResponse bulkItemResponse) {
return bulkItemResponse.getFailureMessage().contains("strict_dynamic_mapping_exception")
Expand Down Expand Up @@ -492,4 +543,53 @@ public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

public enum BehaviorOnVersionConflict {
IGNORE,
WARN,
FAIL;

public static final BehaviorOnVersionConflict DEFAULT = FAIL;

// Want values for "behavior.on.version.conflict" property to be case-insensitive
public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names());

@Override
public void ensureValid(final String name, final Object value) {
if (value instanceof String) {
final String lowerCaseStringValue = ((String) value).toLowerCase(Locale.ROOT);
validator.ensureValid(name, lowerCaseStringValue);
} else {
validator.ensureValid(name, value);
}
}

// Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
@Override
public String toString() {
return validator.toString();
}
};

public static String[] names() {
final BehaviorOnVersionConflict[] behaviors = values();
final String[] result = new String[behaviors.length];

for (int i = 0; i < behaviors.length; i++) {
result[i] = behaviors[i].toString();
}

return result;
}

public static BehaviorOnVersionConflict forValue(final String value) {
return valueOf(value.toUpperCase(Locale.ROOT));
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
+ "Opensearch rejects due to some malformation of the document itself, such as an index"
+ " mapping conflict or a field name containing illegal characters. Valid options are "
+ "'ignore', 'warn', and 'fail'.";

public static final String BEHAVIOR_ON_VERSION_CONFLICT_CONFIG = "behavior.on.version.conflict";
private static final String BEHAVIOR_ON_VERSION_CONFLICT_DOC = "How to handle records that "
+ "Opensearch rejects due to version conflicts (if optimistic locking mechanism has been"
+ "activated). Valid options are 'ignore', 'warn', and 'fail'.";

protected static ConfigDef baseConfigDef() {
final ConfigDef configDef = new ConfigDef();
Expand Down Expand Up @@ -392,7 +397,18 @@ private static void addConversionConfigs(final ConfigDef configDef) {
group,
++order,
Width.SHORT,
"Behavior on malformed documents");
"Behavior on malformed documents"
).define(
BEHAVIOR_ON_VERSION_CONFLICT_CONFIG,
Type.STRING,
BulkProcessor.BehaviorOnVersionConflict.DEFAULT.toString(),
BulkProcessor.BehaviorOnVersionConflict.VALIDATOR,
Importance.LOW,
BEHAVIOR_ON_VERSION_CONFLICT_DOC,
group,
++order,
Width.SHORT,
"Behavior on document's version conflict (optimistic locking)");
}

public static final ConfigDef CONFIG = baseConfigDef();
Expand Down Expand Up @@ -522,6 +538,12 @@ public BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc() {
);
}

public BulkProcessor.BehaviorOnVersionConflict behaviorOnVersionConflict() {
return BulkProcessor.BehaviorOnVersionConflict.forValue(
getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG)
);
}

public static void main(final String[] args) {
System.out.println("=========================================");
System.out.println("Opensearch Sink Connector Configuration Options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;

import io.aiven.kafka.connect.opensearch.BulkProcessor.BehaviorOnVersionConflict;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -47,6 +49,7 @@
import static io.aiven.kafka.connect.opensearch.BulkProcessor.BehaviorOnMalformedDoc;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.LINGER_MS_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
Expand Down Expand Up @@ -413,10 +416,83 @@ public void ignoreOrWarnOnMalformedDoc(final @Mock RestHighLevelClient client) t
bulkProcessor.add(newIndexRequest(43), 1);

final int flushTimeoutMs = 1000;
assertThrows(ConnectException.class, () -> bulkProcessor.flush(flushTimeoutMs));
bulkProcessor.flush(flushTimeoutMs);

assertTrue(clientAnswer.expectationsMet());
}
}

@Test
public void failOnVersionConfict(final @Mock RestHighLevelClient client) throws IOException {
final var clientAnswer = new ClientAnswer();
when(client.bulk(any(BulkRequest.class), eq(RequestOptions.DEFAULT))).thenAnswer(clientAnswer);
final String errorInfo =
" [{\"type\":\"version_conflict_engine_exception\","
+ "\"reason\":\"[1]: version conflict, current version [3] is higher or"
+ " equal to the one provided [3]\""
+ "}]";
final var config = new OpensearchSinkConnectorConfig(Map.of(
CONNECTION_URL_CONFIG, "http://localhost",
MAX_BUFFERED_RECORDS_CONFIG, "100",
MAX_IN_FLIGHT_REQUESTS_CONFIG, "5",
BATCH_SIZE_CONFIG, "2",
LINGER_MS_CONFIG, "5",
MAX_RETRIES_CONFIG, "3",
READ_TIMEOUT_MS_CONFIG, "1",
BEHAVIOR_ON_VERSION_CONFLICT_CONFIG, BehaviorOnVersionConflict.FAIL.toString()
));
final var bulkProcessor = new BulkProcessor(Time.SYSTEM, client, config);
clientAnswer.expect(
List.of(
newIndexRequest(42),
newIndexRequest(43)
), failedResponse(errorInfo));

bulkProcessor.start();

bulkProcessor.add(newIndexRequest(42), 1);
bulkProcessor.add(newIndexRequest(43), 1);

assertThrows(ConnectException.class, () -> bulkProcessor.flush(1000));
verify(client, times(1)).bulk(any(BulkRequest.class), eq(RequestOptions.DEFAULT));
assertTrue(clientAnswer.expectationsMet());
}

@Test
public void ignoreOnVersionConfict(final @Mock RestHighLevelClient client) throws IOException {
final var clientAnswer = new ClientAnswer();
when(client.bulk(any(BulkRequest.class), eq(RequestOptions.DEFAULT))).thenAnswer(clientAnswer);
final String errorInfo =
" [{\"type\":\"version_conflict_engine_exception\","
+ "\"reason\":\"[1]: version conflict, current version [3] is higher or"
+ " equal to the one provided [3]\""
+ "}]";
final var config = new OpensearchSinkConnectorConfig(Map.of(
CONNECTION_URL_CONFIG, "http://localhost",
MAX_BUFFERED_RECORDS_CONFIG, "100",
MAX_IN_FLIGHT_REQUESTS_CONFIG, "5",
BATCH_SIZE_CONFIG, "2",
LINGER_MS_CONFIG, "5",
MAX_RETRIES_CONFIG, "3",
READ_TIMEOUT_MS_CONFIG, "1",
BEHAVIOR_ON_VERSION_CONFLICT_CONFIG, BehaviorOnVersionConflict.IGNORE.toString()
));
final var bulkProcessor = new BulkProcessor(Time.SYSTEM, client, config);
clientAnswer.expect(
List.of(
newIndexRequest(42),
newIndexRequest(43)
), failedResponse(errorInfo));

bulkProcessor.start();

bulkProcessor.add(newIndexRequest(42), 1);
bulkProcessor.add(newIndexRequest(43), 1);
bulkProcessor.flush(1000);

assertTrue(clientAnswer.expectationsMet());
}

IndexRequest newIndexRequest(final int body) {
return new IndexRequest("idx").id("some_id").source(body, XContentType.JSON);
}
Expand Down

0 comments on commit 40cd35c

Please sign in to comment.