diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index 7c643dc91278..2d720ab7d929 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -43,6 +43,10 @@ dependencies {
implementation library.java.google_api_common
implementation library.java.gax
implementation library.java.threetenbp
+ implementation library.java.google_http_client
+ implementation library.java.google_http_client_gson
+ implementation library.java.jackson_core
+ implementation library.java.jackson_databind
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:common")
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java
new file mode 100644
index 000000000000..4884bb61e628
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.http.HttpRequestFactory;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import java.io.IOException;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Semp.Queue;
+import org.apache.beam.sdk.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class that manages REST calls to the Solace Element Management Protocol (SEMP) using basic
+ * authentication.
+ *
+ *
This class provides methods to check necessary information, such as if the queue is
+ * non-exclusive, remaining backlog bytes of a queue. It can also create and execute calls to create
+ * queue for a topic.
+ */
+@Internal
+public class BasicAuthSempClient implements SempClient {
+ private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class);
+ private final ObjectMapper objectMapper =
+ new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor;
+
+ public BasicAuthSempClient(
+ String host,
+ String username,
+ String password,
+ String vpnName,
+ SerializableSupplier httpRequestFactorySupplier) {
+ sempBasicAuthClientExecutor =
+ new SempBasicAuthClientExecutor(
+ host, username, password, vpnName, httpRequestFactorySupplier.get());
+ }
+
+ @Override
+ public boolean isQueueNonExclusive(String queueName) throws IOException {
+ LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is nonExclusive", queueName);
+ BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
+ if (response.content == null) {
+ throw new IOException("SolaceIO: response from SEMP is empty!");
+ }
+ Queue q = mapJsonToClass(response.content, Queue.class);
+ return q.data().accessType().equals("non-exclusive");
+ }
+
+ @Override
+ public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, String topicName)
+ throws IOException {
+ createQueue(queueName);
+ createSubscription(queueName, topicName);
+ return JCSMPFactory.onlyInstance().createQueue(queueName);
+ }
+
+ @Override
+ public long getBacklogBytes(String queueName) throws IOException {
+ BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
+ if (response.content == null) {
+ throw new IOException("SolaceIO: response from SEMP is empty!");
+ }
+ Queue q = mapJsonToClass(response.content, Queue.class);
+ return q.data().msgSpoolUsage();
+ }
+
+ private void createQueue(String queueName) throws IOException {
+ LOG.info("SolaceIO.Read: Creating new queue {}.", queueName);
+ sempBasicAuthClientExecutor.createQueueResponse(queueName);
+ }
+
+ private void createSubscription(String queueName, String topicName) throws IOException {
+ LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName);
+ sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName);
+ }
+
+ private T mapJsonToClass(String content, Class mapSuccessToClass)
+ throws JsonProcessingException {
+ return objectMapper.readValue(content, mapSuccessToClass);
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java
new file mode 100644
index 000000000000..4c01257373b4
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.util.SerializableSupplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A factory for creating {@link BasicAuthSempClient} instances.
+ *
+ * This factory provides a way to create {@link BasicAuthSempClient} instances with different
+ * configurations.
+ */
+@AutoValue
+public abstract class BasicAuthSempClientFactory implements SempClientFactory {
+
+ abstract String host();
+
+ abstract String username();
+
+ abstract String password();
+
+ abstract String vpnName();
+
+ abstract @Nullable SerializableSupplier httpRequestFactorySupplier();
+
+ public static Builder builder() {
+ return new AutoValue_BasicAuthSempClientFactory.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ /** Set Solace SEMP host, format: [Protocol://]Host[:Port]. e.g. "http://127.0.0.1:8080" */
+ public abstract Builder host(String host);
+
+ /** Set Solace username. */
+ public abstract Builder username(String username);
+ /** Set Solace password. */
+ public abstract Builder password(String password);
+
+ /** Set Solace vpn name. */
+ public abstract Builder vpnName(String vpnName);
+
+ @VisibleForTesting
+ abstract Builder httpRequestFactorySupplier(
+ SerializableSupplier httpRequestFactorySupplier);
+
+ public abstract BasicAuthSempClientFactory build();
+ }
+
+ @Override
+ public SempClient create() {
+ return new BasicAuthSempClient(
+ host(), username(), password(), vpnName(), getHttpRequestFactorySupplier());
+ }
+
+ SerializableSupplier getHttpRequestFactorySupplier() {
+ SerializableSupplier httpRequestSupplier = httpRequestFactorySupplier();
+ return httpRequestSupplier != null
+ ? httpRequestSupplier
+ : () -> new NetHttpTransport().createRequestFactory();
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java
new file mode 100644
index 000000000000..1a47f8012285
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.google.api.client.http.HttpResponse;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Collectors;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class BrokerResponse {
+ final int code;
+ final String message;
+ @Nullable String content;
+
+ public BrokerResponse(int responseCode, String message, @Nullable InputStream content) {
+ this.code = responseCode;
+ this.message = message;
+ if (content != null) {
+ this.content =
+ new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+ }
+ }
+
+ public static BrokerResponse fromHttpResponse(HttpResponse response) throws IOException {
+ return new BrokerResponse(
+ response.getStatusCode(), response.getStatusMessage(), response.getContent());
+ }
+
+ @Override
+ public String toString() {
+ return "BrokerResponse{"
+ + "code="
+ + code
+ + ", message='"
+ + message
+ + '\''
+ + ", content="
+ + content
+ + '}';
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java
new file mode 100644
index 000000000000..62a492775e7c
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.json.JsonHttpContent;
+import com.google.api.client.json.gson.GsonFactory;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.CookieManager;
+import java.net.HttpCookie;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A class to execute requests to SEMP v2 with Basic Auth authentication.
+ *
+ * This approach takes advantage of SEMP Sessions. The
+ * session is established when a user authenticates with HTTP Basic authentication. When the
+ * response is 401 Unauthorized, the client will execute an additional request with Basic Auth
+ * header to refresh the token.
+ */
+class SempBasicAuthClientExecutor implements Serializable {
+ // Every request will be repeated 2 times in case of abnormal connection failures.
+ private static final int REQUEST_NUM_RETRIES = 2;
+ private static final Map COOKIE_MANAGER_MAP =
+ new ConcurrentHashMap();
+ private static final String COOKIES_HEADER = "Set-Cookie";
+
+ private final String username;
+ private final String messageVpn;
+ private final String baseUrl;
+ private final String password;
+ private final CookieManagerKey cookieManagerKey;
+ private final transient HttpRequestFactory requestFactory;
+
+ SempBasicAuthClientExecutor(
+ String host,
+ String username,
+ String password,
+ String vpnName,
+ HttpRequestFactory httpRequestFactory) {
+ this.baseUrl = String.format("%s/SEMP/v2", host);
+ this.username = username;
+ this.messageVpn = vpnName;
+ this.password = password;
+ this.requestFactory = httpRequestFactory;
+ this.cookieManagerKey = new CookieManagerKey(this.baseUrl, this.username);
+ COOKIE_MANAGER_MAP.putIfAbsent(this.cookieManagerKey, new CookieManager());
+ }
+
+ private static String getQueueEndpoint(String messageVpn, String queueName) {
+ return String.format("/monitor/msgVpns/%s/queues/%s", messageVpn, queueName);
+ }
+
+ private static String createQueueEndpoint(String messageVpn) {
+ return String.format("/config/msgVpns/%s/queues", messageVpn);
+ }
+
+ private static String subscriptionEndpoint(String messageVpn, String queueName) {
+ return String.format("/config/msgVpns/%s/queues/%s/subscriptions", messageVpn, queueName);
+ }
+
+ BrokerResponse getQueueResponse(String queueName) throws IOException {
+ String queryUrl = getQueueEndpoint(messageVpn, queueName);
+ HttpResponse response = executeGet(new GenericUrl(baseUrl + queryUrl));
+ return BrokerResponse.fromHttpResponse(response);
+ }
+
+ BrokerResponse createQueueResponse(String queueName) throws IOException {
+ String queryUrl = createQueueEndpoint(messageVpn);
+ ImmutableMap params =
+ ImmutableMap.builder()
+ .put("accessType", "non-exclusive")
+ .put("queueName", queueName)
+ .put("owner", username)
+ .put("permission", "consume")
+ .put("ingressEnabled", true)
+ .put("egressEnabled", true)
+ .build();
+
+ HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), params);
+ return BrokerResponse.fromHttpResponse(response);
+ }
+
+ BrokerResponse createSubscriptionResponse(String queueName, String topicName) throws IOException {
+ String queryUrl = subscriptionEndpoint(messageVpn, queueName);
+
+ ImmutableMap params =
+ ImmutableMap.builder()
+ .put("subscriptionTopic", topicName)
+ .put("queueName", queueName)
+ .build();
+ HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), params);
+ return BrokerResponse.fromHttpResponse(response);
+ }
+
+ private HttpResponse executeGet(GenericUrl url) throws IOException {
+ HttpRequest request = requestFactory.buildGetRequest(url);
+ return execute(request);
+ }
+
+ private HttpResponse executePost(GenericUrl url, ImmutableMap parameters)
+ throws IOException {
+ HttpContent content = new JsonHttpContent(GsonFactory.getDefaultInstance(), parameters);
+ HttpRequest request = requestFactory.buildPostRequest(url, content);
+ return execute(request);
+ }
+
+ private HttpResponse execute(HttpRequest request) throws IOException {
+ request.setNumberOfRetries(REQUEST_NUM_RETRIES);
+ HttpHeaders httpHeaders = new HttpHeaders();
+ boolean authFromCookie =
+ !checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey))
+ .getCookieStore()
+ .getCookies()
+ .isEmpty();
+ if (authFromCookie) {
+ setCookiesFromCookieManager(httpHeaders);
+ request.setHeaders(httpHeaders);
+ } else {
+ httpHeaders.setBasicAuthentication(username, password);
+ request.setHeaders(httpHeaders);
+ }
+
+ HttpResponse response;
+ try {
+ response = request.execute();
+ } catch (HttpResponseException e) {
+ if (authFromCookie && e.getStatusCode() == 401) {
+ checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().removeAll();
+ // execute again without cookies to refresh the token.
+ return execute(request);
+ } else { // we might need to handle other response codes here.
+ throw e;
+ }
+ }
+
+ storeCookiesInCookieManager(response.getHeaders());
+ return response;
+ }
+
+ private void setCookiesFromCookieManager(HttpHeaders httpHeaders) {
+ httpHeaders.setCookie(
+ checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().getCookies()
+ .stream()
+ .map(s -> s.getName() + "=" + s.getValue())
+ .collect(Collectors.joining(";")));
+ }
+
+ private void storeCookiesInCookieManager(HttpHeaders headers) {
+ List cookiesHeader = headers.getHeaderStringValues(COOKIES_HEADER);
+ if (cookiesHeader != null) {
+ for (String cookie : cookiesHeader) {
+ checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey))
+ .getCookieStore()
+ .add(null, HttpCookie.parse(cookie).get(0));
+ }
+ }
+ }
+
+ private static class CookieManagerKey implements Serializable {
+ private final String baseUrl;
+ private final String username;
+
+ CookieManagerKey(String baseUrl, String username) {
+ this.baseUrl = baseUrl;
+ this.username = username;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CookieManagerKey)) {
+ return false;
+ }
+ CookieManagerKey that = (CookieManagerKey) o;
+ return Objects.equals(baseUrl, that.baseUrl) && Objects.equals(username, that.username);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(baseUrl, username);
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java
new file mode 100644
index 000000000000..f6f0fb51d22e
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.data;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.auto.value.AutoValue;
+
+public class Semp {
+
+ @AutoValue
+ @JsonSerialize(as = Queue.class)
+ @JsonDeserialize(builder = AutoValue_Semp_Queue.Builder.class)
+ public abstract static class Queue {
+
+ public abstract QueueData data();
+
+ public static Builder builder() {
+ return new AutoValue_Semp_Queue.Builder();
+ }
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ @JsonPOJOBuilder(withPrefix = "set")
+ abstract static class Builder {
+
+ public abstract Builder setData(QueueData queueData);
+
+ public abstract Queue build();
+ }
+ }
+
+ @AutoValue
+ @JsonDeserialize(builder = AutoValue_Semp_QueueData.Builder.class)
+ public abstract static class QueueData {
+ public abstract String accessType();
+
+ public abstract long msgSpoolUsage();
+
+ public static Builder builder() {
+ return new AutoValue_Semp_QueueData.Builder();
+ }
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ @JsonPOJOBuilder(withPrefix = "set")
+ abstract static class Builder {
+
+ public abstract Builder setAccessType(String accessType);
+
+ public abstract Builder setMsgSpoolUsage(long msgSpoolUsage);
+
+ public abstract QueueData build();
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java
new file mode 100644
index 000000000000..8cc48ed17ef6
--- /dev/null
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+public class SempBasicAuthClientExecutorTest {
+
+ @Test
+ public void testExecuteStatus4xx() {
+ MockHttpTransport transport =
+ new MockHttpTransport() {
+ @Override
+ public LowLevelHttpRequest buildRequest(String method, String url) {
+ return new MockLowLevelHttpRequest() {
+ @Override
+ public LowLevelHttpResponse execute() {
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ response.setStatusCode(404);
+ response.setContentType(Json.MEDIA_TYPE);
+ response.setContent(
+ "{\"meta\":{\"error\":{\"code\":404,\"description\":\"some"
+ + " error\",\"status\":\"xx\"}}}");
+ return response;
+ }
+ };
+ }
+ };
+
+ HttpRequestFactory requestFactory = transport.createRequestFactory();
+ SempBasicAuthClientExecutor client =
+ new SempBasicAuthClientExecutor(
+ "http://host", "username", "password", "vpnName", requestFactory);
+
+ assertThrows(HttpResponseException.class, () -> client.getQueueResponse("queue"));
+ }
+
+ @Test
+ public void testExecuteStatus3xx() {
+ MockHttpTransport transport =
+ new MockHttpTransport() {
+ @Override
+ public LowLevelHttpRequest buildRequest(String method, String url) {
+ return new MockLowLevelHttpRequest() {
+ @Override
+ public LowLevelHttpResponse execute() {
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ response.setStatusCode(301);
+ response.setContentType(Json.MEDIA_TYPE);
+ response.setContent(
+ "{\"meta\":{\"error\":{\"code\":301,\"description\":\"some"
+ + " error\",\"status\":\"xx\"}}}");
+ return response;
+ }
+ };
+ }
+ };
+
+ HttpRequestFactory requestFactory = transport.createRequestFactory();
+ SempBasicAuthClientExecutor client =
+ new SempBasicAuthClientExecutor(
+ "http://host", "username", "password", "vpnName", requestFactory);
+
+ assertThrows(HttpResponseException.class, () -> client.getQueueResponse("queue"));
+ }
+
+ /**
+ * In this test case, we test a situation when a session that we used to authenticate to Semp
+ * expires.
+ *
+ * To test this scenario, we need to do the following:
+ *
+ *
+ * - Send the first request, to initialize a session. This request has to contain the Basic
+ * Auth header and should not include any cookie headers. The response for this request
+ * contains a session cookie we can re-use in the following requests.
+ *
- Send the second request - this request should use a cookie from the previous response.
+ * There should be no Authorization header. To simulate an expired session scenario, we set
+ * the response of this request to the "401 Unauthorized". This should cause a the request
+ * to be retried, this time with the Authorization header.
+ *
- Validate the third request to contain the Basic Auth header and no session cookies.
+ *
+ */
+ @Test
+ public void testExecuteWithUnauthorized() throws IOException {
+ // Making it a final array, so that we can reference it from within the MockHttpTransport
+ // instance
+ final int[] requestCounter = {0};
+ MockHttpTransport transport =
+ new MockHttpTransport() {
+ @Override
+ public LowLevelHttpRequest buildRequest(String method, String url) {
+ return new MockLowLevelHttpRequest() {
+ @Override
+ public LowLevelHttpResponse execute() throws IOException {
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ if (requestCounter[0] == 0) {
+ // The first request has to include Basic Auth header
+ assertTrue(this.getHeaders().containsKey("authorization"));
+ List authorizationHeaders = this.getHeaders().get("authorization");
+ assertEquals(1, authorizationHeaders.size());
+ assertTrue(authorizationHeaders.get(0).contains("Basic"));
+ assertFalse(this.getHeaders().containsKey("cookie"));
+
+ // Set the response to include Session cookies
+ response
+ .setHeaderNames(ImmutableList.of("Set-Cookie", "Set-Cookie"))
+ .setHeaderValues(
+ ImmutableList.of(
+ "ProxySession=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w;"
+ + " HttpOnly; SameSite=Strict;"
+ + " Path=/proxy; Max-Age=2592000",
+ "Session=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w;"
+ + " HttpOnly; SameSite=Strict;"
+ + " Path=/SEMP; Max-Age=2592000"));
+ response.setStatusCode(200);
+ } else if (requestCounter[0] == 1) {
+ // The second request does not include Basic Auth header
+ assertFalse(this.getHeaders().containsKey("authorization"));
+ // It must include a cookie header
+ assertTrue(this.getHeaders().containsKey("cookie"));
+ boolean hasSessionCookie =
+ this.getHeaders().get("cookie").stream()
+ .filter(
+ c ->
+ c.contains(
+ "Session=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w"))
+ .count()
+ == 1;
+ assertTrue(hasSessionCookie);
+
+ // Let's assume the Session expired - we return the 401
+ // unauthorized
+ response.setStatusCode(401);
+ } else {
+ // The second request has to be retried with a Basic Auth header
+ // this time
+ assertTrue(this.getHeaders().containsKey("authorization"));
+ List authorizationHeaders = this.getHeaders().get("authorization");
+ assertEquals(1, authorizationHeaders.size());
+ assertTrue(authorizationHeaders.get(0).contains("Basic"));
+ assertFalse(this.getHeaders().containsKey("cookie"));
+
+ response.setStatusCode(200);
+ }
+ response.setContentType(Json.MEDIA_TYPE);
+ requestCounter[0]++;
+ return response;
+ }
+ };
+ }
+ };
+
+ HttpRequestFactory requestFactory = transport.createRequestFactory();
+ SempBasicAuthClientExecutor client =
+ new SempBasicAuthClientExecutor(
+ "http://host", "username", "password", "vpnName", requestFactory);
+
+ // The first, initial request
+ client.getQueueResponse("queue");
+ // The second request, which will try to authenticate with a cookie, and then with Basic
+ // Auth when it receives a 401 unauthorized
+ client.getQueueResponse("queue");
+
+ // There should be 3 requests executed:
+ // the first one is the initial one with Basic Auth,
+ // the second one uses the session cookie, but we simulate it being expired,
+ // so there should be a third request with Basic Auth to create a new session.
+ assertEquals(3, requestCounter[0]);
+ }
+}