From 0a1d04790ce11295e234d74232892048e55eeb4a Mon Sep 17 00:00:00 2001 From: rishtigupta Date: Mon, 11 Nov 2024 14:23:54 -0800 Subject: [PATCH] feat: handle discontinuity and heartbeat in topics and add sequence page to topics --- gradle/libs.versions.toml | 2 +- .../java/momento/sdk/TopicClientTest.java | 11 +++++ .../momento/sdk/ISubscriptionCallbacks.java | 11 +++++ .../main/java/momento/sdk/ScsTopicClient.java | 2 + .../momento/sdk/SendSubscribeOptions.java | 35 +++++++++++++++ .../java/momento/sdk/SubscriptionWrapper.java | 20 ++++++++- .../sdk/internal/SubscriptionState.java | 15 +++++++ .../responses/topic/TopicDiscontinuity.java | 43 +++++++++++++++++++ .../momento/sdk/SubscriptionWrapperTest.java | 2 + 9 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 momento-sdk/src/main/java/momento/sdk/responses/topic/TopicDiscontinuity.java diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 08e52c2b..e3388dac 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -7,7 +7,7 @@ jsr305 = "3.0.2" junit = "5.9.2" gson = "2.10.1" guava = "31.1-android" -java-protos = "0.113.0" +java-protos = "0.119.2" slf4j = "1.7.36" logback = "1.4.8" mockito = "5.4.0" diff --git a/momento-sdk/src/intTest/java/momento/sdk/TopicClientTest.java b/momento-sdk/src/intTest/java/momento/sdk/TopicClientTest.java index 9aabdabb..065b7269 100644 --- a/momento-sdk/src/intTest/java/momento/sdk/TopicClientTest.java +++ b/momento-sdk/src/intTest/java/momento/sdk/TopicClientTest.java @@ -9,6 +9,7 @@ import java.util.concurrent.CountDownLatch; import momento.sdk.config.TopicConfigurations; import momento.sdk.exceptions.MomentoErrorCode; +import momento.sdk.responses.topic.TopicDiscontinuity; import momento.sdk.responses.topic.TopicMessage; import momento.sdk.responses.topic.TopicPublishResponse; import momento.sdk.responses.topic.TopicSubscribeResponse; @@ -61,6 +62,16 @@ public void onCompleted() { public void onError(Throwable t) { logger.info("onError Invoked"); } + + @Override + public void onDiscontinuity(TopicDiscontinuity discontinuity) { + logger.info("onDiscontinuity Invoked"); + } + + @Override + public void onHeartbeat() { + logger.info("onHeartbeat Invoked"); + } }; } diff --git a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java index 41af0c95..8ab35f06 100644 --- a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java +++ b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java @@ -1,5 +1,6 @@ package momento.sdk; +import momento.sdk.responses.topic.TopicDiscontinuity; import momento.sdk.responses.topic.TopicMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +25,16 @@ public interface ISubscriptionCallbacks { */ void onError(Throwable t); + /** Called when a discontinuity occurs during the subscription. */ + default void onDiscontinuity(TopicDiscontinuity discontinuity) { + logger.info("Discontinuity occurred: {}", discontinuity); + }; + + /** Called when a heartbeat is received during the subscription. */ + default void onHeartbeat() { + logger.info("Heartbeat received"); + }; + /** Called when the connection to the topic is lost. */ default void onConnectionLost() { logger.info("Connection to topic lost"); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java index 8cfb5e72..d12d4f59 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java @@ -79,6 +79,8 @@ public CompletableFuture subscribe( options::onItem, options::onCompleted, options::onError, + options::onDiscontinuity , + options::onHeartbeat, options::onConnectionLost, options::onConnectionRestored, subscriptionState, diff --git a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java b/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java index 4e426393..936b673e 100644 --- a/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java +++ b/momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java @@ -1,6 +1,7 @@ package momento.sdk; import momento.sdk.internal.SubscriptionState; +import momento.sdk.responses.topic.TopicDiscontinuity; import momento.sdk.responses.topic.TopicMessage; import momento.sdk.responses.topic.TopicSubscribeResponse; @@ -10,6 +11,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks { ItemCallback onItem; CompletedCallback onCompleted; ErrorCallback onError; + DiscontinuityCallback onDiscontinuity; + HeartbeatCallback onHeartbeat; ConnectionLostCallback onConnectionLost; ConnectionRestoredCallback onConnectionRestored; SubscriptionState subscriptionState; @@ -21,6 +24,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks { ItemCallback onItem, CompletedCallback onCompleted, ErrorCallback onError, + DiscontinuityCallback onDiscontinuity, + HeartbeatCallback onHeartbeat, ConnectionLostCallback onConnectionLost, ConnectionRestoredCallback onConnectionRestored, SubscriptionState subscriptionState, @@ -30,6 +35,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks { this.onItem = onItem; this.onCompleted = onCompleted; this.onError = onError; + this.onDiscontinuity = onDiscontinuity; + this.onHeartbeat = onHeartbeat; this.onConnectionLost = onConnectionLost; this.onConnectionRestored = onConnectionRestored; this.subscriptionState = subscriptionState; @@ -56,6 +63,14 @@ public ErrorCallback getOnError() { return onError; } + public DiscontinuityCallback getOnDiscontinuity() { + return onDiscontinuity; + } + + public HeartbeatCallback getOnHeartbeat() { + return onHeartbeat; + } + public SubscriptionState getSubscriptionState() { return subscriptionState; } @@ -79,6 +94,16 @@ public void onError(Throwable t) { onError.onError(t); } + @Override + public void onDiscontinuity(TopicDiscontinuity discontinuity) { + onDiscontinuity.onDiscontinuity(discontinuity); + } + + @Override + public void onHeartbeat() { + onHeartbeat.onHeartbeat(); + } + @Override public void onConnectionLost() { onConnectionLost.onConnectionLost(); @@ -104,6 +129,16 @@ public interface ErrorCallback { void onError(Throwable t); } + @FunctionalInterface + public interface DiscontinuityCallback { + void onDiscontinuity(TopicDiscontinuity discontinuity); + } + + @FunctionalInterface + public interface HeartbeatCallback { + void onHeartbeat(); + } + @FunctionalInterface public interface ConnectionLostCallback { void onConnectionLost(); diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index 3173ae25..a8500481 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import momento.sdk.exceptions.CacheServiceExceptionMapper; import momento.sdk.exceptions.InternalServerException; +import momento.sdk.responses.topic.TopicDiscontinuity; import momento.sdk.responses.topic.TopicMessage; import momento.sdk.responses.topic.TopicSubscribeResponse; import org.slf4j.Logger; @@ -117,6 +118,7 @@ public void onCompleted() { .setTopic(options.getTopicName()) .setResumeAtTopicSequenceNumber( options.subscriptionState.getResumeAtTopicSequenceNumber()) + .setSequencePage(options.subscriptionState.getResumeAtTopicSequencePage()) .build(); try { @@ -155,15 +157,21 @@ private void handleSubscriptionItem(_SubscriptionItem item) { private void handleSubscriptionDiscontinuity(_SubscriptionItem discontinuityItem) { logger.debug( - "{}, {}, {}, {}", + "discontinuity {}, {}, {}, {}, {}, {}", options.getCacheName(), options.getTopicName(), discontinuityItem.getDiscontinuity().getLastTopicSequence(), + discontinuityItem.getDiscontinuity().getNewTopicSequence(), + discontinuityItem.getDiscontinuity().getLastTopicSequence(), discontinuityItem.getDiscontinuity().getNewTopicSequence()); + + options.onDiscontinuity(new TopicDiscontinuity((int) discontinuityItem.getDiscontinuity().getLastTopicSequence(), + (int) discontinuityItem.getDiscontinuity().getNewTopicSequence(), (int) discontinuityItem.getDiscontinuity().getNewSequencePage())); } private void handleSubscriptionHeartbeat() { logger.debug("heartbeat {} {}", options.getCacheName(), options.getTopicName()); + options.onHeartbeat(); } private void handleSubscriptionUnknown() { @@ -175,6 +183,16 @@ private void handleSubscriptionItemMessage(_SubscriptionItem item) { _TopicValue topicValue = topicItem.getValue(); options.subscriptionState.setResumeAtTopicSequenceNumber( (int) topicItem.getTopicSequenceNumber()); + options.subscriptionState.setResumeAtTopicSequencePage( + (int) topicItem.getSequencePage()); + + logger.debug( + "Received Item on subscription stream: {}, {}, {}, {}, {}", + options.getCacheName(), + options.getTopicName(), + topicItem.getPublisherId(), + topicItem.getTopicSequenceNumber(), + topicItem.getSequencePage()); TopicMessage message; switch (topicValue.getKindCase()) { diff --git a/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java b/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java index 67b8c766..eca35127 100644 --- a/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java +++ b/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java @@ -5,6 +5,7 @@ public class SubscriptionState { private Runnable unsubscribeFn; private Integer lastTopicSequenceNumber; + private Integer lastTopicSequencePage; private boolean isSubscribed; /** Constructs a new SubscriptionState instance with default values. */ @@ -27,6 +28,20 @@ public void setResumeAtTopicSequenceNumber(int lastTopicSequenceNumber) { this.lastTopicSequenceNumber = lastTopicSequenceNumber; } + /** + * Gets the topic sequence page to resume the subscription from. + * + * @return The topic sequence page to resume from. + */ + public int getResumeAtTopicSequencePage() { + return lastTopicSequencePage != null ? lastTopicSequencePage : 0; + } + + /** Sets the topic sequence page to resume the subscription from. */ + public void setResumeAtTopicSequencePage(int lastTopicSequencePage) { + this.lastTopicSequencePage = lastTopicSequencePage; + } + /** Sets the subscription state to "subscribed." */ public void setSubscribed() { this.isSubscribed = true; diff --git a/momento-sdk/src/main/java/momento/sdk/responses/topic/TopicDiscontinuity.java b/momento-sdk/src/main/java/momento/sdk/responses/topic/TopicDiscontinuity.java new file mode 100644 index 00000000..3e1b8bc3 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/responses/topic/TopicDiscontinuity.java @@ -0,0 +1,43 @@ +package momento.sdk.responses.topic; + +public class TopicDiscontinuity { + private final Integer lastSequenceNumber; + private final Integer newSequenceNumber; + private final Integer newSequencePage; + + public TopicDiscontinuity(Integer lastSequenceNumber, Integer newSequenceNumber, Integer newSequencePage) { + this.lastSequenceNumber = lastSequenceNumber; + this.newSequenceNumber = newSequenceNumber; + this.newSequencePage = newSequencePage; + } + + @Override + public String toString() { + return "TopicDiscontinuity{" + + "lastSequenceNumber=" + lastSequenceNumber + + ", newSequenceNumber=" + newSequenceNumber + + ", newSequencePage=" + newSequencePage + + '}'; + } + + /* + * Gets the last sequence number. + */ + public Integer getLastSequenceNumber() { + return lastSequenceNumber; + } + + /* + * Gets the new sequence number. + */ + public Integer getNewSequenceNumber() { + return newSequenceNumber; + } + + /* + * Gets the new sequence page. + */ + public Integer getNewSequencePage() { + return newSequencePage; + } +} diff --git a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java index 17d37147..5dd1452a 100644 --- a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java +++ b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java @@ -45,6 +45,8 @@ public void testConnectionLostAndRestored() throws InterruptedException { (message) -> {}, () -> {}, (err) -> {}, + (discontinuity) -> {}, + () -> {}, () -> { logger.info("Got to our connection lost callback!"); gotConnectionLostCallback.set(true);