-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Solace Read connector: adding implementations of SempClient and SempC…
…lientFactory (#31542) * Adding implementations of SempClient and SempClientFactory * Use core SerializableSupplier, remove SuppressWarnings, extract number to a variable, cookie store map
- Loading branch information
Showing
7 changed files
with
743 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
102 changes: 102 additions & 0 deletions
102
...ava/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace.broker; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.DeserializationFeature; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.google.api.client.http.HttpRequestFactory; | ||
import com.solacesystems.jcsmp.JCSMPFactory; | ||
import java.io.IOException; | ||
import org.apache.beam.sdk.annotations.Internal; | ||
import org.apache.beam.sdk.io.solace.data.Semp.Queue; | ||
import org.apache.beam.sdk.util.SerializableSupplier; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* A class that manages REST calls to the Solace Element Management Protocol (SEMP) using basic | ||
* authentication. | ||
* | ||
* <p>This class provides methods to check necessary information, such as if the queue is | ||
* non-exclusive, remaining backlog bytes of a queue. It can also create and execute calls to create | ||
* queue for a topic. | ||
*/ | ||
@Internal | ||
public class BasicAuthSempClient implements SempClient { | ||
private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class); | ||
private final ObjectMapper objectMapper = | ||
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | ||
|
||
private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor; | ||
|
||
public BasicAuthSempClient( | ||
String host, | ||
String username, | ||
String password, | ||
String vpnName, | ||
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier) { | ||
sempBasicAuthClientExecutor = | ||
new SempBasicAuthClientExecutor( | ||
host, username, password, vpnName, httpRequestFactorySupplier.get()); | ||
} | ||
|
||
@Override | ||
public boolean isQueueNonExclusive(String queueName) throws IOException { | ||
LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is nonExclusive", queueName); | ||
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName); | ||
if (response.content == null) { | ||
throw new IOException("SolaceIO: response from SEMP is empty!"); | ||
} | ||
Queue q = mapJsonToClass(response.content, Queue.class); | ||
return q.data().accessType().equals("non-exclusive"); | ||
} | ||
|
||
@Override | ||
public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, String topicName) | ||
throws IOException { | ||
createQueue(queueName); | ||
createSubscription(queueName, topicName); | ||
return JCSMPFactory.onlyInstance().createQueue(queueName); | ||
} | ||
|
||
@Override | ||
public long getBacklogBytes(String queueName) throws IOException { | ||
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName); | ||
if (response.content == null) { | ||
throw new IOException("SolaceIO: response from SEMP is empty!"); | ||
} | ||
Queue q = mapJsonToClass(response.content, Queue.class); | ||
return q.data().msgSpoolUsage(); | ||
} | ||
|
||
private void createQueue(String queueName) throws IOException { | ||
LOG.info("SolaceIO.Read: Creating new queue {}.", queueName); | ||
sempBasicAuthClientExecutor.createQueueResponse(queueName); | ||
} | ||
|
||
private void createSubscription(String queueName, String topicName) throws IOException { | ||
LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName); | ||
sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName); | ||
} | ||
|
||
private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass) | ||
throws JsonProcessingException { | ||
return objectMapper.readValue(content, mapSuccessToClass); | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
...solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace.broker; | ||
|
||
import com.google.api.client.http.HttpRequestFactory; | ||
import com.google.api.client.http.javanet.NetHttpTransport; | ||
import com.google.auto.value.AutoValue; | ||
import org.apache.beam.sdk.util.SerializableSupplier; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
||
/** | ||
* A factory for creating {@link BasicAuthSempClient} instances. | ||
* | ||
* <p>This factory provides a way to create {@link BasicAuthSempClient} instances with different | ||
* configurations. | ||
*/ | ||
@AutoValue | ||
public abstract class BasicAuthSempClientFactory implements SempClientFactory { | ||
|
||
abstract String host(); | ||
|
||
abstract String username(); | ||
|
||
abstract String password(); | ||
|
||
abstract String vpnName(); | ||
|
||
abstract @Nullable SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier(); | ||
|
||
public static Builder builder() { | ||
return new AutoValue_BasicAuthSempClientFactory.Builder(); | ||
} | ||
|
||
@AutoValue.Builder | ||
public abstract static class Builder { | ||
/** Set Solace SEMP host, format: [Protocol://]Host[:Port]. e.g. "http://127.0.0.1:8080" */ | ||
public abstract Builder host(String host); | ||
|
||
/** Set Solace username. */ | ||
public abstract Builder username(String username); | ||
/** Set Solace password. */ | ||
public abstract Builder password(String password); | ||
|
||
/** Set Solace vpn name. */ | ||
public abstract Builder vpnName(String vpnName); | ||
|
||
@VisibleForTesting | ||
abstract Builder httpRequestFactorySupplier( | ||
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier); | ||
|
||
public abstract BasicAuthSempClientFactory build(); | ||
} | ||
|
||
@Override | ||
public SempClient create() { | ||
return new BasicAuthSempClient( | ||
host(), username(), password(), vpnName(), getHttpRequestFactorySupplier()); | ||
} | ||
|
||
SerializableSupplier<HttpRequestFactory> getHttpRequestFactorySupplier() { | ||
SerializableSupplier<HttpRequestFactory> httpRequestSupplier = httpRequestFactorySupplier(); | ||
return httpRequestSupplier != null | ||
? httpRequestSupplier | ||
: () -> new NetHttpTransport().createRequestFactory(); | ||
} | ||
} |
62 changes: 62 additions & 0 deletions
62
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace.broker; | ||
|
||
import com.google.api.client.http.HttpResponse; | ||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.stream.Collectors; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
||
public class BrokerResponse { | ||
final int code; | ||
final String message; | ||
@Nullable String content; | ||
|
||
public BrokerResponse(int responseCode, String message, @Nullable InputStream content) { | ||
this.code = responseCode; | ||
this.message = message; | ||
if (content != null) { | ||
this.content = | ||
new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8)) | ||
.lines() | ||
.collect(Collectors.joining("\n")); | ||
} | ||
} | ||
|
||
public static BrokerResponse fromHttpResponse(HttpResponse response) throws IOException { | ||
return new BrokerResponse( | ||
response.getStatusCode(), response.getStatusMessage(), response.getContent()); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "BrokerResponse{" | ||
+ "code=" | ||
+ code | ||
+ ", message='" | ||
+ message | ||
+ '\'' | ||
+ ", content=" | ||
+ content | ||
+ '}'; | ||
} | ||
} |
Oops, something went wrong.