Skip to content

Commit

Permalink
Use core SerializableSupplier, remove SuppressWarnings, extract numbe…
Browse files Browse the repository at this point in the history
…r to a variable, cookie store map
  • Loading branch information
bzablocki committed Jul 8, 2024
1 parent 353f959 commit 9b6b5b2
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.data.Semp.Queue;
import org.apache.beam.sdk.io.solace.utils.SerializableSupplier;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.solace.utils.SerializableSupplier;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand All @@ -34,15 +33,15 @@
@AutoValue
public abstract class BasicAuthSempClientFactory implements SempClientFactory {

public abstract String host();
abstract String host();

public abstract String username();
abstract String username();

public abstract String password();
abstract String password();

public abstract String vpnName();
abstract String vpnName();

public abstract @Nullable SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier();
abstract @Nullable SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier();

public static Builder builder() {
return new AutoValue_BasicAuthSempClientFactory.Builder();
Expand Down Expand Up @@ -74,8 +73,7 @@ public SempClient create() {
host(), username(), password(), vpnName(), getHttpRequestFactorySupplier());
}

@SuppressWarnings("return")
private @NonNull SerializableSupplier<HttpRequestFactory> getHttpRequestFactorySupplier() {
SerializableSupplier<HttpRequestFactory> getHttpRequestFactorySupplier() {
SerializableSupplier<HttpRequestFactory> httpRequestSupplier = httpRequestFactorySupplier();
return httpRequestSupplier != null
? httpRequestSupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.solace.broker;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpHeaders;
Expand All @@ -31,8 +33,12 @@
import java.net.CookieManager;
import java.net.HttpCookie;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A class to execute requests to SEMP v2 with Basic Auth authentication.
Expand All @@ -44,13 +50,17 @@
* header to refresh the token.
*/
class SempBasicAuthClientExecutor implements Serializable {
private static final CookieManager COOKIE_MANAGER = new CookieManager();
// Every request will be repeated 2 times in case of abnormal connection failures.
private static final int REQUEST_NUM_RETRIES = 2;
private static final Map<CookieManagerKey, CookieManager> COOKIE_MANAGER_MAP =
new ConcurrentHashMap<CookieManagerKey, CookieManager>();
private static final String COOKIES_HEADER = "Set-Cookie";

private final String username;
private final String messageVpn;
private final String baseUrl;
private final String password;
private final CookieManagerKey cookieManagerKey;
private final transient HttpRequestFactory requestFactory;

SempBasicAuthClientExecutor(
Expand All @@ -64,6 +74,8 @@ class SempBasicAuthClientExecutor implements Serializable {
this.messageVpn = vpnName;
this.password = password;
this.requestFactory = httpRequestFactory;
this.cookieManagerKey = new CookieManagerKey(this.baseUrl, this.username);
COOKIE_MANAGER_MAP.putIfAbsent(this.cookieManagerKey, new CookieManager());
}

private static String getQueueEndpoint(String messageVpn, String queueName) {
Expand Down Expand Up @@ -125,9 +137,13 @@ private HttpResponse executePost(GenericUrl url, ImmutableMap<String, Object> pa
}

private HttpResponse execute(HttpRequest request) throws IOException {
request.setNumberOfRetries(2);
request.setNumberOfRetries(REQUEST_NUM_RETRIES);
HttpHeaders httpHeaders = new HttpHeaders();
boolean authFromCookie = COOKIE_MANAGER.getCookieStore().getCookies().size() > 0;
boolean authFromCookie =
!checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey))
.getCookieStore()
.getCookies()
.isEmpty();
if (authFromCookie) {
setCookiesFromCookieManager(httpHeaders);
request.setHeaders(httpHeaders);
Expand All @@ -141,10 +157,10 @@ private HttpResponse execute(HttpRequest request) throws IOException {
response = request.execute();
} catch (HttpResponseException e) {
if (authFromCookie && e.getStatusCode() == 401) {
COOKIE_MANAGER.getCookieStore().removeAll();
checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().removeAll();
// execute again without cookies to refresh the token.
return execute(request);
} else {
} else { // we might need to handle other response codes here.
throw e;
}
}
Expand All @@ -155,7 +171,8 @@ private HttpResponse execute(HttpRequest request) throws IOException {

private void setCookiesFromCookieManager(HttpHeaders httpHeaders) {
httpHeaders.setCookie(
COOKIE_MANAGER.getCookieStore().getCookies().stream()
checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().getCookies()
.stream()
.map(s -> s.getName() + "=" + s.getValue())
.collect(Collectors.joining(";")));
}
Expand All @@ -164,8 +181,37 @@ private void storeCookiesInCookieManager(HttpHeaders headers) {
List<String> cookiesHeader = headers.getHeaderStringValues(COOKIES_HEADER);
if (cookiesHeader != null) {
for (String cookie : cookiesHeader) {
COOKIE_MANAGER.getCookieStore().add(null, HttpCookie.parse(cookie).get(0));
checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey))
.getCookieStore()
.add(null, HttpCookie.parse(cookie).get(0));
}
}
}

private static class CookieManagerKey implements Serializable {
private final String baseUrl;
private final String username;

CookieManagerKey(String baseUrl, String username) {
this.baseUrl = baseUrl;
this.username = username;
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CookieManagerKey)) {
return false;
}
CookieManagerKey that = (CookieManagerKey) o;
return Objects.equals(baseUrl, that.baseUrl) && Objects.equals(username, that.username);
}

@Override
public int hashCode() {
return Objects.hash(baseUrl, username);
}
}
}

This file was deleted.

This file was deleted.

0 comments on commit 9b6b5b2

Please sign in to comment.