Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add grpc config options and turn off keepalive for Lambda config #351

Merged
merged 6 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ examples/lib/.settings
examples/lib/.classpath
examples/lib/.project
examples/lib/bin
.vscode/
*logback.xml
2 changes: 1 addition & 1 deletion momento-sdk/src/main/java/momento/sdk/CacheClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public CacheClient(
@Nonnull CredentialProvider credentialProvider,
@Nonnull Configuration configuration,
@Nonnull Duration itemDefaultTtl) {
this.scsControlClient = new ScsControlClient(credentialProvider);
this.scsControlClient = new ScsControlClient(credentialProvider, configuration);
this.scsDataClient = new ScsDataClient(credentialProvider, configuration, itemDefaultTtl);

logger.info("Creating Momento Cache Client");
Expand Down
6 changes: 4 additions & 2 deletions momento-sdk/src/main/java/momento/sdk/ScsControlClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.exceptions.CacheServiceExceptionMapper;
import momento.sdk.exceptions.InternalServerException;
import momento.sdk.responses.cache.control.CacheCreateResponse;
Expand All @@ -48,9 +49,10 @@ final class ScsControlClient extends ScsClient {
private final CredentialProvider credentialProvider;
private final ScsControlGrpcStubsManager controlGrpcStubsManager;

ScsControlClient(@Nonnull CredentialProvider credentialProvider) {
ScsControlClient(@Nonnull CredentialProvider credentialProvider, Configuration configuration) {
this.credentialProvider = credentialProvider;
this.controlGrpcStubsManager = new ScsControlGrpcStubsManager(credentialProvider);
this.controlGrpcStubsManager =
new ScsControlGrpcStubsManager(credentialProvider, configuration);
}

CompletableFuture<CacheCreateResponse> createCache(String cacheName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.config.transport.GrpcConfiguration;
import momento.sdk.internal.GrpcChannelOptions;

/**
* Manager responsible for GRPC channels and stubs for the Control Plane.
Expand All @@ -26,16 +29,24 @@ final class ScsControlGrpcStubsManager implements AutoCloseable {

private final ScsControlGrpc.ScsControlFutureStub futureStub;

ScsControlGrpcStubsManager(@Nonnull CredentialProvider credentialProvider) {
this.channel = setupConnection(credentialProvider);
ScsControlGrpcStubsManager(
@Nonnull CredentialProvider credentialProvider, Configuration configuration) {
this.channel = setupConnection(credentialProvider, configuration);
this.futureStub = ScsControlGrpc.newFutureStub(channel);
}

private static ManagedChannel setupConnection(CredentialProvider credentialProvider) {
private static ManagedChannel setupConnection(
CredentialProvider credentialProvider, Configuration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

// Override grpc config to disable keepalive for control clients
final GrpcConfiguration controlConfig =
configuration.getTransportStrategy().getGrpcConfiguration().withKeepAliveDisabled();

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(controlConfig, channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
channelBuilder.intercept(clientInterceptors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.internal.GrpcChannelOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -171,8 +172,11 @@ private ManagedChannel setupChannel(
CredentialProvider credentialProvider, Configuration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(
configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
clientInterceptors.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.transport.GrpcConfiguration;
import momento.sdk.internal.GrpcChannelOptions;
import momento.token.TokenGrpc;

/**
Expand All @@ -34,8 +36,14 @@ final class ScsTokenGrpcStubsManager implements AutoCloseable {
private static ManagedChannel setupConnection(CredentialProvider credentialProvider) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getTokenEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

// Note: This is hard-coded for now but we may want to expose it via configuration object
// in the future, as we do with some of the other clients.
final GrpcConfiguration grpcConfig = new GrpcConfiguration(Duration.ofMillis(15000));

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(grpcConfig, channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
channelBuilder.intercept(clientInterceptors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.TopicConfiguration;
import momento.sdk.internal.GrpcChannelOptions;

/**
* Manager responsible for GRPC channels and stubs for the Topics.
Expand All @@ -28,20 +28,19 @@ final class ScsTopicGrpcStubsManager implements Closeable {

ScsTopicGrpcStubsManager(
@Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) {
this.channel = setupConnection(credentialProvider);
this.channel = setupConnection(credentialProvider, configuration);
this.stub = PubsubGrpc.newStub(channel);
this.configuration = configuration;
}

private static ManagedChannel setupConnection(CredentialProvider credentialProvider) {
private static ManagedChannel setupConnection(
CredentialProvider credentialProvider, TopicConfiguration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443);
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();

channelBuilder.keepAliveTime(10, TimeUnit.SECONDS);
channelBuilder.keepAliveTimeout(5, TimeUnit.SECONDS);
channelBuilder.keepAliveWithoutCalls(true);
Comment on lines -42 to -44
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

propagated these values to the prebuilt topics configs

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.GrpcOptionsFromGrpcConfig(
configuration.getTransportStrategy().getGrpcConfiguration(), channelBuilder);

final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken()));
Expand Down
12 changes: 10 additions & 2 deletions momento-sdk/src/main/java/momento/sdk/config/Configurations.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,19 @@ private Lambda(TransportStrategy transportStrategy, RetryStrategy retryStrategy)
* <p>This configuration may change in future releases to take advantage of improvements we
* identify for default configurations.
*
* <p>NOTE: keep-alives are very important for long-lived server environments where there may be
* periods of time when the connection is idle. However, they are very problematic for lambda
* environments where the lambda runtime is continuously frozen and unfrozen, because the lambda
* may be frozen before the "ACK" is received from the server. This can cause the keep-alive to
* timeout even though the connection is completely healthy. Therefore, keep-alives should be
* disabled in lambda and similar environments.
*
* @return the latest Lambda configuration
*/
public static Configuration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(1100)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(1100)).withKeepAliveDisabled();
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final RetryStrategy retryStrategy =
new FixedCountRetryStrategy(DEFAULT_MAX_RETRIES, new DefaultRetryEligibilityStrategy());
return new Lambda(transportStrategy, retryStrategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ private Laptop(TransportStrategy transportStrategy, Logger logger) {
* @return the latest Laptop configuration
*/
public static TopicConfiguration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(15000)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(15000))
.withKeepAliveTime(10000)
.withKeepAliveTimeout(5000);
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class);
return new Laptop(transportStrategy, logger);
}
Expand All @@ -54,8 +57,11 @@ private InRegion(TransportStrategy transportStrategy, Logger logger) {
* @return the latest in-region configuration
*/
public static TopicConfiguration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(1100)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(1100))
.withKeepAliveTime(10000)
.withKeepAliveTimeout(5000);
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class);
return new InRegion(transportStrategy, logger);
}
Expand All @@ -81,8 +87,11 @@ private LowLatency(TransportStrategy transportStrategy, Logger logger) {
* @return the latest low-latency configuration
*/
public static TopicConfiguration latest() {
final TransportStrategy transportStrategy =
new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(500)));
final GrpcConfiguration grpcConfig =
new GrpcConfiguration(Duration.ofMillis(500))
.withKeepAliveTime(10000)
.withKeepAliveTimeout(5000);
final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig);
final Logger logger = LoggerFactory.getLogger(TopicConfiguration.class);
return new LowLatency(transportStrategy, logger);
}
Expand Down
Loading
Loading