From 243bce0b7be013a0d2546e6899e1dc2d5799b96c Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 5 Mar 2024 15:46:23 -0800 Subject: [PATCH 1/6] feat: add grpc config options and turn off keepalive for Lambda config --- .gitignore | 2 + .../main/java/momento/sdk/CacheClient.java | 2 +- .../java/momento/sdk/ScsControlClient.java | 6 +- .../sdk/ScsControlGrpcStubsManager.java | 26 ++- .../momento/sdk/ScsDataGrpcStubsManager.java | 8 +- .../momento/sdk/ScsTokenGrpcStubsManager.java | 12 +- .../momento/sdk/ScsTopicGrpcStubsManager.java | 15 +- .../momento/sdk/config/Configurations.java | 15 +- .../sdk/config/TopicConfigurations.java | 21 ++- .../config/transport/GrpcConfiguration.java | 171 +++++++++++++++++- .../sdk/internal/GrpcChannelOptions.java | 38 ++++ .../test/java/momento/sdk/Configurations.java | 39 ++++ 12 files changed, 323 insertions(+), 32 deletions(-) create mode 100644 momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java create mode 100644 momento-sdk/src/test/java/momento/sdk/Configurations.java diff --git a/.gitignore b/.gitignore index 484d31bd..e00185e6 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,5 @@ examples/lib/.settings examples/lib/.classpath examples/lib/.project examples/lib/bin +.vscode/ +*logback.xml diff --git a/momento-sdk/src/main/java/momento/sdk/CacheClient.java b/momento-sdk/src/main/java/momento/sdk/CacheClient.java index 8b84bcac..14bed021 100644 --- a/momento-sdk/src/main/java/momento/sdk/CacheClient.java +++ b/momento-sdk/src/main/java/momento/sdk/CacheClient.java @@ -81,7 +81,7 @@ public CacheClient( @Nonnull CredentialProvider credentialProvider, @Nonnull Configuration configuration, @Nonnull Duration itemDefaultTtl) { - this.scsControlClient = new ScsControlClient(credentialProvider); + this.scsControlClient = new ScsControlClient(credentialProvider, configuration); this.scsDataClient = new ScsDataClient(credentialProvider, configuration, itemDefaultTtl); logger.info("Creating Momento Cache Client"); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsControlClient.java b/momento-sdk/src/main/java/momento/sdk/ScsControlClient.java index 2e4c2de0..b4228a9d 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsControlClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsControlClient.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.Configuration; import momento.sdk.exceptions.CacheServiceExceptionMapper; import momento.sdk.exceptions.InternalServerException; import momento.sdk.responses.cache.control.CacheCreateResponse; @@ -48,9 +49,10 @@ final class ScsControlClient extends ScsClient { private final CredentialProvider credentialProvider; private final ScsControlGrpcStubsManager controlGrpcStubsManager; - ScsControlClient(@Nonnull CredentialProvider credentialProvider) { + ScsControlClient(@Nonnull CredentialProvider credentialProvider, Configuration configuration) { this.credentialProvider = credentialProvider; - this.controlGrpcStubsManager = new ScsControlGrpcStubsManager(credentialProvider); + this.controlGrpcStubsManager = + new ScsControlGrpcStubsManager(credentialProvider, configuration); } CompletableFuture createCache(String cacheName) { diff --git a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java index db8ddb42..1a5858e7 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java @@ -10,6 +10,9 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.Configuration; +import momento.sdk.config.transport.GrpcConfiguration; +import momento.sdk.internal.GrpcChannelOptions; /** * Manager responsible for GRPC channels and stubs for the Control Plane. @@ -26,16 +29,29 @@ final class ScsControlGrpcStubsManager implements AutoCloseable { private final ScsControlGrpc.ScsControlFutureStub futureStub; - ScsControlGrpcStubsManager(@Nonnull CredentialProvider credentialProvider) { - this.channel = setupConnection(credentialProvider); + ScsControlGrpcStubsManager( + @Nonnull CredentialProvider credentialProvider, Configuration configuration) { + this.channel = setupConnection(credentialProvider, configuration); this.futureStub = ScsControlGrpc.newFutureStub(channel); } - private static ManagedChannel setupConnection(CredentialProvider credentialProvider) { + private static ManagedChannel setupConnection( + CredentialProvider credentialProvider, Configuration configuration) { final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 443); - channelBuilder.useTransportSecurity(); - channelBuilder.disableRetry(); + + // Override grpc config to disable keepalive for control clients + final GrpcConfiguration controlConfig = + configuration + .getTransportStrategy() + .getGrpcConfiguration() + .withKeepAliveTime(0) + .withKeepAliveTimeout(0) + .withKeepAliveWithoutCalls(false); + + // set additional channel options (message size, keepalive, auth, etc) + GrpcChannelOptions.GrpcOptionsFromGrpcConfig(controlConfig, channelBuilder); + final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); channelBuilder.intercept(clientInterceptors); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java index dc8ae6b3..d98b2479 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java @@ -23,6 +23,7 @@ import javax.annotation.Nonnull; import momento.sdk.auth.CredentialProvider; import momento.sdk.config.Configuration; +import momento.sdk.internal.GrpcChannelOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,8 +172,11 @@ private ManagedChannel setupChannel( CredentialProvider credentialProvider, Configuration configuration) { final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443); - channelBuilder.useTransportSecurity(); - channelBuilder.disableRetry(); + + // set additional channel options (message size, keepalive, auth, etc) + GrpcChannelOptions.GrpcOptionsFromGrpcConfig( + configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); + final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); clientInterceptors.add( diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java index 75a190f2..af974628 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java @@ -9,6 +9,8 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.transport.GrpcConfiguration; +import momento.sdk.internal.GrpcChannelOptions; import momento.token.TokenGrpc; /** @@ -34,8 +36,14 @@ final class ScsTokenGrpcStubsManager implements AutoCloseable { private static ManagedChannel setupConnection(CredentialProvider credentialProvider) { final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getTokenEndpoint(), 443); - channelBuilder.useTransportSecurity(); - channelBuilder.disableRetry(); + + // Note: This is hard-coded for now but we may want to expose it via configuration object + // in the future, as we do with some of the other clients. + final GrpcConfiguration grpcConfig = new GrpcConfiguration(Duration.ofMillis(15000)); + + // set additional channel options (message size, keepalive, auth, etc) + GrpcChannelOptions.GrpcOptionsFromGrpcConfig(grpcConfig, channelBuilder); + final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); channelBuilder.intercept(clientInterceptors); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index 3b259e03..5a608bc2 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -7,10 +7,10 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import momento.sdk.auth.CredentialProvider; import momento.sdk.config.TopicConfiguration; +import momento.sdk.internal.GrpcChannelOptions; /** * Manager responsible for GRPC channels and stubs for the Topics. @@ -28,20 +28,19 @@ final class ScsTopicGrpcStubsManager implements Closeable { ScsTopicGrpcStubsManager( @Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) { - this.channel = setupConnection(credentialProvider); + this.channel = setupConnection(credentialProvider, configuration); this.stub = PubsubGrpc.newStub(channel); this.configuration = configuration; } - private static ManagedChannel setupConnection(CredentialProvider credentialProvider) { + private static ManagedChannel setupConnection( + CredentialProvider credentialProvider, TopicConfiguration configuration) { final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443); - channelBuilder.useTransportSecurity(); - channelBuilder.disableRetry(); - channelBuilder.keepAliveTime(10, TimeUnit.SECONDS); - channelBuilder.keepAliveTimeout(5, TimeUnit.SECONDS); - channelBuilder.keepAliveWithoutCalls(true); + // set additional channel options (message size, keepalive, auth, etc) + GrpcChannelOptions.GrpcOptionsFromGrpcConfig( + configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/config/Configurations.java b/momento-sdk/src/main/java/momento/sdk/config/Configurations.java index 34bb6ef8..553835c7 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/Configurations.java +++ b/momento-sdk/src/main/java/momento/sdk/config/Configurations.java @@ -137,11 +137,22 @@ private Lambda(TransportStrategy transportStrategy, RetryStrategy retryStrategy) *

This configuration may change in future releases to take advantage of improvements we * identify for default configurations. * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * * @return the latest Lambda configuration */ public static Configuration latest() { - final TransportStrategy transportStrategy = - new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(1100))); + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(1100)) + .withKeepAliveTime(0) + .withKeepAliveTimeout(0) + .withKeepAliveWithoutCalls(false); + final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig); final RetryStrategy retryStrategy = new FixedCountRetryStrategy(DEFAULT_MAX_RETRIES, new DefaultRetryEligibilityStrategy()); return new Lambda(transportStrategy, retryStrategy); diff --git a/momento-sdk/src/main/java/momento/sdk/config/TopicConfigurations.java b/momento-sdk/src/main/java/momento/sdk/config/TopicConfigurations.java index b5055c19..d710dd39 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/TopicConfigurations.java +++ b/momento-sdk/src/main/java/momento/sdk/config/TopicConfigurations.java @@ -28,8 +28,11 @@ private Laptop(TransportStrategy transportStrategy, Logger logger) { * @return the latest Laptop configuration */ public static TopicConfiguration latest() { - final TransportStrategy transportStrategy = - new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(15000))); + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)) + .withKeepAliveTime(10000) + .withKeepAliveTimeout(5000); + final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig); final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class); return new Laptop(transportStrategy, logger); } @@ -54,8 +57,11 @@ private InRegion(TransportStrategy transportStrategy, Logger logger) { * @return the latest in-region configuration */ public static TopicConfiguration latest() { - final TransportStrategy transportStrategy = - new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(1100))); + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(1100)) + .withKeepAliveTime(10000) + .withKeepAliveTimeout(5000); + final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig); final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class); return new InRegion(transportStrategy, logger); } @@ -81,8 +87,11 @@ private LowLatency(TransportStrategy transportStrategy, Logger logger) { * @return the latest low-latency configuration */ public static TopicConfiguration latest() { - final TransportStrategy transportStrategy = - new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(500))); + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(500)) + .withKeepAliveTime(10000) + .withKeepAliveTimeout(5000); + final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig); final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class); return new LowLatency(transportStrategy, logger); } diff --git a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java index fdbcc104..f4cb0a40 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java @@ -4,12 +4,17 @@ import java.time.Duration; import javax.annotation.Nonnull; +import momento.sdk.internal.GrpcChannelOptions; /** Abstracts away the gRPC configuration tunables. */ public class GrpcConfiguration { private final Duration deadline; private final int minNumGrpcChannels; + private final int maxMessageSize; + private final boolean keepAliveWithoutCalls; + private final int keepAliveTimeoutMs; + private final int keepAliveTimeMs; /** * Constructs a GrpcConfiguration. @@ -17,7 +22,13 @@ public class GrpcConfiguration { * @param deadline The maximum duration of a gRPC call. */ public GrpcConfiguration(@Nonnull Duration deadline) { - this(deadline, 1); + this( + deadline, + 1, + GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE, + GrpcChannelOptions.DEFAULT_KEEPALIVE_WITHOUT_STREAM, + GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT_MS, + GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME_MS); } /** @@ -25,11 +36,27 @@ public GrpcConfiguration(@Nonnull Duration deadline) { * * @param deadline The maximum duration of a gRPC call. * @param minNumGrpcChannels The minimum number of gRPC channels to keep open at any given time. + * @param maxMessageSize The maximum size of a message (in bytes) that can be received by the + * client. + * @param keepAliveWithoutCalls Whether to send keepalive pings without any active calls. + * @param keepAliveTimeout The time to wait for a keepalive ping response before considering the + * connection dead. + * @param keepAliveTime The time to wait between keepalive pings. */ - public GrpcConfiguration(@Nonnull Duration deadline, int minNumGrpcChannels) { + public GrpcConfiguration( + @Nonnull Duration deadline, + int minNumGrpcChannels, + int maxMessageSize, + boolean keepAliveWithoutCalls, + int keepAliveTimeout, + int keepAliveTime) { ensureRequestDeadlineValid(deadline); this.deadline = deadline; this.minNumGrpcChannels = minNumGrpcChannels; + this.maxMessageSize = maxMessageSize; + this.keepAliveWithoutCalls = keepAliveWithoutCalls; + this.keepAliveTimeoutMs = keepAliveTimeout; + this.keepAliveTimeMs = keepAliveTime; } /** @@ -49,7 +76,13 @@ public Duration getDeadline() { * @return The updated GrpcConfiguration. */ public GrpcConfiguration withDeadline(Duration deadline) { - return new GrpcConfiguration(deadline); + return new GrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); } /** @@ -68,6 +101,136 @@ public int getMinNumGrpcChannels() { * @return The updated GrpcConfiguration. */ public GrpcConfiguration withMinNumGrpcChannels(int minNumGrpcChannels) { - return new GrpcConfiguration(deadline, minNumGrpcChannels); + return new GrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * The maximum size of a message (in bytes) that can be received by the client. + * + * @return the maximum message size. + */ + public int getMaxMessageSize() { + return maxMessageSize; + } + + /** + * Copy constructor that updates the maximum message size. + * + * @param maxMessageSize The new maximum message size. + * @return The updated GrpcConfiguration. + */ + public GrpcConfiguration withMaxMessageSize(int maxMessageSize) { + return new GrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * Whether keepalive will be performed when there are no outstanding requests on a connection. + * + * @return the boolean indicating whether to send keepalive pings without any active calls. + */ + public boolean getKeepAliveWithoutCalls() { + return keepAliveWithoutCalls; + } + + /** + * Copy constructor that updates whether keepalive will be performed when there are no outstanding + * requests on a connection. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @param keepAliveWithoutCalls The new boolean indicating whether to send keepalive pings without + * any active calls. + * @return The updated GrpcConfiguration. + */ + public GrpcConfiguration withKeepAliveWithoutCalls(boolean keepAliveWithoutCalls) { + return new GrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * The time to wait for a keepalive ping response before considering the connection dead. + * + * @return the time to wait for a keepalive ping response before considering the connection dead. + */ + public int getKeepAliveTimeoutMs() { + return keepAliveTimeoutMs; + } + + /** + * Copy constructor that updates the time to wait for a keepalive ping response before considering + * the connection dead. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @param keepAliveTimeoutMs The new time to wait for a keepalive ping response. + * @return The updated GrpcConfiguration. + */ + public GrpcConfiguration withKeepAliveTimeout(int keepAliveTimeoutMs) { + return new GrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * The time to wait between keepalive pings. + * + * @return the time to wait between keepalive pings. + */ + public int getKeepAliveTimeMs() { + return keepAliveTimeMs; + } + + /** + * Copy constructor that updates the time to wait between keepalive pings. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @param keepAliveTimeMs The new time to wait between keepalive pings. + * @return The updated GrpcConfiguration. + */ + public GrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) { + return new GrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); } } diff --git a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java new file mode 100644 index 00000000..d63cc2d9 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java @@ -0,0 +1,38 @@ +package momento.sdk.internal; + +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import java.util.concurrent.TimeUnit; +import momento.sdk.config.transport.GrpcConfiguration; + +public class GrpcChannelOptions { + // The default value for max_send_message_length is 4mb. We need to increase this to 5mb in order + // to + // support cases where users have requested a limit increase up to our maximum item size of 5mb. + public static final int DEFAULT_MAX_MESSAGE_SIZE = 5_243_000; // bytes + + public static final boolean DEFAULT_KEEPALIVE_WITHOUT_STREAM = true; + public static final int DEFAULT_KEEPALIVE_TIME_MS = 5000; // milliseconds + public static final int DEFAULT_KEEPALIVE_TIMEOUT_MS = 1000; // milliseconds + + public static NettyChannelBuilder GrpcOptionsFromGrpcConfig( + GrpcConfiguration grpcConfig, NettyChannelBuilder channelBuilder) { + channelBuilder.useTransportSecurity(); + channelBuilder.disableRetry(); + channelBuilder.maxInboundMessageSize(grpcConfig.getMaxMessageSize()); + // no equivalent for maxOutboundboundMessageSize + + if (grpcConfig.getKeepAliveTimeMs() > 0) { + channelBuilder.keepAliveTime(grpcConfig.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS); + } + + if (grpcConfig.getKeepAliveTimeoutMs() > 0) { + channelBuilder.keepAliveTimeout(grpcConfig.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS); + } + + if (!grpcConfig.getKeepAliveWithoutCalls()) { + channelBuilder.keepAliveWithoutCalls(false); + } + + return channelBuilder; + } +} diff --git a/momento-sdk/src/test/java/momento/sdk/Configurations.java b/momento-sdk/src/test/java/momento/sdk/Configurations.java new file mode 100644 index 00000000..c1839f97 --- /dev/null +++ b/momento-sdk/src/test/java/momento/sdk/Configurations.java @@ -0,0 +1,39 @@ +package momento.sdk; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import momento.sdk.config.Configuration; +import momento.sdk.config.TopicConfiguration; +import momento.sdk.config.transport.GrpcConfiguration; +import org.junit.jupiter.api.Test; + +public class Configurations { + @Test + public void testCacheLambdaConfigurationDisablesKeepalive() { + final Configuration config = momento.sdk.config.Configurations.Lambda.latest(); + final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); + assertFalse(grpcConfig.getKeepAliveWithoutCalls()); + assertThat(grpcConfig.getKeepAliveTimeMs()).isEqualTo(0); + assertThat(grpcConfig.getKeepAliveTimeoutMs()).isEqualTo(0); + } + + @Test + public void testCacheLaptopConfigurationEnablesKeepalive() { + final Configuration config = momento.sdk.config.Configurations.Laptop.latest(); + final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); + assertTrue(grpcConfig.getKeepAliveWithoutCalls()); + assertThat(grpcConfig.getKeepAliveTimeMs()).isEqualTo(5000); + assertThat(grpcConfig.getKeepAliveTimeoutMs()).isEqualTo(1000); + } + + @Test + public void testTopicsLaptopConfigurationEnablesKeepalive() { + final TopicConfiguration config = momento.sdk.config.TopicConfigurations.Laptop.latest(); + final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); + assertTrue(grpcConfig.getKeepAliveWithoutCalls()); + assertThat(grpcConfig.getKeepAliveTimeMs()).isEqualTo(10000); + assertThat(grpcConfig.getKeepAliveTimeoutMs()).isEqualTo(5000); + } +} From ae33861d9a317eb99cd3490b8b115434155fc099 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 5 Mar 2024 15:52:03 -0800 Subject: [PATCH 2/6] add convenience copy constructor to disable all keepalive settings --- .../momento/sdk/ScsControlGrpcStubsManager.java | 7 +------ .../java/momento/sdk/config/Configurations.java | 5 +---- .../sdk/config/transport/GrpcConfiguration.java | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java index 1a5858e7..f90e531e 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java @@ -42,12 +42,7 @@ private static ManagedChannel setupConnection( // Override grpc config to disable keepalive for control clients final GrpcConfiguration controlConfig = - configuration - .getTransportStrategy() - .getGrpcConfiguration() - .withKeepAliveTime(0) - .withKeepAliveTimeout(0) - .withKeepAliveWithoutCalls(false); + configuration.getTransportStrategy().getGrpcConfiguration().withKeepAliveDisabled(); // set additional channel options (message size, keepalive, auth, etc) GrpcChannelOptions.GrpcOptionsFromGrpcConfig(controlConfig, channelBuilder); diff --git a/momento-sdk/src/main/java/momento/sdk/config/Configurations.java b/momento-sdk/src/main/java/momento/sdk/config/Configurations.java index 553835c7..928dae8b 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/Configurations.java +++ b/momento-sdk/src/main/java/momento/sdk/config/Configurations.java @@ -148,10 +148,7 @@ private Lambda(TransportStrategy transportStrategy, RetryStrategy retryStrategy) */ public static Configuration latest() { final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(1100)) - .withKeepAliveTime(0) - .withKeepAliveTimeout(0) - .withKeepAliveWithoutCalls(false); + new GrpcConfiguration(Duration.ofMillis(1100)).withKeepAliveDisabled(); final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig); final RetryStrategy retryStrategy = new FixedCountRetryStrategy(DEFAULT_MAX_RETRIES, new DefaultRetryEligibilityStrategy()); diff --git a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java index f4cb0a40..c4866bc8 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java @@ -233,4 +233,20 @@ public GrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) { keepAliveTimeoutMs, keepAliveTimeMs); } + + /** + * Copy constructor that disables all client-side keepalive settings. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @return The updated GrpcConfiguration. + */ + public GrpcConfiguration withKeepAliveDisabled() { + return new GrpcConfiguration(deadline, minNumGrpcChannels, maxMessageSize, false, 0, 0); + } } From 782eccd07c638e912d049665496459e726dd9d15 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 5 Mar 2024 16:15:36 -0800 Subject: [PATCH 3/6] use returned channel builder --- .../main/java/momento/sdk/ScsControlGrpcStubsManager.java | 4 ++-- .../src/main/java/momento/sdk/ScsDataGrpcStubsManager.java | 7 ++++--- .../main/java/momento/sdk/ScsTokenGrpcStubsManager.java | 4 ++-- .../main/java/momento/sdk/ScsTopicGrpcStubsManager.java | 7 ++++--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java index f90e531e..0b161042 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java @@ -37,7 +37,7 @@ final class ScsControlGrpcStubsManager implements AutoCloseable { private static ManagedChannel setupConnection( CredentialProvider credentialProvider, Configuration configuration) { - final NettyChannelBuilder channelBuilder = + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 443); // Override grpc config to disable keepalive for control clients @@ -45,7 +45,7 @@ private static ManagedChannel setupConnection( configuration.getTransportStrategy().getGrpcConfiguration().withKeepAliveDisabled(); // set additional channel options (message size, keepalive, auth, etc) - GrpcChannelOptions.GrpcOptionsFromGrpcConfig(controlConfig, channelBuilder); + channelBuilder = GrpcChannelOptions.GrpcOptionsFromGrpcConfig(controlConfig, channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java index d98b2479..453330b2 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java @@ -170,12 +170,13 @@ private static void eagerlyConnect( private ManagedChannel setupChannel( CredentialProvider credentialProvider, Configuration configuration) { - final NettyChannelBuilder channelBuilder = + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443); // set additional channel options (message size, keepalive, auth, etc) - GrpcChannelOptions.GrpcOptionsFromGrpcConfig( - configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); + channelBuilder = + GrpcChannelOptions.GrpcOptionsFromGrpcConfig( + configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java index af974628..5b377482 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java @@ -34,7 +34,7 @@ final class ScsTokenGrpcStubsManager implements AutoCloseable { } private static ManagedChannel setupConnection(CredentialProvider credentialProvider) { - final NettyChannelBuilder channelBuilder = + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getTokenEndpoint(), 443); // Note: This is hard-coded for now but we may want to expose it via configuration object @@ -42,7 +42,7 @@ private static ManagedChannel setupConnection(CredentialProvider credentialProvi final GrpcConfiguration grpcConfig = new GrpcConfiguration(Duration.ofMillis(15000)); // set additional channel options (message size, keepalive, auth, etc) - GrpcChannelOptions.GrpcOptionsFromGrpcConfig(grpcConfig, channelBuilder); + channelBuilder = GrpcChannelOptions.GrpcOptionsFromGrpcConfig(grpcConfig, channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index 5a608bc2..2aa99230 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -35,12 +35,13 @@ final class ScsTopicGrpcStubsManager implements Closeable { private static ManagedChannel setupConnection( CredentialProvider credentialProvider, TopicConfiguration configuration) { - final NettyChannelBuilder channelBuilder = + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443); // set additional channel options (message size, keepalive, auth, etc) - GrpcChannelOptions.GrpcOptionsFromGrpcConfig( - configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); + channelBuilder = + GrpcChannelOptions.GrpcOptionsFromGrpcConfig( + configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); From c0ec9406613d9c12bfaab815b62a356329953f7c Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 6 Mar 2024 16:14:12 -0800 Subject: [PATCH 4/6] make grpc options builder function a side effects function --- .../main/java/momento/sdk/ScsControlGrpcStubsManager.java | 4 ++-- .../src/main/java/momento/sdk/ScsDataGrpcStubsManager.java | 7 +++---- .../main/java/momento/sdk/ScsTokenGrpcStubsManager.java | 4 ++-- .../main/java/momento/sdk/ScsTopicGrpcStubsManager.java | 7 +++---- .../main/java/momento/sdk/internal/GrpcChannelOptions.java | 4 +--- 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java index 0b161042..a9f3b0e3 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsControlGrpcStubsManager.java @@ -37,7 +37,7 @@ final class ScsControlGrpcStubsManager implements AutoCloseable { private static ManagedChannel setupConnection( CredentialProvider credentialProvider, Configuration configuration) { - NettyChannelBuilder channelBuilder = + final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 443); // Override grpc config to disable keepalive for control clients @@ -45,7 +45,7 @@ private static ManagedChannel setupConnection( configuration.getTransportStrategy().getGrpcConfiguration().withKeepAliveDisabled(); // set additional channel options (message size, keepalive, auth, etc) - channelBuilder = GrpcChannelOptions.GrpcOptionsFromGrpcConfig(controlConfig, channelBuilder); + GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder(controlConfig, channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java index 453330b2..a433ad0a 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java @@ -170,13 +170,12 @@ private static void eagerlyConnect( private ManagedChannel setupChannel( CredentialProvider credentialProvider, Configuration configuration) { - NettyChannelBuilder channelBuilder = + final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443); // set additional channel options (message size, keepalive, auth, etc) - channelBuilder = - GrpcChannelOptions.GrpcOptionsFromGrpcConfig( - configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); + GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder( + configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java index 5b377482..a86de78b 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTokenGrpcStubsManager.java @@ -34,7 +34,7 @@ final class ScsTokenGrpcStubsManager implements AutoCloseable { } private static ManagedChannel setupConnection(CredentialProvider credentialProvider) { - NettyChannelBuilder channelBuilder = + final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getTokenEndpoint(), 443); // Note: This is hard-coded for now but we may want to expose it via configuration object @@ -42,7 +42,7 @@ private static ManagedChannel setupConnection(CredentialProvider credentialProvi final GrpcConfiguration grpcConfig = new GrpcConfiguration(Duration.ofMillis(15000)); // set additional channel options (message size, keepalive, auth, etc) - channelBuilder = GrpcChannelOptions.GrpcOptionsFromGrpcConfig(grpcConfig, channelBuilder); + GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder(grpcConfig, channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index 2aa99230..8c304ce3 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -35,13 +35,12 @@ final class ScsTopicGrpcStubsManager implements Closeable { private static ManagedChannel setupConnection( CredentialProvider credentialProvider, TopicConfiguration configuration) { - NettyChannelBuilder channelBuilder = + final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443); // set additional channel options (message size, keepalive, auth, etc) - channelBuilder = - GrpcChannelOptions.GrpcOptionsFromGrpcConfig( - configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); + GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder( + configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder); final List clientInterceptors = new ArrayList<>(); clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken())); diff --git a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java index d63cc2d9..380a713e 100644 --- a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java +++ b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java @@ -14,7 +14,7 @@ public class GrpcChannelOptions { public static final int DEFAULT_KEEPALIVE_TIME_MS = 5000; // milliseconds public static final int DEFAULT_KEEPALIVE_TIMEOUT_MS = 1000; // milliseconds - public static NettyChannelBuilder GrpcOptionsFromGrpcConfig( + public static void applyGrpcConfigurationToChannelBuilder( GrpcConfiguration grpcConfig, NettyChannelBuilder channelBuilder) { channelBuilder.useTransportSecurity(); channelBuilder.disableRetry(); @@ -32,7 +32,5 @@ public static NettyChannelBuilder GrpcOptionsFromGrpcConfig( if (!grpcConfig.getKeepAliveWithoutCalls()) { channelBuilder.keepAliveWithoutCalls(false); } - - return channelBuilder; } } From ef75997802788fa5c2d7e9689f8e0d62e56d2db8 Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 6 Mar 2024 16:40:42 -0800 Subject: [PATCH 5/6] use OptionalInt for specifying options --- .../config/transport/GrpcConfiguration.java | 39 +++++++++++-------- .../sdk/internal/GrpcChannelOptions.java | 18 ++++++--- .../test/java/momento/sdk/Configurations.java | 12 +++--- 3 files changed, 42 insertions(+), 27 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java index c4866bc8..f11f9b44 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java @@ -3,6 +3,7 @@ import static momento.sdk.ValidationUtils.ensureRequestDeadlineValid; import java.time.Duration; +import java.util.OptionalInt; import javax.annotation.Nonnull; import momento.sdk.internal.GrpcChannelOptions; @@ -11,10 +12,10 @@ public class GrpcConfiguration { private final Duration deadline; private final int minNumGrpcChannels; - private final int maxMessageSize; + private final OptionalInt maxMessageSize; private final boolean keepAliveWithoutCalls; - private final int keepAliveTimeoutMs; - private final int keepAliveTimeMs; + private final OptionalInt keepAliveTimeoutMs; + private final OptionalInt keepAliveTimeMs; /** * Constructs a GrpcConfiguration. @@ -25,10 +26,10 @@ public GrpcConfiguration(@Nonnull Duration deadline) { this( deadline, 1, - GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE, + OptionalInt.of(GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE), GrpcChannelOptions.DEFAULT_KEEPALIVE_WITHOUT_STREAM, - GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT_MS, - GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME_MS); + OptionalInt.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT_MS), + OptionalInt.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME_MS)); } /** @@ -46,10 +47,10 @@ public GrpcConfiguration(@Nonnull Duration deadline) { public GrpcConfiguration( @Nonnull Duration deadline, int minNumGrpcChannels, - int maxMessageSize, + OptionalInt maxMessageSize, boolean keepAliveWithoutCalls, - int keepAliveTimeout, - int keepAliveTime) { + OptionalInt keepAliveTimeout, + OptionalInt keepAliveTime) { ensureRequestDeadlineValid(deadline); this.deadline = deadline; this.minNumGrpcChannels = minNumGrpcChannels; @@ -115,7 +116,7 @@ public GrpcConfiguration withMinNumGrpcChannels(int minNumGrpcChannels) { * * @return the maximum message size. */ - public int getMaxMessageSize() { + public OptionalInt getMaxMessageSize() { return maxMessageSize; } @@ -129,7 +130,7 @@ public GrpcConfiguration withMaxMessageSize(int maxMessageSize) { return new GrpcConfiguration( deadline, minNumGrpcChannels, - maxMessageSize, + OptionalInt.of(maxMessageSize), keepAliveWithoutCalls, keepAliveTimeoutMs, keepAliveTimeMs); @@ -174,7 +175,7 @@ public GrpcConfiguration withKeepAliveWithoutCalls(boolean keepAliveWithoutCalls * * @return the time to wait for a keepalive ping response before considering the connection dead. */ - public int getKeepAliveTimeoutMs() { + public OptionalInt getKeepAliveTimeoutMs() { return keepAliveTimeoutMs; } @@ -198,7 +199,7 @@ public GrpcConfiguration withKeepAliveTimeout(int keepAliveTimeoutMs) { minNumGrpcChannels, maxMessageSize, keepAliveWithoutCalls, - keepAliveTimeoutMs, + OptionalInt.of(keepAliveTimeoutMs), keepAliveTimeMs); } @@ -207,7 +208,7 @@ public GrpcConfiguration withKeepAliveTimeout(int keepAliveTimeoutMs) { * * @return the time to wait between keepalive pings. */ - public int getKeepAliveTimeMs() { + public OptionalInt getKeepAliveTimeMs() { return keepAliveTimeMs; } @@ -231,7 +232,7 @@ public GrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) { maxMessageSize, keepAliveWithoutCalls, keepAliveTimeoutMs, - keepAliveTimeMs); + OptionalInt.of(keepAliveTimeMs)); } /** @@ -247,6 +248,12 @@ public GrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) { * @return The updated GrpcConfiguration. */ public GrpcConfiguration withKeepAliveDisabled() { - return new GrpcConfiguration(deadline, minNumGrpcChannels, maxMessageSize, false, 0, 0); + return new GrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + false, + OptionalInt.empty(), + OptionalInt.empty()); } } diff --git a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java index 380a713e..68fb92e0 100644 --- a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java +++ b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java @@ -1,6 +1,7 @@ package momento.sdk.internal; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import java.util.OptionalInt; import java.util.concurrent.TimeUnit; import momento.sdk.config.transport.GrpcConfiguration; @@ -18,15 +19,22 @@ public static void applyGrpcConfigurationToChannelBuilder( GrpcConfiguration grpcConfig, NettyChannelBuilder channelBuilder) { channelBuilder.useTransportSecurity(); channelBuilder.disableRetry(); - channelBuilder.maxInboundMessageSize(grpcConfig.getMaxMessageSize()); + + final OptionalInt maxMessageSize = grpcConfig.getMaxMessageSize(); + if (maxMessageSize.isPresent()) { + channelBuilder.maxInboundMessageSize(maxMessageSize.getAsInt()); + } + // no equivalent for maxOutboundboundMessageSize - if (grpcConfig.getKeepAliveTimeMs() > 0) { - channelBuilder.keepAliveTime(grpcConfig.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS); + final OptionalInt keepAliveTimeMs = grpcConfig.getKeepAliveTimeMs(); + if (keepAliveTimeMs.isPresent()) { + channelBuilder.keepAliveTime(keepAliveTimeMs.getAsInt(), TimeUnit.MILLISECONDS); } - if (grpcConfig.getKeepAliveTimeoutMs() > 0) { - channelBuilder.keepAliveTimeout(grpcConfig.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS); + final OptionalInt keepAliveTimeoutMs = grpcConfig.getKeepAliveTimeoutMs(); + if (keepAliveTimeoutMs.isPresent()) { + channelBuilder.keepAliveTimeout(keepAliveTimeoutMs.getAsInt(), TimeUnit.MILLISECONDS); } if (!grpcConfig.getKeepAliveWithoutCalls()) { diff --git a/momento-sdk/src/test/java/momento/sdk/Configurations.java b/momento-sdk/src/test/java/momento/sdk/Configurations.java index c1839f97..9de61033 100644 --- a/momento-sdk/src/test/java/momento/sdk/Configurations.java +++ b/momento-sdk/src/test/java/momento/sdk/Configurations.java @@ -15,8 +15,8 @@ public void testCacheLambdaConfigurationDisablesKeepalive() { final Configuration config = momento.sdk.config.Configurations.Lambda.latest(); final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); assertFalse(grpcConfig.getKeepAliveWithoutCalls()); - assertThat(grpcConfig.getKeepAliveTimeMs()).isEqualTo(0); - assertThat(grpcConfig.getKeepAliveTimeoutMs()).isEqualTo(0); + assertTrue(grpcConfig.getKeepAliveTimeMs().isEmpty()); + assertTrue(grpcConfig.getKeepAliveTimeoutMs().isEmpty()); } @Test @@ -24,8 +24,8 @@ public void testCacheLaptopConfigurationEnablesKeepalive() { final Configuration config = momento.sdk.config.Configurations.Laptop.latest(); final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); assertTrue(grpcConfig.getKeepAliveWithoutCalls()); - assertThat(grpcConfig.getKeepAliveTimeMs()).isEqualTo(5000); - assertThat(grpcConfig.getKeepAliveTimeoutMs()).isEqualTo(1000); + assertThat(grpcConfig.getKeepAliveTimeMs().getAsInt()).isEqualTo(5000); + assertThat(grpcConfig.getKeepAliveTimeoutMs().getAsInt()).isEqualTo(1000); } @Test @@ -33,7 +33,7 @@ public void testTopicsLaptopConfigurationEnablesKeepalive() { final TopicConfiguration config = momento.sdk.config.TopicConfigurations.Laptop.latest(); final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); assertTrue(grpcConfig.getKeepAliveWithoutCalls()); - assertThat(grpcConfig.getKeepAliveTimeMs()).isEqualTo(10000); - assertThat(grpcConfig.getKeepAliveTimeoutMs()).isEqualTo(5000); + assertThat(grpcConfig.getKeepAliveTimeMs().getAsInt()).isEqualTo(10000); + assertThat(grpcConfig.getKeepAliveTimeoutMs().getAsInt()).isEqualTo(5000); } } From 77fa9ca31970a780e3600c38415d4652696b0e59 Mon Sep 17 00:00:00 2001 From: anitarua Date: Thu, 7 Mar 2024 10:46:20 -0800 Subject: [PATCH 6/6] use Optionals with type classes not primitive ints --- .../config/transport/GrpcConfiguration.java | 52 +++++++++---------- .../sdk/internal/GrpcChannelOptions.java | 19 +++---- .../test/java/momento/sdk/Configurations.java | 15 +++--- 3 files changed, 43 insertions(+), 43 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java index f11f9b44..5ccf7fd4 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/transport/GrpcConfiguration.java @@ -3,7 +3,7 @@ import static momento.sdk.ValidationUtils.ensureRequestDeadlineValid; import java.time.Duration; -import java.util.OptionalInt; +import java.util.Optional; import javax.annotation.Nonnull; import momento.sdk.internal.GrpcChannelOptions; @@ -12,10 +12,10 @@ public class GrpcConfiguration { private final Duration deadline; private final int minNumGrpcChannels; - private final OptionalInt maxMessageSize; - private final boolean keepAliveWithoutCalls; - private final OptionalInt keepAliveTimeoutMs; - private final OptionalInt keepAliveTimeMs; + private final Optional maxMessageSize; + private final Optional keepAliveWithoutCalls; + private final Optional keepAliveTimeoutMs; + private final Optional keepAliveTimeMs; /** * Constructs a GrpcConfiguration. @@ -26,10 +26,10 @@ public GrpcConfiguration(@Nonnull Duration deadline) { this( deadline, 1, - OptionalInt.of(GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE), - GrpcChannelOptions.DEFAULT_KEEPALIVE_WITHOUT_STREAM, - OptionalInt.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT_MS), - OptionalInt.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME_MS)); + Optional.of(GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE), + Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_WITHOUT_STREAM), + Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT_MS), + Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME_MS)); } /** @@ -47,10 +47,10 @@ public GrpcConfiguration(@Nonnull Duration deadline) { public GrpcConfiguration( @Nonnull Duration deadline, int minNumGrpcChannels, - OptionalInt maxMessageSize, - boolean keepAliveWithoutCalls, - OptionalInt keepAliveTimeout, - OptionalInt keepAliveTime) { + Optional maxMessageSize, + Optional keepAliveWithoutCalls, + Optional keepAliveTimeout, + Optional keepAliveTime) { ensureRequestDeadlineValid(deadline); this.deadline = deadline; this.minNumGrpcChannels = minNumGrpcChannels; @@ -116,7 +116,7 @@ public GrpcConfiguration withMinNumGrpcChannels(int minNumGrpcChannels) { * * @return the maximum message size. */ - public OptionalInt getMaxMessageSize() { + public Optional getMaxMessageSize() { return maxMessageSize; } @@ -130,7 +130,7 @@ public GrpcConfiguration withMaxMessageSize(int maxMessageSize) { return new GrpcConfiguration( deadline, minNumGrpcChannels, - OptionalInt.of(maxMessageSize), + Optional.of(maxMessageSize), keepAliveWithoutCalls, keepAliveTimeoutMs, keepAliveTimeMs); @@ -141,7 +141,7 @@ public GrpcConfiguration withMaxMessageSize(int maxMessageSize) { * * @return the boolean indicating whether to send keepalive pings without any active calls. */ - public boolean getKeepAliveWithoutCalls() { + public Optional getKeepAliveWithoutCalls() { return keepAliveWithoutCalls; } @@ -156,11 +156,11 @@ public boolean getKeepAliveWithoutCalls() { * timeout even though the connection is completely healthy. Therefore, keep-alives should be * disabled in lambda and similar environments. * - * @param keepAliveWithoutCalls The new boolean indicating whether to send keepalive pings without - * any active calls. + * @param keepAliveWithoutCalls The boolean indicating whether to send keepalive pings without any + * active calls. * @return The updated GrpcConfiguration. */ - public GrpcConfiguration withKeepAliveWithoutCalls(boolean keepAliveWithoutCalls) { + public GrpcConfiguration withKeepAliveWithoutCalls(Optional keepAliveWithoutCalls) { return new GrpcConfiguration( deadline, minNumGrpcChannels, @@ -175,7 +175,7 @@ public GrpcConfiguration withKeepAliveWithoutCalls(boolean keepAliveWithoutCalls * * @return the time to wait for a keepalive ping response before considering the connection dead. */ - public OptionalInt getKeepAliveTimeoutMs() { + public Optional getKeepAliveTimeoutMs() { return keepAliveTimeoutMs; } @@ -199,7 +199,7 @@ public GrpcConfiguration withKeepAliveTimeout(int keepAliveTimeoutMs) { minNumGrpcChannels, maxMessageSize, keepAliveWithoutCalls, - OptionalInt.of(keepAliveTimeoutMs), + Optional.of(keepAliveTimeoutMs), keepAliveTimeMs); } @@ -208,7 +208,7 @@ public GrpcConfiguration withKeepAliveTimeout(int keepAliveTimeoutMs) { * * @return the time to wait between keepalive pings. */ - public OptionalInt getKeepAliveTimeMs() { + public Optional getKeepAliveTimeMs() { return keepAliveTimeMs; } @@ -232,7 +232,7 @@ public GrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) { maxMessageSize, keepAliveWithoutCalls, keepAliveTimeoutMs, - OptionalInt.of(keepAliveTimeMs)); + Optional.of(keepAliveTimeMs)); } /** @@ -252,8 +252,8 @@ public GrpcConfiguration withKeepAliveDisabled() { deadline, minNumGrpcChannels, maxMessageSize, - false, - OptionalInt.empty(), - OptionalInt.empty()); + Optional.empty(), + Optional.empty(), + Optional.empty()); } } diff --git a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java index 68fb92e0..fe86e721 100644 --- a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java +++ b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java @@ -1,7 +1,7 @@ package momento.sdk.internal; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; -import java.util.OptionalInt; +import java.util.Optional; import java.util.concurrent.TimeUnit; import momento.sdk.config.transport.GrpcConfiguration; @@ -20,25 +20,26 @@ public static void applyGrpcConfigurationToChannelBuilder( channelBuilder.useTransportSecurity(); channelBuilder.disableRetry(); - final OptionalInt maxMessageSize = grpcConfig.getMaxMessageSize(); + final Optional maxMessageSize = grpcConfig.getMaxMessageSize(); if (maxMessageSize.isPresent()) { - channelBuilder.maxInboundMessageSize(maxMessageSize.getAsInt()); + channelBuilder.maxInboundMessageSize(maxMessageSize.get()); } // no equivalent for maxOutboundboundMessageSize - final OptionalInt keepAliveTimeMs = grpcConfig.getKeepAliveTimeMs(); + final Optional keepAliveTimeMs = grpcConfig.getKeepAliveTimeMs(); if (keepAliveTimeMs.isPresent()) { - channelBuilder.keepAliveTime(keepAliveTimeMs.getAsInt(), TimeUnit.MILLISECONDS); + channelBuilder.keepAliveTime(keepAliveTimeMs.get(), TimeUnit.MILLISECONDS); } - final OptionalInt keepAliveTimeoutMs = grpcConfig.getKeepAliveTimeoutMs(); + final Optional keepAliveTimeoutMs = grpcConfig.getKeepAliveTimeoutMs(); if (keepAliveTimeoutMs.isPresent()) { - channelBuilder.keepAliveTimeout(keepAliveTimeoutMs.getAsInt(), TimeUnit.MILLISECONDS); + channelBuilder.keepAliveTimeout(keepAliveTimeoutMs.get(), TimeUnit.MILLISECONDS); } - if (!grpcConfig.getKeepAliveWithoutCalls()) { - channelBuilder.keepAliveWithoutCalls(false); + final Optional keepAliveWithoutCalls = grpcConfig.getKeepAliveWithoutCalls(); + if (keepAliveWithoutCalls.isPresent()) { + channelBuilder.keepAliveWithoutCalls(keepAliveWithoutCalls.get()); } } } diff --git a/momento-sdk/src/test/java/momento/sdk/Configurations.java b/momento-sdk/src/test/java/momento/sdk/Configurations.java index 9de61033..8cdfabfe 100644 --- a/momento-sdk/src/test/java/momento/sdk/Configurations.java +++ b/momento-sdk/src/test/java/momento/sdk/Configurations.java @@ -1,7 +1,6 @@ package momento.sdk; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import momento.sdk.config.Configuration; @@ -14,7 +13,7 @@ public class Configurations { public void testCacheLambdaConfigurationDisablesKeepalive() { final Configuration config = momento.sdk.config.Configurations.Lambda.latest(); final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); - assertFalse(grpcConfig.getKeepAliveWithoutCalls()); + assertThat(grpcConfig.getKeepAliveWithoutCalls().isEmpty()); assertTrue(grpcConfig.getKeepAliveTimeMs().isEmpty()); assertTrue(grpcConfig.getKeepAliveTimeoutMs().isEmpty()); } @@ -23,17 +22,17 @@ public void testCacheLambdaConfigurationDisablesKeepalive() { public void testCacheLaptopConfigurationEnablesKeepalive() { final Configuration config = momento.sdk.config.Configurations.Laptop.latest(); final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); - assertTrue(grpcConfig.getKeepAliveWithoutCalls()); - assertThat(grpcConfig.getKeepAliveTimeMs().getAsInt()).isEqualTo(5000); - assertThat(grpcConfig.getKeepAliveTimeoutMs().getAsInt()).isEqualTo(1000); + assertTrue(grpcConfig.getKeepAliveWithoutCalls().get()); + assertThat(grpcConfig.getKeepAliveTimeMs().get()).isEqualTo(5000); + assertThat(grpcConfig.getKeepAliveTimeoutMs().get()).isEqualTo(1000); } @Test public void testTopicsLaptopConfigurationEnablesKeepalive() { final TopicConfiguration config = momento.sdk.config.TopicConfigurations.Laptop.latest(); final GrpcConfiguration grpcConfig = config.getTransportStrategy().getGrpcConfiguration(); - assertTrue(grpcConfig.getKeepAliveWithoutCalls()); - assertThat(grpcConfig.getKeepAliveTimeMs().getAsInt()).isEqualTo(10000); - assertThat(grpcConfig.getKeepAliveTimeoutMs().getAsInt()).isEqualTo(5000); + assertTrue(grpcConfig.getKeepAliveWithoutCalls().get()); + assertThat(grpcConfig.getKeepAliveTimeMs().get()).isEqualTo(10000); + assertThat(grpcConfig.getKeepAliveTimeoutMs().get()).isEqualTo(5000); } }