Skip to content

Commit

Permalink
feat: add grpc config options and turn off keepalive for Lambda config
Browse files Browse the repository at this point in the history
  • Loading branch information
anitarua committed Mar 5, 2024
1 parent 167a738 commit 243bce0
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ examples/lib/.settings
examples/lib/.classpath
examples/lib/.project
examples/lib/bin
.vscode/
*logback.xml
2 changes: 1 addition & 1 deletion momento-sdk/src/main/java/momento/sdk/CacheClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public CacheClient(
@Nonnull CredentialProvider credentialProvider,
@Nonnull Configuration configuration,
@Nonnull Duration itemDefaultTtl) {
this.scsControlClient = new ScsControlClient(credentialProvider);
this.scsControlClient = new ScsControlClient(credentialProvider, configuration);
this.scsDataClient = new ScsDataClient(credentialProvider, configuration, itemDefaultTtl);

logger.info("Creating Momento Cache Client");
Expand Down
6 changes: 4 additions & 2 deletions momento-sdk/src/main/java/momento/sdk/ScsControlClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.exceptions.CacheServiceExceptionMapper;
import momento.sdk.exceptions.InternalServerException;
import momento.sdk.responses.cache.control.CacheCreateResponse;
Expand All @@ -48,9 +49,10 @@ final class ScsControlClient extends ScsClient {
private final CredentialProvider credentialProvider;
private final ScsControlGrpcStubsManager controlGrpcStubsManager;

ScsControlClient(@Nonnull CredentialProvider credentialProvider) {
ScsControlClient(@Nonnull CredentialProvider credentialProvider, Configuration configuration) {
this.credentialProvider = credentialProvider;
this.controlGrpcStubsManager = new ScsControlGrpcStubsManager(credentialProvider);
this.controlGrpcStubsManager =
new ScsControlGrpcStubsManager(credentialProvider, configuration);
}

CompletableFuture<CacheCreateResponse> createCache(String cacheName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.config.transport.GrpcConfiguration;
import momento.sdk.internal.GrpcChannelOptions;

/**
* Manager responsible for GRPC channels and stubs for the Control Plane.
Expand All @@ -26,16 +29,29 @@ final class ScsControlGrpcStubsManager implements AutoCloseable {

private final ScsControlGrpc.ScsControlFutureStub futureStub;

ScsControlGrpcStubsManager(@Nonnull CredentialProvider credentialProvider) {
this.channel = setupConnection(credentialProvider);
ScsControlGrpcStubsManager(
@Nonnull CredentialProvider credentialProvider, Configuration configuration) {
this.channel = setupConnection(credentialProvider, configuration);
this.futureStub = ScsControlGrpc.newFutureStub(channel);
}

private static ManagedChannel setupConnection(CredentialProvider credentialProvider) {
private static ManagedChannel setupConnection(
CredentialProvider credentialProvider, Configuration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

// Override grpc config to disable keepalive for control clients
final GrpcConfiguration controlConfig =
configuration
.getTransportStrategy()
.getGrpcConfiguration()
.withKeepAliveTime(0)
.withKeepAliveTimeout(0)
.withKeepAliveWithoutCalls(false);

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(controlConfig, channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
channelBuilder.intercept(clientInterceptors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.internal.GrpcChannelOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -171,8 +172,11 @@ private ManagedChannel setupChannel(
CredentialProvider credentialProvider, Configuration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(
configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
clientInterceptors.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.transport.GrpcConfiguration;
import momento.sdk.internal.GrpcChannelOptions;
import momento.token.TokenGrpc;

/**
Expand All @@ -34,8 +36,14 @@ final class ScsTokenGrpcStubsManager implements AutoCloseable {
private static ManagedChannel setupConnection(CredentialProvider credentialProvider) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getTokenEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

// Note: This is hard-coded for now but we may want to expose it via configuration object
// in the future, as we do with some of the other clients.
final GrpcConfiguration grpcConfig = new GrpcConfiguration(Duration.ofMillis(15000));

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(grpcConfig, channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
channelBuilder.intercept(clientInterceptors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.TopicConfiguration;
import momento.sdk.internal.GrpcChannelOptions;

/**
* Manager responsible for GRPC channels and stubs for the Topics.
Expand All @@ -28,20 +28,19 @@ final class ScsTopicGrpcStubsManager implements Closeable {

ScsTopicGrpcStubsManager(
@Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) {
this.channel = setupConnection(credentialProvider);
this.channel = setupConnection(credentialProvider, configuration);
this.stub = PubsubGrpc.newStub(channel);
this.configuration = configuration;
}

private static ManagedChannel setupConnection(CredentialProvider credentialProvider) {
private static ManagedChannel setupConnection(
CredentialProvider credentialProvider, TopicConfiguration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

channelBuilder.keepAliveTime(10, TimeUnit.SECONDS);
channelBuilder.keepAliveTimeout(5, TimeUnit.SECONDS);
channelBuilder.keepAliveWithoutCalls(true);
// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(
configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
Expand Down
15 changes: 13 additions & 2 deletions momento-sdk/src/main/java/momento/sdk/config/Configurations.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,22 @@ private Lambda(TransportStrategy transportStrategy, RetryStrategy retryStrategy)
* <p>This configuration may change in future releases to take advantage of improvements we
* identify for default configurations.
*
* <p>NOTE: keep-alives are very important for long-lived server environments where there may be
* periods of time when the connection is idle. However, they are very problematic for lambda
* environments where the lambda runtime is continuously frozen and unfrozen, because the lambda
* may be frozen before the "ACK" is received from the server. This can cause the keep-alive to
* timeout even though the connection is completely healthy. Therefore, keep-alives should be
* disabled in lambda and similar environments.
*
* @return the latest Lambda configuration
*/
public static Configuration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(1100)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(1100))
.withKeepAliveTime(0)
.withKeepAliveTimeout(0)
.withKeepAliveWithoutCalls(false);
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final RetryStrategy retryStrategy =
new FixedCountRetryStrategy(DEFAULT_MAX_RETRIES, new DefaultRetryEligibilityStrategy());
return new Lambda(transportStrategy, retryStrategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ private Laptop(TransportStrategy transportStrategy, Logger logger) {
* @return the latest Laptop configuration
*/
public static TopicConfiguration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(15000)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(15000))
.withKeepAliveTime(10000)
.withKeepAliveTimeout(5000);
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class);
return new Laptop(transportStrategy, logger);
}
Expand All @@ -54,8 +57,11 @@ private InRegion(TransportStrategy transportStrategy, Logger logger) {
* @return the latest in-region configuration
*/
public static TopicConfiguration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(1100)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(1100))
.withKeepAliveTime(10000)
.withKeepAliveTimeout(5000);
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class);
return new InRegion(transportStrategy, logger);
}
Expand All @@ -81,8 +87,11 @@ private LowLatency(TransportStrategy transportStrategy, Logger logger) {
* @return the latest low-latency configuration
*/
public static TopicConfiguration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(500)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(500))
.withKeepAliveTime(10000)
.withKeepAliveTimeout(5000);
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class);
return new LowLatency(transportStrategy, logger);
}
Expand Down
Loading

0 comments on commit 243bce0

Please sign in to comment.