Skip to content

Commit

Permalink
Solace Read connector: adding implementations of SempClient and SempC…
Browse files Browse the repository at this point in the history
…lientFactory (apache#31542)

* Adding implementations of SempClient and SempClientFactory

* Use core SerializableSupplier, remove SuppressWarnings, extract number to a variable, cookie store map
  • Loading branch information
bzablocki authored and acrites committed Jul 17, 2024
1 parent 2a49415 commit d39ff2b
Show file tree
Hide file tree
Showing 7 changed files with 743 additions and 0 deletions.
4 changes: 4 additions & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ dependencies {
implementation library.java.google_api_common
implementation library.java.gax
implementation library.java.threetenbp
implementation library.java.google_http_client
implementation library.java.google_http_client_gson
implementation library.java.jackson_core
implementation library.java.jackson_databind

testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:common")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.HttpRequestFactory;
import com.solacesystems.jcsmp.JCSMPFactory;
import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.data.Semp.Queue;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A class that manages REST calls to the Solace Element Management Protocol (SEMP) using basic
* authentication.
*
* <p>This class provides methods to check necessary information, such as if the queue is
* non-exclusive, remaining backlog bytes of a queue. It can also create and execute calls to create
* queue for a topic.
*/
@Internal
public class BasicAuthSempClient implements SempClient {
private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class);
private final ObjectMapper objectMapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor;

public BasicAuthSempClient(
String host,
String username,
String password,
String vpnName,
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier) {
sempBasicAuthClientExecutor =
new SempBasicAuthClientExecutor(
host, username, password, vpnName, httpRequestFactorySupplier.get());
}

@Override
public boolean isQueueNonExclusive(String queueName) throws IOException {
LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is nonExclusive", queueName);
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().accessType().equals("non-exclusive");
}

@Override
public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, String topicName)
throws IOException {
createQueue(queueName);
createSubscription(queueName, topicName);
return JCSMPFactory.onlyInstance().createQueue(queueName);
}

@Override
public long getBacklogBytes(String queueName) throws IOException {
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().msgSpoolUsage();
}

private void createQueue(String queueName) throws IOException {
LOG.info("SolaceIO.Read: Creating new queue {}.", queueName);
sempBasicAuthClientExecutor.createQueueResponse(queueName);
}

private void createSubscription(String queueName, String topicName) throws IOException {
LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName);
sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName);
}

private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass)
throws JsonProcessingException {
return objectMapper.readValue(content, mapSuccessToClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A factory for creating {@link BasicAuthSempClient} instances.
*
* <p>This factory provides a way to create {@link BasicAuthSempClient} instances with different
* configurations.
*/
@AutoValue
public abstract class BasicAuthSempClientFactory implements SempClientFactory {

abstract String host();

abstract String username();

abstract String password();

abstract String vpnName();

abstract @Nullable SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier();

public static Builder builder() {
return new AutoValue_BasicAuthSempClientFactory.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
/** Set Solace SEMP host, format: [Protocol://]Host[:Port]. e.g. "http://127.0.0.1:8080" */
public abstract Builder host(String host);

/** Set Solace username. */
public abstract Builder username(String username);
/** Set Solace password. */
public abstract Builder password(String password);

/** Set Solace vpn name. */
public abstract Builder vpnName(String vpnName);

@VisibleForTesting
abstract Builder httpRequestFactorySupplier(
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier);

public abstract BasicAuthSempClientFactory build();
}

@Override
public SempClient create() {
return new BasicAuthSempClient(
host(), username(), password(), vpnName(), getHttpRequestFactorySupplier());
}

SerializableSupplier<HttpRequestFactory> getHttpRequestFactorySupplier() {
SerializableSupplier<HttpRequestFactory> httpRequestSupplier = httpRequestFactorySupplier();
return httpRequestSupplier != null
? httpRequestSupplier
: () -> new NetHttpTransport().createRequestFactory();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.google.api.client.http.HttpResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.Nullable;

public class BrokerResponse {
final int code;
final String message;
@Nullable String content;

public BrokerResponse(int responseCode, String message, @Nullable InputStream content) {
this.code = responseCode;
this.message = message;
if (content != null) {
this.content =
new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8))
.lines()
.collect(Collectors.joining("\n"));
}
}

public static BrokerResponse fromHttpResponse(HttpResponse response) throws IOException {
return new BrokerResponse(
response.getStatusCode(), response.getStatusMessage(), response.getContent());
}

@Override
public String toString() {
return "BrokerResponse{"
+ "code="
+ code
+ ", message='"
+ message
+ '\''
+ ", content="
+ content
+ '}';
}
}
Loading

0 comments on commit d39ff2b

Please sign in to comment.