Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add topic example #341

Merged
merged 2 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ include("cache")
include("cache-with-aws")
include("lambda:docker")
include("token")
include("topic")
52 changes: 52 additions & 0 deletions examples/topic/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java library project to get you started.
* For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
* User Manual available at https://docs.gradle.org/7.2/userguide/building_java_projects.html
*/

plugins {
application
id("com.diffplug.spotless") version "5.15.1"
}

repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}

dependencies {
implementation("software.momento.java:sdk:1.7.0")

implementation("com.google.guava:guava:31.1-android")

// Logging framework to log and enable logging in the Momento client.
implementation("ch.qos.logback:logback-classic:1.4.7")

// Histogram for collecting stats in the load generator
implementation("org.hdrhistogram:HdrHistogram:2.1.12")

// Use JUnit Jupiter for testing.
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
}

spotless {
java {
removeUnusedImports()
googleJavaFormat("1.11.0")
}
}

tasks.test {
// Use JUnit Platform for unit tests.
useJUnitPlatform()
}

task("topic", JavaExec::class) {
description = "Run the disposable token example"
classpath = sourceSets.main.get().runtimeClasspath
mainClass.set("momento.client.example.TopicExample")
}

task("prepareKotlinBuildScriptModel") {}
123 changes: 123 additions & 0 deletions examples/topic/src/main/java/momento/client/example/TopicExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package momento.client.example;

import java.time.Duration;
import momento.sdk.CacheClient;
import momento.sdk.ISubscriptionCallbacks;
import momento.sdk.TopicClient;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.auth.EnvVarCredentialProvider;
import momento.sdk.config.Configurations;
import momento.sdk.config.TopicConfigurations;
import momento.sdk.exceptions.AlreadyExistsException;
import momento.sdk.exceptions.SdkException;
import momento.sdk.responses.cache.control.CacheCreateResponse;
import momento.sdk.responses.topic.TopicMessage;
import momento.sdk.responses.topic.TopicPublishResponse;
import momento.sdk.responses.topic.TopicSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicExample {

private static final String API_KEY_ENV_VAR = "MOMENTO_API_KEY";
private static final Duration DEFAULT_ITEM_TTL = Duration.ofSeconds(60);

private static final String CACHE_NAME = "topic-example-cache";
private static final String TOPIC_NAME = "example-topic";

private static final Logger logger = LoggerFactory.getLogger(TopicExample.class);

public static void main(String[] args) {
logStartBanner();

final CredentialProvider credentialProvider;
try {
credentialProvider = new EnvVarCredentialProvider(API_KEY_ENV_VAR);
} catch (SdkException e) {
logger.error("Unable to load credential from environment variable " + API_KEY_ENV_VAR, e);
throw e;
}

try (final CacheClient client =
CacheClient.create(
credentialProvider, Configurations.Laptop.latest(), DEFAULT_ITEM_TTL);
final TopicClient topicClient =
TopicClient.create(credentialProvider, TopicConfigurations.Laptop.latest())) {

// Create a cache
final CacheCreateResponse createResponse = client.createCache(CACHE_NAME).join();
if (createResponse instanceof CacheCreateResponse.Error error) {
if (error.getCause() instanceof AlreadyExistsException) {
logger.info("Cache with name '{}' already exists.", CACHE_NAME);
} else {
logger.error("Cache creation failed with error " + error.getErrorCode(), error);
}
}

// Subscribe to a topic
TopicSubscribeResponse.Subscription subscription = subscribeToTopic(topicClient);

// Publish messages to the topic
for (int i = 0; i < 100; i++) {
publishToTopic(topicClient, "message " + i);
Thread.sleep(1000);
}

subscription.unsubscribe();

} catch (Exception e) {
logger.error("An unexpected error occurred", e);
throw new RuntimeException(e);
}

logEndBanner();
}

private static TopicSubscribeResponse.Subscription subscribeToTopic(TopicClient topicClient) {
final TopicSubscribeResponse subscribeResponse =
topicClient
.subscribe(
TopicExample.CACHE_NAME,
TOPIC_NAME,
new ISubscriptionCallbacks() {
@Override
public void onItem(TopicMessage message) {
logger.info("Received message on topic {}: {}", TOPIC_NAME, message.toString());
}

@Override
public void onError(Throwable error) {
logger.error("Subscription to topic {} failed with error", TOPIC_NAME, error);
}

@Override
public void onCompleted() {
logger.info("Subscription to topic {} completed", TOPIC_NAME);
}
})
.join();
return subscribeResponse.orElseThrow(
() -> new RuntimeException("Unable to subscribe to topic " + TOPIC_NAME));
}

private static void publishToTopic(TopicClient topicClient, String message) {
final TopicPublishResponse publishResponse =
topicClient.publish(TopicExample.CACHE_NAME, TOPIC_NAME, message).join();
if (publishResponse instanceof TopicPublishResponse.Error error) {
logger.error(
"Topic {} publish failed with error {}", TOPIC_NAME, error.getErrorCode(), error);
}
}

private static void logStartBanner() {
logger.info("******************************************************************");
logger.info("Example Start");
logger.info("******************************************************************");
}

private static void logEndBanner() {
logger.info("******************************************************************");
logger.info("Example End");
logger.info("******************************************************************");
}
}
17 changes: 17 additions & 0 deletions examples/topic/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>

<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>

<appender name="STDOUT" class="ConsoleAppender">
<encoder class="PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@ public void close() {
if (subscription != null) {
subscription.onCompleted();
}
if (scheduler != null) {
scheduler.shutdown();
}
if (grpcManager != null) {
grpcManager.close();
}
scheduler.shutdown();
}
}
Loading