diff --git a/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java b/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java
new file mode 100644
index 00000000..763af473
--- /dev/null
+++ b/momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java
@@ -0,0 +1,34 @@
+package momento.sdk;
+
+import grpc.cache_client.pubsub._SubscriptionItem;
+import grpc.cache_client.pubsub._SubscriptionRequest;
+
+/** Represents a connection to an ScsTopic for subscribing to events. */
+interface IScsTopicConnection {
+
+ /**
+ * Closes the connection.
+ *
+ *
Note: This method is intended for testing purposes and should never be called from outside
+ * of tests.
+ */
+ void close();
+
+ /**
+ * Opens the connection.
+ *
+ *
Note: This method is intended for testing purposes and should never be called from outside
+ * of tests.
+ */
+ void open();
+
+ /**
+ * Subscribes to a specific topic using the provided subscription request and observer.
+ *
+ * @param subscriptionRequest The subscription request containing details about the subscription.
+ * @param subscription The observer to handle incoming subscription items.
+ */
+ void subscribe(
+ _SubscriptionRequest subscriptionRequest,
+ CancelableClientCallStreamObserver<_SubscriptionItem> subscription);
+}
diff --git a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java
index 2a42bc25..41af0c95 100644
--- a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java
+++ b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java
@@ -1,9 +1,12 @@
package momento.sdk;
import momento.sdk.responses.topic.TopicMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Represents options for a topic subscription callback. */
public interface ISubscriptionCallbacks {
+ Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class);
/**
* Called when a new message is received on the subscribed topic.
*
@@ -20,4 +23,14 @@ public interface ISubscriptionCallbacks {
* @param t The throwable representing the error.
*/
void onError(Throwable t);
+
+ /** Called when the connection to the topic is lost. */
+ default void onConnectionLost() {
+ logger.info("Connection to topic lost");
+ }
+
+ /** Called when the connection to the topic is restored. */
+ default void onConnectionRestored() {
+ logger.info("Connection to topic restored");
+ }
}
diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java
index c9de5755..43738e9f 100644
--- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java
+++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java
@@ -2,6 +2,8 @@
import com.google.protobuf.ByteString;
import grpc.cache_client.pubsub._PublishRequest;
+import grpc.cache_client.pubsub._SubscriptionItem;
+import grpc.cache_client.pubsub._SubscriptionRequest;
import grpc.cache_client.pubsub._TopicValue;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
@@ -13,9 +15,12 @@
import momento.sdk.internal.SubscriptionState;
import momento.sdk.responses.topic.TopicPublishResponse;
import momento.sdk.responses.topic.TopicSubscribeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ScsTopicClient extends ScsClient {
+ private final Logger logger = LoggerFactory.getLogger(ScsTopicClient.class);
private final ScsTopicGrpcStubsManager topicGrpcStubsManager;
public ScsTopicClient(
@@ -73,6 +78,8 @@ public CompletableFuture subscribe(
options::onItem,
options::onCompleted,
options::onError,
+ options::onConnectionLost,
+ options::onConnectionRestored,
subscriptionState,
subscription);
@@ -133,7 +140,28 @@ public void onCompleted() {
private CompletableFuture sendSubscribe(
SendSubscribeOptions sendSubscribeOptions) {
SubscriptionWrapper subscriptionWrapper;
- subscriptionWrapper = new SubscriptionWrapper(topicGrpcStubsManager, sendSubscribeOptions);
+
+ IScsTopicConnection connection =
+ new IScsTopicConnection() {
+ @Override
+ public void close() {
+ logger.warn("Closing the connection (for testing purposes only)");
+ }
+
+ @Override
+ public void open() {
+ logger.warn("Opening the connection (for testing purposes only)");
+ }
+
+ @Override
+ public void subscribe(
+ _SubscriptionRequest subscriptionRequest,
+ CancelableClientCallStreamObserver<_SubscriptionItem> subscription) {
+ topicGrpcStubsManager.getStub().subscribe(subscriptionRequest, subscription);
+ }
+ };
+
+ subscriptionWrapper = new SubscriptionWrapper(connection, sendSubscribeOptions);
final CompletableFuture subscribeFuture = subscriptionWrapper.subscribeWithRetry();
return subscribeFuture.handle(
(v, ex) -> {
diff --git a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java b/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java
index d00aff4a..4e426393 100644
--- a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java
+++ b/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java
@@ -10,6 +10,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
ItemCallback onItem;
CompletedCallback onCompleted;
ErrorCallback onError;
+ ConnectionLostCallback onConnectionLost;
+ ConnectionRestoredCallback onConnectionRestored;
SubscriptionState subscriptionState;
TopicSubscribeResponse.Subscription subscription;
@@ -19,6 +21,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
ItemCallback onItem,
CompletedCallback onCompleted,
ErrorCallback onError,
+ ConnectionLostCallback onConnectionLost,
+ ConnectionRestoredCallback onConnectionRestored,
SubscriptionState subscriptionState,
TopicSubscribeResponse.Subscription subscription) {
this.cacheName = cacheName;
@@ -26,6 +30,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
this.onItem = onItem;
this.onCompleted = onCompleted;
this.onError = onError;
+ this.onConnectionLost = onConnectionLost;
+ this.onConnectionRestored = onConnectionRestored;
this.subscriptionState = subscriptionState;
this.subscription = subscription;
}
@@ -73,6 +79,16 @@ public void onError(Throwable t) {
onError.onError(t);
}
+ @Override
+ public void onConnectionLost() {
+ onConnectionLost.onConnectionLost();
+ }
+
+ @Override
+ public void onConnectionRestored() {
+ onConnectionRestored.onConnectionRestored();
+ }
+
@FunctionalInterface
public interface ItemCallback {
void onItem(TopicMessage message);
@@ -87,4 +103,14 @@ public interface CompletedCallback {
public interface ErrorCallback {
void onError(Throwable t);
}
+
+ @FunctionalInterface
+ public interface ConnectionLostCallback {
+ void onConnectionLost();
+ }
+
+ @FunctionalInterface
+ public interface ConnectionRestoredCallback {
+ void onConnectionRestored();
+ }
}
diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java
index 933ed6d4..3173ae25 100644
--- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java
+++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java
@@ -6,7 +6,6 @@
import grpc.cache_client.pubsub._TopicItem;
import grpc.cache_client.pubsub._TopicValue;
import io.grpc.Status;
-import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -18,21 +17,33 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class SubscriptionWrapper implements Closeable {
+class SubscriptionWrapper implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class);
- private final ScsTopicGrpcStubsManager grpcManager;
+ private final IScsTopicConnection connection;
private final SendSubscribeOptions options;
private boolean firstMessage = true;
+ private boolean isConnectionLost = false;
+
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private CancelableClientCallStreamObserver<_SubscriptionItem> subscription;
- SubscriptionWrapper(ScsTopicGrpcStubsManager grpcManager, SendSubscribeOptions options) {
- this.grpcManager = grpcManager;
+ SubscriptionWrapper(IScsTopicConnection connection, SendSubscribeOptions options) {
+ this.connection = connection;
this.options = options;
}
- CompletableFuture subscribeWithRetry() {
+ /**
+ * Public method for testing purposes only. Do not call this method in production code or any
+ * context other than testing the topic client.
+ *
+ * This method returns a CompletableFuture that represents the asynchronous execution of the
+ * internal subscription logic with retry mechanism.
+ *
+ * @return A CompletableFuture representing the asynchronous execution of the internal
+ * subscription logic with retry mechanism.
+ */
+ public CompletableFuture subscribeWithRetry() {
CompletableFuture future = new CompletableFuture<>();
subscribeWithRetryInternal(future);
return future;
@@ -57,6 +68,10 @@ public void onNext(_SubscriptionItem item) {
firstMessage = false;
future.complete(null);
}
+ if (isConnectionLost) {
+ isConnectionLost = false;
+ options.onConnectionRestored();
+ }
handleSubscriptionItem(item);
}
@@ -67,6 +82,10 @@ public void onError(Throwable t) {
future.completeExceptionally(t);
} else {
logger.debug("Subscription failed, retrying...");
+ if (!isConnectionLost) {
+ isConnectionLost = true;
+ options.onConnectionLost();
+ }
if (t instanceof io.grpc.StatusRuntimeException) {
logger.debug(
"Throwable is an instance of StatusRuntimeException, checking status code...");
@@ -101,7 +120,7 @@ public void onCompleted() {
.build();
try {
- grpcManager.getStub().subscribe(subscriptionRequest, subscription);
+ connection.subscribe(subscriptionRequest, subscription);
options.subscriptionState.setSubscribed();
} catch (Exception e) {
future.completeExceptionally(
diff --git a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java
new file mode 100644
index 00000000..17d37147
--- /dev/null
+++ b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java
@@ -0,0 +1,111 @@
+package momento.sdk;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import grpc.cache_client.pubsub._Heartbeat;
+import grpc.cache_client.pubsub._SubscriptionItem;
+import grpc.cache_client.pubsub._SubscriptionRequest;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import momento.sdk.internal.SubscriptionState;
+import momento.sdk.responses.topic.TopicSubscribeResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionWrapperTest {
+ private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapperTest.class);
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test
+ public void testConnectionLostAndRestored() throws InterruptedException {
+ SubscriptionState state = new SubscriptionState();
+ TopicSubscribeResponse.Subscription subscription =
+ new TopicSubscribeResponse.Subscription(state);
+
+ AtomicBoolean gotConnectionLostCallback = new AtomicBoolean(false);
+ AtomicBoolean gotConnectionRestoredCallback = new AtomicBoolean(false);
+
+ Semaphore waitingForSubscriptionAttempt = new Semaphore(0);
+
+ SendSubscribeOptions options =
+ new SendSubscribeOptions(
+ "cache",
+ "topic",
+ (message) -> {},
+ () -> {},
+ (err) -> {},
+ () -> {
+ logger.info("Got to our connection lost callback!");
+ gotConnectionLostCallback.set(true);
+ },
+ () -> {
+ logger.info("Got to our connection restored callback!");
+ gotConnectionRestoredCallback.set(true);
+ },
+ state,
+ subscription);
+
+ IScsTopicConnection connection =
+ new IScsTopicConnection() {
+ boolean isOpen = true;
+ CancelableClientCallStreamObserver<_SubscriptionItem> subscription;
+
+ @Override
+ public void close() {
+ logger.info("Connection closed");
+ isOpen = false;
+ subscription.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ }
+
+ @Override
+ public void open() {
+ logger.info("Connection opened");
+ isOpen = true;
+ }
+
+ @Override
+ public void subscribe(
+ _SubscriptionRequest subscriptionRequest,
+ CancelableClientCallStreamObserver<_SubscriptionItem> subscription) {
+ this.subscription = subscription;
+ if (isOpen) {
+ _SubscriptionItem heartbeat =
+ _SubscriptionItem.newBuilder()
+ .setHeartbeat(_Heartbeat.newBuilder().build())
+ .build();
+ subscription.onNext(heartbeat);
+ } else {
+ subscription.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ }
+ waitingForSubscriptionAttempt.release();
+ }
+ };
+
+ SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(connection, options);
+ CompletableFuture subscribeWithRetryResult = subscriptionWrapper.subscribeWithRetry();
+ subscribeWithRetryResult.join();
+
+ waitingForSubscriptionAttempt.acquire();
+
+ connection.close();
+
+ assertTrue(gotConnectionLostCallback.get());
+ assertFalse(gotConnectionRestoredCallback.get());
+
+ connection.open();
+ waitingForSubscriptionAttempt.acquire();
+
+ assertTrue(gotConnectionRestoredCallback.get());
+ }
+}
diff --git a/momento-sdk/src/test/resources/logback.xml b/momento-sdk/src/test/resources/logback.xml
new file mode 100644
index 00000000..9fa50918
--- /dev/null
+++ b/momento-sdk/src/test/resources/logback.xml
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n
+
+
+
+
+
+
+