Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solace Read connector: adding implementations of SempClient and SempClientFactory #31542

Merged
merged 2 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ dependencies {
implementation library.java.google_api_common
implementation library.java.gax
implementation library.java.threetenbp
implementation library.java.google_http_client
implementation library.java.google_http_client_gson
implementation library.java.jackson_core
implementation library.java.jackson_databind

testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:common")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.HttpRequestFactory;
import com.solacesystems.jcsmp.JCSMPFactory;
import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.data.Semp.Queue;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A class that manages REST calls to the Solace Element Management Protocol (SEMP) using basic
* authentication.
*
* <p>This class provides methods to check necessary information, such as if the queue is
* non-exclusive, remaining backlog bytes of a queue. It can also create and execute calls to create
* queue for a topic.
*/
@Internal
public class BasicAuthSempClient implements SempClient {
private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class);
private final ObjectMapper objectMapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor;

public BasicAuthSempClient(
String host,
String username,
String password,
String vpnName,
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier) {
sempBasicAuthClientExecutor =
new SempBasicAuthClientExecutor(
host, username, password, vpnName, httpRequestFactorySupplier.get());
}

@Override
public boolean isQueueNonExclusive(String queueName) throws IOException {
LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is nonExclusive", queueName);
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().accessType().equals("non-exclusive");
}

@Override
public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, String topicName)
throws IOException {
createQueue(queueName);
createSubscription(queueName, topicName);
return JCSMPFactory.onlyInstance().createQueue(queueName);
}

@Override
public long getBacklogBytes(String queueName) throws IOException {
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().msgSpoolUsage();
}

private void createQueue(String queueName) throws IOException {
LOG.info("SolaceIO.Read: Creating new queue {}.", queueName);
sempBasicAuthClientExecutor.createQueueResponse(queueName);
}

private void createSubscription(String queueName, String topicName) throws IOException {
LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName);
sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName);
}

private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass)
throws JsonProcessingException {
return objectMapper.readValue(content, mapSuccessToClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A factory for creating {@link BasicAuthSempClient} instances.
*
* <p>This factory provides a way to create {@link BasicAuthSempClient} instances with different
* configurations.
*/
@AutoValue
public abstract class BasicAuthSempClientFactory implements SempClientFactory {

abstract String host();

abstract String username();

abstract String password();

abstract String vpnName();

abstract @Nullable SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier();

public static Builder builder() {
return new AutoValue_BasicAuthSempClientFactory.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
/** Set Solace SEMP host, format: [Protocol://]Host[:Port]. e.g. "http://127.0.0.1:8080" */
public abstract Builder host(String host);

/** Set Solace username. */
public abstract Builder username(String username);
/** Set Solace password. */
public abstract Builder password(String password);

/** Set Solace vpn name. */
public abstract Builder vpnName(String vpnName);

@VisibleForTesting
abstract Builder httpRequestFactorySupplier(
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier);

public abstract BasicAuthSempClientFactory build();
}

@Override
public SempClient create() {
return new BasicAuthSempClient(
host(), username(), password(), vpnName(), getHttpRequestFactorySupplier());
}

SerializableSupplier<HttpRequestFactory> getHttpRequestFactorySupplier() {
SerializableSupplier<HttpRequestFactory> httpRequestSupplier = httpRequestFactorySupplier();
return httpRequestSupplier != null
? httpRequestSupplier
: () -> new NetHttpTransport().createRequestFactory();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.google.api.client.http.HttpResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.Nullable;

public class BrokerResponse {
final int code;
final String message;
@Nullable String content;

public BrokerResponse(int responseCode, String message, @Nullable InputStream content) {
this.code = responseCode;
this.message = message;
if (content != null) {
this.content =
new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8))
.lines()
.collect(Collectors.joining("\n"));
}
}

public static BrokerResponse fromHttpResponse(HttpResponse response) throws IOException {
return new BrokerResponse(
response.getStatusCode(), response.getStatusMessage(), response.getContent());
}

@Override
public String toString() {
return "BrokerResponse{"
+ "code="
+ code
+ ", message='"
+ message
+ '\''
+ ", content="
+ content
+ '}';
}
}
Loading
Loading