Skip to content

Commit

Permalink
feat: add onConnectionLost and onConnectionRestored callback functions (
Browse files Browse the repository at this point in the history
  • Loading branch information
rishtigupta authored Dec 15, 2023
1 parent 4460c67 commit 51ca3b4
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 8 deletions.
34 changes: 34 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package momento.sdk;

import grpc.cache_client.pubsub._SubscriptionItem;
import grpc.cache_client.pubsub._SubscriptionRequest;

/** Represents a connection to an ScsTopic for subscribing to events. */
interface IScsTopicConnection {

/**
* Closes the connection.
*
* <p>Note: This method is intended for testing purposes and should never be called from outside
* of tests.
*/
void close();

/**
* Opens the connection.
*
* <p>Note: This method is intended for testing purposes and should never be called from outside
* of tests.
*/
void open();

/**
* Subscribes to a specific topic using the provided subscription request and observer.
*
* @param subscriptionRequest The subscription request containing details about the subscription.
* @param subscription The observer to handle incoming subscription items.
*/
void subscribe(
_SubscriptionRequest subscriptionRequest,
CancelableClientCallStreamObserver<_SubscriptionItem> subscription);
}
13 changes: 13 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package momento.sdk;

import momento.sdk.responses.topic.TopicMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Represents options for a topic subscription callback. */
public interface ISubscriptionCallbacks {
Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class);
/**
* Called when a new message is received on the subscribed topic.
*
Expand All @@ -20,4 +23,14 @@ public interface ISubscriptionCallbacks {
* @param t The throwable representing the error.
*/
void onError(Throwable t);

/** Called when the connection to the topic is lost. */
default void onConnectionLost() {
logger.info("Connection to topic lost");
}

/** Called when the connection to the topic is restored. */
default void onConnectionRestored() {
logger.info("Connection to topic restored");
}
}
30 changes: 29 additions & 1 deletion momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.google.protobuf.ByteString;
import grpc.cache_client.pubsub._PublishRequest;
import grpc.cache_client.pubsub._SubscriptionItem;
import grpc.cache_client.pubsub._SubscriptionRequest;
import grpc.cache_client.pubsub._TopicValue;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
Expand All @@ -13,9 +15,12 @@
import momento.sdk.internal.SubscriptionState;
import momento.sdk.responses.topic.TopicPublishResponse;
import momento.sdk.responses.topic.TopicSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScsTopicClient extends ScsClient {

private final Logger logger = LoggerFactory.getLogger(ScsTopicClient.class);
private final ScsTopicGrpcStubsManager topicGrpcStubsManager;

public ScsTopicClient(
Expand Down Expand Up @@ -73,6 +78,8 @@ public CompletableFuture<TopicSubscribeResponse> subscribe(
options::onItem,
options::onCompleted,
options::onError,
options::onConnectionLost,
options::onConnectionRestored,
subscriptionState,
subscription);

Expand Down Expand Up @@ -133,7 +140,28 @@ public void onCompleted() {
private CompletableFuture<TopicSubscribeResponse> sendSubscribe(
SendSubscribeOptions sendSubscribeOptions) {
SubscriptionWrapper subscriptionWrapper;
subscriptionWrapper = new SubscriptionWrapper(topicGrpcStubsManager, sendSubscribeOptions);

IScsTopicConnection connection =
new IScsTopicConnection() {
@Override
public void close() {
logger.warn("Closing the connection (for testing purposes only)");
}

@Override
public void open() {
logger.warn("Opening the connection (for testing purposes only)");
}

@Override
public void subscribe(
_SubscriptionRequest subscriptionRequest,
CancelableClientCallStreamObserver<_SubscriptionItem> subscription) {
topicGrpcStubsManager.getStub().subscribe(subscriptionRequest, subscription);
}
};

subscriptionWrapper = new SubscriptionWrapper(connection, sendSubscribeOptions);
final CompletableFuture<Void> subscribeFuture = subscriptionWrapper.subscribeWithRetry();
return subscribeFuture.handle(
(v, ex) -> {
Expand Down
26 changes: 26 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
ItemCallback onItem;
CompletedCallback onCompleted;
ErrorCallback onError;
ConnectionLostCallback onConnectionLost;
ConnectionRestoredCallback onConnectionRestored;
SubscriptionState subscriptionState;
TopicSubscribeResponse.Subscription subscription;

Expand All @@ -19,13 +21,17 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
ItemCallback onItem,
CompletedCallback onCompleted,
ErrorCallback onError,
ConnectionLostCallback onConnectionLost,
ConnectionRestoredCallback onConnectionRestored,
SubscriptionState subscriptionState,
TopicSubscribeResponse.Subscription subscription) {
this.cacheName = cacheName;
this.topicName = topicName;
this.onItem = onItem;
this.onCompleted = onCompleted;
this.onError = onError;
this.onConnectionLost = onConnectionLost;
this.onConnectionRestored = onConnectionRestored;
this.subscriptionState = subscriptionState;
this.subscription = subscription;
}
Expand Down Expand Up @@ -73,6 +79,16 @@ public void onError(Throwable t) {
onError.onError(t);
}

@Override
public void onConnectionLost() {
onConnectionLost.onConnectionLost();
}

@Override
public void onConnectionRestored() {
onConnectionRestored.onConnectionRestored();
}

@FunctionalInterface
public interface ItemCallback {
void onItem(TopicMessage message);
Expand All @@ -87,4 +103,14 @@ public interface CompletedCallback {
public interface ErrorCallback {
void onError(Throwable t);
}

@FunctionalInterface
public interface ConnectionLostCallback {
void onConnectionLost();
}

@FunctionalInterface
public interface ConnectionRestoredCallback {
void onConnectionRestored();
}
}
33 changes: 26 additions & 7 deletions momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import grpc.cache_client.pubsub._TopicItem;
import grpc.cache_client.pubsub._TopicValue;
import io.grpc.Status;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -18,21 +17,33 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SubscriptionWrapper implements Closeable {
class SubscriptionWrapper implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapper.class);
private final ScsTopicGrpcStubsManager grpcManager;
private final IScsTopicConnection connection;
private final SendSubscribeOptions options;
private boolean firstMessage = true;
private boolean isConnectionLost = false;

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

private CancelableClientCallStreamObserver<_SubscriptionItem> subscription;

SubscriptionWrapper(ScsTopicGrpcStubsManager grpcManager, SendSubscribeOptions options) {
this.grpcManager = grpcManager;
SubscriptionWrapper(IScsTopicConnection connection, SendSubscribeOptions options) {
this.connection = connection;
this.options = options;
}

CompletableFuture<Void> subscribeWithRetry() {
/**
* Public method for testing purposes only. Do not call this method in production code or any
* context other than testing the topic client.
*
* <p>This method returns a CompletableFuture that represents the asynchronous execution of the
* internal subscription logic with retry mechanism.
*
* @return A CompletableFuture representing the asynchronous execution of the internal
* subscription logic with retry mechanism.
*/
public CompletableFuture<Void> subscribeWithRetry() {
CompletableFuture<Void> future = new CompletableFuture<>();
subscribeWithRetryInternal(future);
return future;
Expand All @@ -57,6 +68,10 @@ public void onNext(_SubscriptionItem item) {
firstMessage = false;
future.complete(null);
}
if (isConnectionLost) {
isConnectionLost = false;
options.onConnectionRestored();
}
handleSubscriptionItem(item);
}

Expand All @@ -67,6 +82,10 @@ public void onError(Throwable t) {
future.completeExceptionally(t);
} else {
logger.debug("Subscription failed, retrying...");
if (!isConnectionLost) {
isConnectionLost = true;
options.onConnectionLost();
}
if (t instanceof io.grpc.StatusRuntimeException) {
logger.debug(
"Throwable is an instance of StatusRuntimeException, checking status code...");
Expand Down Expand Up @@ -101,7 +120,7 @@ public void onCompleted() {
.build();

try {
grpcManager.getStub().subscribe(subscriptionRequest, subscription);
connection.subscribe(subscriptionRequest, subscription);
options.subscriptionState.setSubscribed();
} catch (Exception e) {
future.completeExceptionally(
Expand Down
111 changes: 111 additions & 0 deletions momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package momento.sdk;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import grpc.cache_client.pubsub._Heartbeat;
import grpc.cache_client.pubsub._SubscriptionItem;
import grpc.cache_client.pubsub._SubscriptionRequest;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import momento.sdk.internal.SubscriptionState;
import momento.sdk.responses.topic.TopicSubscribeResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionWrapperTest {
private final Logger logger = LoggerFactory.getLogger(SubscriptionWrapperTest.class);

@BeforeEach
public void setUp() {
MockitoAnnotations.openMocks(this);
}

@Test
public void testConnectionLostAndRestored() throws InterruptedException {
SubscriptionState state = new SubscriptionState();
TopicSubscribeResponse.Subscription subscription =
new TopicSubscribeResponse.Subscription(state);

AtomicBoolean gotConnectionLostCallback = new AtomicBoolean(false);
AtomicBoolean gotConnectionRestoredCallback = new AtomicBoolean(false);

Semaphore waitingForSubscriptionAttempt = new Semaphore(0);

SendSubscribeOptions options =
new SendSubscribeOptions(
"cache",
"topic",
(message) -> {},
() -> {},
(err) -> {},
() -> {
logger.info("Got to our connection lost callback!");
gotConnectionLostCallback.set(true);
},
() -> {
logger.info("Got to our connection restored callback!");
gotConnectionRestoredCallback.set(true);
},
state,
subscription);

IScsTopicConnection connection =
new IScsTopicConnection() {
boolean isOpen = true;
CancelableClientCallStreamObserver<_SubscriptionItem> subscription;

@Override
public void close() {
logger.info("Connection closed");
isOpen = false;
subscription.onError(new StatusRuntimeException(Status.UNAVAILABLE));
}

@Override
public void open() {
logger.info("Connection opened");
isOpen = true;
}

@Override
public void subscribe(
_SubscriptionRequest subscriptionRequest,
CancelableClientCallStreamObserver<_SubscriptionItem> subscription) {
this.subscription = subscription;
if (isOpen) {
_SubscriptionItem heartbeat =
_SubscriptionItem.newBuilder()
.setHeartbeat(_Heartbeat.newBuilder().build())
.build();
subscription.onNext(heartbeat);
} else {
subscription.onError(new StatusRuntimeException(Status.UNAVAILABLE));
}
waitingForSubscriptionAttempt.release();
}
};

SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(connection, options);
CompletableFuture<Void> subscribeWithRetryResult = subscriptionWrapper.subscribeWithRetry();
subscribeWithRetryResult.join();

waitingForSubscriptionAttempt.acquire();

connection.close();

assertTrue(gotConnectionLostCallback.get());
assertFalse(gotConnectionRestoredCallback.get());

connection.open();
waitingForSubscriptionAttempt.acquire();

assertTrue(gotConnectionRestoredCallback.get());
}
}
17 changes: 17 additions & 0 deletions momento-sdk/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>

<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>

<appender name="STDOUT" class="ConsoleAppender">
<encoder class="PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n</pattern>
</encoder>
</appender>

<root level="debug">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

0 comments on commit 51ca3b4

Please sign in to comment.