Skip to content

Commit

Permalink
feat: add topic example (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
rishtigupta authored Dec 14, 2023
1 parent 7de0569 commit 4460c67
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 6 deletions.
1 change: 1 addition & 0 deletions examples/settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ include("cache")
include("cache-with-aws")
include("lambda:docker")
include("token")
include("topic")
52 changes: 52 additions & 0 deletions examples/topic/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java library project to get you started.
* For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
* User Manual available at https://docs.gradle.org/7.2/userguide/building_java_projects.html
*/

plugins {
application
id("com.diffplug.spotless") version "5.15.1"
}

repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}

dependencies {
implementation("software.momento.java:sdk:1.7.0")

implementation("com.google.guava:guava:31.1-android")

// Logging framework to log and enable logging in the Momento client.
implementation("ch.qos.logback:logback-classic:1.4.7")

// Histogram for collecting stats in the load generator
implementation("org.hdrhistogram:HdrHistogram:2.1.12")

// Use JUnit Jupiter for testing.
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
}

spotless {
java {
removeUnusedImports()
googleJavaFormat("1.11.0")
}
}

tasks.test {
// Use JUnit Platform for unit tests.
useJUnitPlatform()
}

task("topic", JavaExec::class) {
description = "Run the disposable token example"
classpath = sourceSets.main.get().runtimeClasspath
mainClass.set("momento.client.example.TopicExample")
}

task("prepareKotlinBuildScriptModel") {}
123 changes: 123 additions & 0 deletions examples/topic/src/main/java/momento/client/example/TopicExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package momento.client.example;

import java.time.Duration;
import momento.sdk.CacheClient;
import momento.sdk.ISubscriptionCallbacks;
import momento.sdk.TopicClient;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.auth.EnvVarCredentialProvider;
import momento.sdk.config.Configurations;
import momento.sdk.config.TopicConfigurations;
import momento.sdk.exceptions.AlreadyExistsException;
import momento.sdk.exceptions.SdkException;
import momento.sdk.responses.cache.control.CacheCreateResponse;
import momento.sdk.responses.topic.TopicMessage;
import momento.sdk.responses.topic.TopicPublishResponse;
import momento.sdk.responses.topic.TopicSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicExample {

private static final String API_KEY_ENV_VAR = "MOMENTO_API_KEY";
private static final Duration DEFAULT_ITEM_TTL = Duration.ofSeconds(60);

private static final String CACHE_NAME = "topic-example-cache";
private static final String TOPIC_NAME = "example-topic";

private static final Logger logger = LoggerFactory.getLogger(TopicExample.class);

public static void main(String[] args) {
logStartBanner();

final CredentialProvider credentialProvider;
try {
credentialProvider = new EnvVarCredentialProvider(API_KEY_ENV_VAR);
} catch (SdkException e) {
logger.error("Unable to load credential from environment variable " + API_KEY_ENV_VAR, e);
throw e;
}

try (final CacheClient client =
CacheClient.create(
credentialProvider, Configurations.Laptop.latest(), DEFAULT_ITEM_TTL);
final TopicClient topicClient =
TopicClient.create(credentialProvider, TopicConfigurations.Laptop.latest())) {

// Create a cache
final CacheCreateResponse createResponse = client.createCache(CACHE_NAME).join();
if (createResponse instanceof CacheCreateResponse.Error error) {
if (error.getCause() instanceof AlreadyExistsException) {
logger.info("Cache with name '{}' already exists.", CACHE_NAME);
} else {
logger.error("Cache creation failed with error " + error.getErrorCode(), error);
}
}

// Subscribe to a topic
TopicSubscribeResponse.Subscription subscription = subscribeToTopic(topicClient);

// Publish messages to the topic
for (int i = 0; i < 100; i++) {
publishToTopic(topicClient, "message " + i);
Thread.sleep(1000);
}

subscription.unsubscribe();

} catch (Exception e) {
logger.error("An unexpected error occurred", e);
throw new RuntimeException(e);
}

logEndBanner();
}

private static TopicSubscribeResponse.Subscription subscribeToTopic(TopicClient topicClient) {
final TopicSubscribeResponse subscribeResponse =
topicClient
.subscribe(
TopicExample.CACHE_NAME,
TOPIC_NAME,
new ISubscriptionCallbacks() {
@Override
public void onItem(TopicMessage message) {
logger.info("Received message on topic {}: {}", TOPIC_NAME, message.toString());
}

@Override
public void onError(Throwable error) {
logger.error("Subscription to topic {} failed with error", TOPIC_NAME, error);
}

@Override
public void onCompleted() {
logger.info("Subscription to topic {} completed", TOPIC_NAME);
}
})
.join();
return subscribeResponse.orElseThrow(
() -> new RuntimeException("Unable to subscribe to topic " + TOPIC_NAME));
}

private static void publishToTopic(TopicClient topicClient, String message) {
final TopicPublishResponse publishResponse =
topicClient.publish(TopicExample.CACHE_NAME, TOPIC_NAME, message).join();
if (publishResponse instanceof TopicPublishResponse.Error error) {
logger.error(
"Topic {} publish failed with error {}", TOPIC_NAME, error.getErrorCode(), error);
}
}

private static void logStartBanner() {
logger.info("******************************************************************");
logger.info("Example Start");
logger.info("******************************************************************");
}

private static void logEndBanner() {
logger.info("******************************************************************");
logger.info("Example End");
logger.info("******************************************************************");
}
}
17 changes: 17 additions & 0 deletions examples/topic/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>

<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>

<appender name="STDOUT" class="ConsoleAppender">
<encoder class="PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@ public void close() {
if (subscription != null) {
subscription.onCompleted();
}
if (scheduler != null) {
scheduler.shutdown();
}
if (grpcManager != null) {
grpcManager.close();
}
scheduler.shutdown();
}
}

0 comments on commit 4460c67

Please sign in to comment.