diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java index 1e1b1c91cd12..4884bb61e628 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java @@ -25,7 +25,7 @@ import java.io.IOException; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.solace.data.Semp.Queue; -import org.apache.beam.sdk.io.solace.utils.SerializableSupplier; +import org.apache.beam.sdk.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java index 168c5164ba47..4c01257373b4 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java @@ -20,9 +20,8 @@ import com.google.api.client.http.HttpRequestFactory; import com.google.api.client.http.javanet.NetHttpTransport; import com.google.auto.value.AutoValue; -import org.apache.beam.sdk.io.solace.utils.SerializableSupplier; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -34,15 +33,15 @@ @AutoValue public abstract class BasicAuthSempClientFactory implements SempClientFactory { - public abstract String host(); + abstract String host(); - public abstract String username(); + abstract String username(); - public abstract String password(); + abstract String password(); - public abstract String vpnName(); + abstract String vpnName(); - public abstract @Nullable SerializableSupplier httpRequestFactorySupplier(); + abstract @Nullable SerializableSupplier httpRequestFactorySupplier(); public static Builder builder() { return new AutoValue_BasicAuthSempClientFactory.Builder(); @@ -74,8 +73,7 @@ public SempClient create() { host(), username(), password(), vpnName(), getHttpRequestFactorySupplier()); } - @SuppressWarnings("return") - private @NonNull SerializableSupplier getHttpRequestFactorySupplier() { + SerializableSupplier getHttpRequestFactorySupplier() { SerializableSupplier httpRequestSupplier = httpRequestFactorySupplier(); return httpRequestSupplier != null ? httpRequestSupplier diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java index 23f4d6401526..62a492775e7c 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.solace.broker; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import com.google.api.client.http.GenericUrl; import com.google.api.client.http.HttpContent; import com.google.api.client.http.HttpHeaders; @@ -31,8 +33,12 @@ import java.net.CookieManager; import java.net.HttpCookie; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A class to execute requests to SEMP v2 with Basic Auth authentication. @@ -44,13 +50,17 @@ * header to refresh the token. */ class SempBasicAuthClientExecutor implements Serializable { - private static final CookieManager COOKIE_MANAGER = new CookieManager(); + // Every request will be repeated 2 times in case of abnormal connection failures. + private static final int REQUEST_NUM_RETRIES = 2; + private static final Map COOKIE_MANAGER_MAP = + new ConcurrentHashMap(); private static final String COOKIES_HEADER = "Set-Cookie"; private final String username; private final String messageVpn; private final String baseUrl; private final String password; + private final CookieManagerKey cookieManagerKey; private final transient HttpRequestFactory requestFactory; SempBasicAuthClientExecutor( @@ -64,6 +74,8 @@ class SempBasicAuthClientExecutor implements Serializable { this.messageVpn = vpnName; this.password = password; this.requestFactory = httpRequestFactory; + this.cookieManagerKey = new CookieManagerKey(this.baseUrl, this.username); + COOKIE_MANAGER_MAP.putIfAbsent(this.cookieManagerKey, new CookieManager()); } private static String getQueueEndpoint(String messageVpn, String queueName) { @@ -125,9 +137,13 @@ private HttpResponse executePost(GenericUrl url, ImmutableMap pa } private HttpResponse execute(HttpRequest request) throws IOException { - request.setNumberOfRetries(2); + request.setNumberOfRetries(REQUEST_NUM_RETRIES); HttpHeaders httpHeaders = new HttpHeaders(); - boolean authFromCookie = COOKIE_MANAGER.getCookieStore().getCookies().size() > 0; + boolean authFromCookie = + !checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)) + .getCookieStore() + .getCookies() + .isEmpty(); if (authFromCookie) { setCookiesFromCookieManager(httpHeaders); request.setHeaders(httpHeaders); @@ -141,10 +157,10 @@ private HttpResponse execute(HttpRequest request) throws IOException { response = request.execute(); } catch (HttpResponseException e) { if (authFromCookie && e.getStatusCode() == 401) { - COOKIE_MANAGER.getCookieStore().removeAll(); + checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().removeAll(); // execute again without cookies to refresh the token. return execute(request); - } else { + } else { // we might need to handle other response codes here. throw e; } } @@ -155,7 +171,8 @@ private HttpResponse execute(HttpRequest request) throws IOException { private void setCookiesFromCookieManager(HttpHeaders httpHeaders) { httpHeaders.setCookie( - COOKIE_MANAGER.getCookieStore().getCookies().stream() + checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().getCookies() + .stream() .map(s -> s.getName() + "=" + s.getValue()) .collect(Collectors.joining(";"))); } @@ -164,8 +181,37 @@ private void storeCookiesInCookieManager(HttpHeaders headers) { List cookiesHeader = headers.getHeaderStringValues(COOKIES_HEADER); if (cookiesHeader != null) { for (String cookie : cookiesHeader) { - COOKIE_MANAGER.getCookieStore().add(null, HttpCookie.parse(cookie).get(0)); + checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)) + .getCookieStore() + .add(null, HttpCookie.parse(cookie).get(0)); } } } + + private static class CookieManagerKey implements Serializable { + private final String baseUrl; + private final String username; + + CookieManagerKey(String baseUrl, String username) { + this.baseUrl = baseUrl; + this.username = username; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CookieManagerKey)) { + return false; + } + CookieManagerKey that = (CookieManagerKey) o; + return Objects.equals(baseUrl, that.baseUrl) && Objects.equals(username, that.username); + } + + @Override + public int hashCode() { + return Objects.hash(baseUrl, username); + } + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java deleted file mode 100644 index c2e4fdfa69fb..000000000000 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.solace.utils; - -import java.io.Serializable; - -@FunctionalInterface -public interface SerializableSupplier extends Serializable { - OutputT get(); -} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/package-info.java deleted file mode 100644 index abff210a6258..000000000000 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Solace IO connector - utility classes. */ -package org.apache.beam.sdk.io.solace.utils;