Skip to content

Commit

Permalink
feat: handle discontinuity and heartbeat in topics and add sequence p…
Browse files Browse the repository at this point in the history
…age to topics
  • Loading branch information
rishtigupta committed Nov 11, 2024
1 parent 68ee067 commit 0a1d047
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 2 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jsr305 = "3.0.2"
junit = "5.9.2"
gson = "2.10.1"
guava = "31.1-android"
java-protos = "0.113.0"
java-protos = "0.119.2"
slf4j = "1.7.36"
logback = "1.4.8"
mockito = "5.4.0"
Expand Down
11 changes: 11 additions & 0 deletions momento-sdk/src/intTest/java/momento/sdk/TopicClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.CountDownLatch;
import momento.sdk.config.TopicConfigurations;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.responses.topic.TopicDiscontinuity;
import momento.sdk.responses.topic.TopicMessage;
import momento.sdk.responses.topic.TopicPublishResponse;
import momento.sdk.responses.topic.TopicSubscribeResponse;
Expand Down Expand Up @@ -61,6 +62,16 @@ public void onCompleted() {
public void onError(Throwable t) {
logger.info("onError Invoked");
}

@Override
public void onDiscontinuity(TopicDiscontinuity discontinuity) {
logger.info("onDiscontinuity Invoked");
}

@Override
public void onHeartbeat() {
logger.info("onHeartbeat Invoked");
}
};
}

Expand Down
11 changes: 11 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package momento.sdk;

import momento.sdk.responses.topic.TopicDiscontinuity;
import momento.sdk.responses.topic.TopicMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,6 +25,16 @@ public interface ISubscriptionCallbacks {
*/
void onError(Throwable t);

/** Called when a discontinuity occurs during the subscription. */
default void onDiscontinuity(TopicDiscontinuity discontinuity) {
logger.info("Discontinuity occurred: {}", discontinuity);
};

/** Called when a heartbeat is received during the subscription. */
default void onHeartbeat() {
logger.info("Heartbeat received");
};

/** Called when the connection to the topic is lost. */
default void onConnectionLost() {
logger.info("Connection to topic lost");
Expand Down
2 changes: 2 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public CompletableFuture<TopicSubscribeResponse> subscribe(
options::onItem,
options::onCompleted,
options::onError,
options::onDiscontinuity ,
options::onHeartbeat,
options::onConnectionLost,
options::onConnectionRestored,
subscriptionState,
Expand Down
35 changes: 35 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package momento.sdk;

import momento.sdk.internal.SubscriptionState;
import momento.sdk.responses.topic.TopicDiscontinuity;
import momento.sdk.responses.topic.TopicMessage;
import momento.sdk.responses.topic.TopicSubscribeResponse;

Expand All @@ -10,6 +11,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
ItemCallback onItem;
CompletedCallback onCompleted;
ErrorCallback onError;
DiscontinuityCallback onDiscontinuity;
HeartbeatCallback onHeartbeat;
ConnectionLostCallback onConnectionLost;
ConnectionRestoredCallback onConnectionRestored;
SubscriptionState subscriptionState;
Expand All @@ -21,6 +24,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
ItemCallback onItem,
CompletedCallback onCompleted,
ErrorCallback onError,
DiscontinuityCallback onDiscontinuity,
HeartbeatCallback onHeartbeat,
ConnectionLostCallback onConnectionLost,
ConnectionRestoredCallback onConnectionRestored,
SubscriptionState subscriptionState,
Expand All @@ -30,6 +35,8 @@ class SendSubscribeOptions implements ISubscriptionCallbacks {
this.onItem = onItem;
this.onCompleted = onCompleted;
this.onError = onError;
this.onDiscontinuity = onDiscontinuity;
this.onHeartbeat = onHeartbeat;
this.onConnectionLost = onConnectionLost;
this.onConnectionRestored = onConnectionRestored;
this.subscriptionState = subscriptionState;
Expand All @@ -56,6 +63,14 @@ public ErrorCallback getOnError() {
return onError;
}

public DiscontinuityCallback getOnDiscontinuity() {
return onDiscontinuity;
}

public HeartbeatCallback getOnHeartbeat() {
return onHeartbeat;
}

public SubscriptionState getSubscriptionState() {
return subscriptionState;
}
Expand All @@ -79,6 +94,16 @@ public void onError(Throwable t) {
onError.onError(t);
}

@Override
public void onDiscontinuity(TopicDiscontinuity discontinuity) {
onDiscontinuity.onDiscontinuity(discontinuity);
}

@Override
public void onHeartbeat() {
onHeartbeat.onHeartbeat();
}

@Override
public void onConnectionLost() {
onConnectionLost.onConnectionLost();
Expand All @@ -104,6 +129,16 @@ public interface ErrorCallback {
void onError(Throwable t);
}

@FunctionalInterface
public interface DiscontinuityCallback {
void onDiscontinuity(TopicDiscontinuity discontinuity);
}

@FunctionalInterface
public interface HeartbeatCallback {
void onHeartbeat();
}

@FunctionalInterface
public interface ConnectionLostCallback {
void onConnectionLost();
Expand Down
20 changes: 19 additions & 1 deletion momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.TimeUnit;
import momento.sdk.exceptions.CacheServiceExceptionMapper;
import momento.sdk.exceptions.InternalServerException;
import momento.sdk.responses.topic.TopicDiscontinuity;
import momento.sdk.responses.topic.TopicMessage;
import momento.sdk.responses.topic.TopicSubscribeResponse;
import org.slf4j.Logger;
Expand Down Expand Up @@ -117,6 +118,7 @@ public void onCompleted() {
.setTopic(options.getTopicName())
.setResumeAtTopicSequenceNumber(
options.subscriptionState.getResumeAtTopicSequenceNumber())
.setSequencePage(options.subscriptionState.getResumeAtTopicSequencePage())
.build();

try {
Expand Down Expand Up @@ -155,15 +157,21 @@ private void handleSubscriptionItem(_SubscriptionItem item) {

private void handleSubscriptionDiscontinuity(_SubscriptionItem discontinuityItem) {
logger.debug(
"{}, {}, {}, {}",
"discontinuity {}, {}, {}, {}, {}, {}",
options.getCacheName(),
options.getTopicName(),
discontinuityItem.getDiscontinuity().getLastTopicSequence(),
discontinuityItem.getDiscontinuity().getNewTopicSequence(),
discontinuityItem.getDiscontinuity().getLastTopicSequence(),
discontinuityItem.getDiscontinuity().getNewTopicSequence());

options.onDiscontinuity(new TopicDiscontinuity((int) discontinuityItem.getDiscontinuity().getLastTopicSequence(),
(int) discontinuityItem.getDiscontinuity().getNewTopicSequence(), (int) discontinuityItem.getDiscontinuity().getNewSequencePage()));
}

private void handleSubscriptionHeartbeat() {
logger.debug("heartbeat {} {}", options.getCacheName(), options.getTopicName());
options.onHeartbeat();
}

private void handleSubscriptionUnknown() {
Expand All @@ -175,6 +183,16 @@ private void handleSubscriptionItemMessage(_SubscriptionItem item) {
_TopicValue topicValue = topicItem.getValue();
options.subscriptionState.setResumeAtTopicSequenceNumber(
(int) topicItem.getTopicSequenceNumber());
options.subscriptionState.setResumeAtTopicSequencePage(
(int) topicItem.getSequencePage());

logger.debug(
"Received Item on subscription stream: {}, {}, {}, {}, {}",
options.getCacheName(),
options.getTopicName(),
topicItem.getPublisherId(),
topicItem.getTopicSequenceNumber(),
topicItem.getSequencePage());
TopicMessage message;

switch (topicValue.getKindCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class SubscriptionState {

private Runnable unsubscribeFn;
private Integer lastTopicSequenceNumber;
private Integer lastTopicSequencePage;
private boolean isSubscribed;

/** Constructs a new SubscriptionState instance with default values. */
Expand All @@ -27,6 +28,20 @@ public void setResumeAtTopicSequenceNumber(int lastTopicSequenceNumber) {
this.lastTopicSequenceNumber = lastTopicSequenceNumber;
}

/**
* Gets the topic sequence page to resume the subscription from.
*
* @return The topic sequence page to resume from.
*/
public int getResumeAtTopicSequencePage() {
return lastTopicSequencePage != null ? lastTopicSequencePage : 0;
}

/** Sets the topic sequence page to resume the subscription from. */
public void setResumeAtTopicSequencePage(int lastTopicSequencePage) {
this.lastTopicSequencePage = lastTopicSequencePage;
}

/** Sets the subscription state to "subscribed." */
public void setSubscribed() {
this.isSubscribed = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package momento.sdk.responses.topic;

public class TopicDiscontinuity {
private final Integer lastSequenceNumber;
private final Integer newSequenceNumber;
private final Integer newSequencePage;

public TopicDiscontinuity(Integer lastSequenceNumber, Integer newSequenceNumber, Integer newSequencePage) {
this.lastSequenceNumber = lastSequenceNumber;
this.newSequenceNumber = newSequenceNumber;
this.newSequencePage = newSequencePage;
}

@Override
public String toString() {
return "TopicDiscontinuity{" +
"lastSequenceNumber=" + lastSequenceNumber +
", newSequenceNumber=" + newSequenceNumber +
", newSequencePage=" + newSequencePage +
'}';
}

/*
* Gets the last sequence number.
*/
public Integer getLastSequenceNumber() {
return lastSequenceNumber;
}

/*
* Gets the new sequence number.
*/
public Integer getNewSequenceNumber() {
return newSequenceNumber;
}

/*
* Gets the new sequence page.
*/
public Integer getNewSequencePage() {
return newSequencePage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public void testConnectionLostAndRestored() throws InterruptedException {
(message) -> {},
() -> {},
(err) -> {},
(discontinuity) -> {},
() -> {},
() -> {
logger.info("Got to our connection lost callback!");
gotConnectionLostCallback.set(true);
Expand Down

0 comments on commit 0a1d047

Please sign in to comment.