Skip to content

Commit

Permalink
Introduce new OpenSearchTransport based on Apache HttpClient 5 (#281) (
Browse files Browse the repository at this point in the history
…#328)

Signed-off-by: Andriy Redko <[email protected]>

Signed-off-by: Andriy Redko <[email protected]>

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Jan 17, 2023
1 parent aa9daf4 commit 0081c56
Show file tree
Hide file tree
Showing 41 changed files with 3,709 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Github workflow for changelog verification ([#239](https://github.com/opensearch-project/opensearch-java/pull/239))
- Github workflow for dependabot PRs ([#247](https://github.com/opensearch-project/opensearch-java/pull/247))
- Introduce new OpenSearchTransport based on Apache HttpClient 5 ([#328](https://github.com/opensearch-project/opensearch-java/pull/328))

### Dependencies
- Bumps `grgit-gradle` from 4.0.1 to 5.0.0
Expand Down
37 changes: 37 additions & 0 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- [User Guide](#user-guide)
- [Sample data](#sample-data)
- [Create a client](#create-a-client)
- [Create an index](#create-an-index)
- [Index data](#index-data)
- [Search for the document](#search-for-the-document)
Expand Down Expand Up @@ -48,6 +49,42 @@ static class IndexData {
}
```

## Create a client

There are multiple low level transports which `OpenSearchClient` could be configured with.

### Create a client using `RestClientTransport`

```java
Transport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
OpenSearchClient client = new OpenSearchClient(transport);
```

The `JacksonJsonpMapper` class (2.x versions) only supports Java 7 objects by default. [Java 8 modules](https://github.com/FasterXML/jackson-modules-java8) to support JDK8 classes such as the Date and Time API (JSR-310), `Optional`, and more can be used by including [the additional datatype dependency](https://github.com/FasterXML/jackson-modules-java8#usage) and adding the module. For example, to include JSR-310 classes:

```java
Transport transport = new RestClientTransport(restClient,
new JacksonJsonpMapper(new ObjectMapper().registerModule(new JavaTimeModule())));
OpenSearchClient client = new OpenSearchClient(transport);
```

### Create a client using `ApacheHttpClient5Transport`

```java
final Transport transport = ApacheHttpClient5TransportBuilder
.builder(hosts)
.setMapper(new JacksonJsonpMapper())
.build();
OpenSearchClient client = new OpenSearchClient(transport);
```

The Apache HttpClient 5 based transport has dependences on Apache HttpClient 5 and Apache HttpCore 5 which has to be added to the project explicitly.

```gradle
implementation("org.apache.httpcomponents.client5", "httpclient5", "5.1.4")
implementation("org.apache.httpcomponents.core5", "httpcore5", "5.1.5")
```

## Create an index

```java
Expand Down
4 changes: 4 additions & 0 deletions java-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ dependencies {
implementation("com.fasterxml.jackson.core", "jackson-databind", jacksonDatabindVersion)
testImplementation("com.fasterxml.jackson.datatype", "jackson-datatype-jakarta-jsonp", jacksonVersion)

// ApacheHttpClient5Transport dependencies (optional)
implementation("org.apache.httpcomponents.client5", "httpclient5", "5.1.4")
implementation("org.apache.httpcomponents.core5", "httpcore5", "5.1.5")

// For AwsSdk2Transport
"awsSdk2SupportImplementation"("software.amazon.awssdk","sdk-core","[2.15,3.0)")
"awsSdk2SupportImplementation"("software.amazon.awssdk","auth","[2.15,3.0)")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport;

public final class TransportHeaders {
public static final String ACCEPT = "Accept";
public static final String USER_AGENT = "User-Agent";

private TransportHeaders() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport.httpclient5;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.opensearch.client.transport.TransportOptions;
import org.opensearch.client.transport.Version;

import static org.opensearch.client.transport.TransportHeaders.ACCEPT;
import static org.opensearch.client.transport.TransportHeaders.USER_AGENT;

public class ApacheHttpClient5Options implements TransportOptions {
/**
* Default request options.
*/
public static final ApacheHttpClient5Options DEFAULT = new Builder(
Collections.emptyList(),
HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory.DEFAULT,
null,
null
).build();

private final List<Header> headers;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private final WarningsHandler warningsHandler;
private final RequestConfig requestConfig;

private ApacheHttpClient5Options(Builder builder) {
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
this.warningsHandler = builder.warningsHandler;
this.requestConfig = builder.requestConfig;
}

public HttpAsyncResponseConsumerFactory getHttpAsyncResponseConsumerFactory() {
return httpAsyncResponseConsumerFactory;
}

public WarningsHandler getWarningsHandler() {
return warningsHandler;
}

public RequestConfig getRequestConfig() {
return requestConfig;
}

@Override
public Collection<Entry<String, String>> headers() {
return headers.stream()
.map(h -> new AbstractMap.SimpleImmutableEntry<>(h.getName(), h.getValue()))
.collect(Collectors.toList());
}

@Override
public Map<String, String> queryParameters() {
return null;
}

@Override
public Function<List<String>, Boolean> onWarnings() {
if (warningsHandler == null) {
return null;
} else {
return warnings -> warningsHandler.warningsShouldFailRequest(warnings);
}
}

@Override
public Builder toBuilder() {
return new Builder(headers, httpAsyncResponseConsumerFactory, warningsHandler, requestConfig);
}

public static class Builder implements TransportOptions.Builder {
private final List<Header> headers;
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private WarningsHandler warningsHandler;
private RequestConfig requestConfig;

private Builder(Builder builder) {
this(builder.headers, builder.httpAsyncResponseConsumerFactory,
builder.warningsHandler, builder.requestConfig);
}

private Builder(
List<Header> headers,
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
WarningsHandler warningsHandler,
RequestConfig requestConfig
) {
this.headers = new ArrayList<>(headers);
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
this.warningsHandler = warningsHandler;
this.requestConfig = requestConfig;
}

/**
* Add the provided header to the request.
*
* @param name the header name
* @param value the header value
* @throws NullPointerException if {@code name} or {@code value} is null.
*/
@Override
public Builder addHeader(String name, String value) {
Objects.requireNonNull(name, "header name cannot be null");
Objects.requireNonNull(value, "header value cannot be null");
this.headers.add(new ReqHeader(name, value));
return this;
}

@Override
public TransportOptions.Builder setParameter(String name, String value) {
return this;
}

/**
* Called if there are warnings to determine if those warnings should fail the request.
*/
@Override
public TransportOptions.Builder onWarnings(Function<List<String>, Boolean> listener) {
if (listener == null) {
setWarningsHandler(null);
} else {
setWarningsHandler(w -> {
if (w != null && !w.isEmpty()) {
return listener.apply(w);
} else {
return false;
}
});
}

return this;
}

/**
* Set the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link AsyncResponseConsumer} callback per retry. Controls how the
* response body gets streamed from a non-blocking HTTP connection on the
* client side.
*
* @param httpAsyncResponseConsumerFactory factory for creating {@link AsyncResponseConsumer}.
* @throws NullPointerException if {@code httpAsyncResponseConsumerFactory} is null.
*/
public void setHttpAsyncResponseConsumerFactory(HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
this.httpAsyncResponseConsumerFactory = Objects.requireNonNull(
httpAsyncResponseConsumerFactory,
"httpAsyncResponseConsumerFactory cannot be null"
);
}

/**
* How this request should handle warnings. If null (the default) then
* this request will default to the behavior dictacted by
* `setStrictDeprecationMode`.
* <p>
* This can be set to {@link WarningsHandler#PERMISSIVE} if the client
* should ignore all warnings which is the same behavior as setting
* strictDeprecationMode to true. It can be set to
* {@link WarningsHandler#STRICT} if the client should fail if there are
* any warnings which is the same behavior as settings
* strictDeprecationMode to false.
* <p>
* It can also be set to a custom implementation of
* {@linkplain WarningsHandler} to permit only certain warnings or to
* fail the request if the warnings returned don't
* <strong>exactly</strong> match some set.
*
* @param warningsHandler the {@link WarningsHandler} to be used
*/
public void setWarningsHandler(WarningsHandler warningsHandler) {
this.warningsHandler = warningsHandler;
}

/**
* set RequestConfig, which can set socketTimeout, connectTimeout
* and so on by request
* @param requestConfig http client RequestConfig
* @return Builder
*/
public Builder setRequestConfig(RequestConfig requestConfig) {
this.requestConfig = requestConfig;
return this;
}

@Override
public ApacheHttpClient5Options build() {
return new ApacheHttpClient5Options(this);
}
}

static ApacheHttpClient5Options initialOptions() {
String ua = String.format(
Locale.ROOT,
"opensearch-java/%s (Java/%s)",
Version.VERSION == null ? "Unknown" : Version.VERSION.toString(),
System.getProperty("java.version")
);

return new ApacheHttpClient5Options(
DEFAULT.toBuilder()
.addHeader(USER_AGENT, ua)
.addHeader(ACCEPT, ApacheHttpClient5Transport.JsonContentType.toString())
);
}

static ApacheHttpClient5Options of(TransportOptions options) {
if (options instanceof ApacheHttpClient5Options) {
return (ApacheHttpClient5Options)options;

} else {
final Builder builder = new Builder(DEFAULT.toBuilder());
options.headers().forEach(h -> builder.addHeader(h.getKey(), h.getValue()));
options.queryParameters().forEach(builder::setParameter);
builder.onWarnings(options.onWarnings());
return builder.build();
}
}

/**
* Custom implementation of {@link BasicHeader} that overrides equals and
* hashCode so it is easier to test equality of {@link ApacheHttpClient5Options}.
*/
static final class ReqHeader extends BasicHeader {
ReqHeader(String name, String value) {
super(name, value);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof ReqHeader) {
Header otherHeader = (Header) other;
return Objects.equals(getName(), otherHeader.getName()) && Objects.equals(getValue(), otherHeader.getValue());
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(getName(), getValue());
}
}
}
Loading

0 comments on commit 0081c56

Please sign in to comment.