Skip to content

Commit

Permalink
feat: add support for configuring minimum num gRPC channels on CacheC…
Browse files Browse the repository at this point in the history
…lient (#345)

Prior to this commit, each instance of the CacheClient would always
open a single gRPC channel to the server. This commit adds a configuration
setting on the GrpcConfiguration object which allows a user to configure
a minimum number of channels; this is useful for cases where callers will
be issuing more than 100 concurrent requests, since 100 is the maximum
number of requests that can be multiplexed over a single channel.
  • Loading branch information
cprice404 authored Dec 21, 2023
1 parent 37a5fe4 commit 223a5d5
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 31 deletions.
85 changes: 54 additions & 31 deletions momento-sdk/src/main/java/momento/sdk/ScsDataGrpcStubsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
Expand All @@ -32,8 +35,11 @@
*/
final class ScsDataGrpcStubsManager implements AutoCloseable {

private final ManagedChannel channel;
private final ScsGrpc.ScsFutureStub futureStub;
private final List<ManagedChannel> channels;
private final List<ScsGrpc.ScsFutureStub> futureStubs;
private final AtomicInteger nextStubIndex = new AtomicInteger(0);

private final int numGrpcChannels;
private final Duration deadline;
private final ScheduledExecutorService retryScheduler;
private final ExecutorService retryExecutor;
Expand All @@ -48,6 +54,8 @@ final class ScsDataGrpcStubsManager implements AutoCloseable {
ScsDataGrpcStubsManager(
@Nonnull CredentialProvider credentialProvider, @Nonnull Configuration configuration) {
this.deadline = configuration.getTransportStrategy().getGrpcConfiguration().getDeadline();
this.numGrpcChannels =
configuration.getTransportStrategy().getGrpcConfiguration().getMinNumGrpcChannels();

/**
* These two executors are used by {@link RetryClientInterceptor} to schedule and execute
Expand Down Expand Up @@ -80,8 +88,11 @@ final class ScsDataGrpcStubsManager implements AutoCloseable {
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());

this.channel = setupChannel(credentialProvider, configuration);
this.futureStub = ScsGrpc.newFutureStub(channel);
this.channels =
IntStream.range(0, this.numGrpcChannels)
.mapToObj(i -> setupChannel(credentialProvider, configuration))
.collect(Collectors.toList());
this.futureStubs = channels.stream().map(ScsGrpc::newFutureStub).collect(Collectors.toList());
}

/**
Expand All @@ -91,36 +102,43 @@ final class ScsDataGrpcStubsManager implements AutoCloseable {
* attempt.
*/
public void connect(final long timeoutSeconds) {
final ConnectivityState currentState = this.channel.getState(true /* tryToConnect */);
if (ConnectivityState.READY.equals(currentState)) {
LOGGER.debug("Connected to Momento's server! Happy Caching!");
return;
}

// this future is our signalling mechanism to exit after the eager connection is successfully
// established or fails
final CompletableFuture<Void> connectionFuture = new CompletableFuture<>();
eagerlyConnect(currentState, connectionFuture);

try {
connectionFuture.get(timeoutSeconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
connectionFuture.cancel(true);
LOGGER.debug("Failed to connect within the allotted time of {} seconds.", 1);
} catch (InterruptedException | ExecutionException e) {
connectionFuture.cancel(true);
LOGGER.debug("Error while waiting for eager connection to establish.", e);
// TODO: client initialization time could be optimized, in the case where a user configures more
// than one gRPC
// channel, by attempting to connect these channels asynchronously rather than serially.
for (ManagedChannel channel : channels) {
final ConnectivityState currentState = channel.getState(true /* tryToConnect */);
if (ConnectivityState.READY.equals(currentState)) {
LOGGER.debug("Connected to Momento's server! Happy Caching!");
return;
}

// this future is our signalling mechanism to exit after the eager connection is successfully
// established or fails
final CompletableFuture<Void> connectionFuture = new CompletableFuture<>();
eagerlyConnect(currentState, connectionFuture, channel);

try {
connectionFuture.get(timeoutSeconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
connectionFuture.cancel(true);
LOGGER.debug("Failed to connect within the allotted time of {} seconds.", 1);
} catch (InterruptedException | ExecutionException e) {
connectionFuture.cancel(true);
LOGGER.debug("Error while waiting for eager connection to establish.", e);
}
}
}

private void eagerlyConnect(
final ConnectivityState lastObservedState, final CompletableFuture<Void> connectionFuture) {
private static void eagerlyConnect(
final ConnectivityState lastObservedState,
final CompletableFuture<Void> connectionFuture,
final ManagedChannel channel) {

// the callback is triggerd only when a state change happens
this.channel.notifyWhenStateChanged(
channel.notifyWhenStateChanged(
lastObservedState,
() -> {
final ConnectivityState currentState = this.channel.getState(false /* tryToConnect */);
final ConnectivityState currentState = channel.getState(false /* tryToConnect */);
switch (currentState) {
case READY:
LOGGER.debug("Connected to Momento's server! Happy Caching!");
Expand All @@ -133,11 +151,11 @@ private void eagerlyConnect(
// we deliberately give up connecting eagerly if it fails or has intermittent issues
// to not
// blow up the call stack on frequent state changes.
eagerlyConnect(currentState, connectionFuture);
eagerlyConnect(currentState, connectionFuture, channel);
break;
case CONNECTING:
LOGGER.debug("State transitioned to CONNECTING; waiting to get READY");
eagerlyConnect(currentState, connectionFuture);
eagerlyConnect(currentState, connectionFuture, channel);
break;
default:
LOGGER.debug(
Expand Down Expand Up @@ -176,13 +194,18 @@ private ManagedChannel setupChannel(
* <p><a href="https://github.com/grpc/grpc-java/issues/1495">more information</a>
*/
ScsGrpc.ScsFutureStub getStub() {
return futureStub.withDeadlineAfter(deadline.getSeconds(), TimeUnit.SECONDS);
int nextStubIndex = this.nextStubIndex.getAndIncrement();
return futureStubs
.get(nextStubIndex % this.numGrpcChannels)
.withDeadlineAfter(deadline.getSeconds(), TimeUnit.SECONDS);
}

@Override
public void close() {
retryScheduler.shutdown();
retryExecutor.shutdown();
channel.shutdown();
for (ManagedChannel channel : channels) {
channel.shutdown();
}
}
}
10 changes: 10 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/config/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ public TransportStrategy getTransportStrategy() {
return transportStrategy;
}

/**
* Copy constructor that modifies the transport strategy.
*
* @param transportStrategy
* @return a new Configuration with the updated transport strategy
*/
public Configuration withTransportStrategy(final TransportStrategy transportStrategy) {
return new Configuration(transportStrategy, this.retryStrategy);
}

/**
* Configuration for retry tunables. By default, {@link momento.sdk.retry.FixedCountRetryStrategy}
* gets used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,27 @@
public class GrpcConfiguration {

private final Duration deadline;
private final int minNumGrpcChannels;

/**
* Constructs a GrpcConfiguration.
*
* @param deadline The maximum duration of a gRPC call.
*/
public GrpcConfiguration(@Nonnull Duration deadline) {
this(deadline, 1);
}

/**
* Constructs a GrpcConfiguration.
*
* @param deadline The maximum duration of a gRPC call.
* @param minNumGrpcChannels The minimum number of gRPC channels to keep open at any given time.
*/
public GrpcConfiguration(@Nonnull Duration deadline, int minNumGrpcChannels) {
ensureRequestDeadlineValid(deadline);
this.deadline = deadline;
this.minNumGrpcChannels = minNumGrpcChannels;
}

/**
Expand All @@ -39,4 +51,23 @@ public Duration getDeadline() {
public GrpcConfiguration withDeadline(Duration deadline) {
return new GrpcConfiguration(deadline);
}

/**
* The minimum number of gRPC channels to keep open at any given time.
*
* @return the minimum number of gRPC channels.
*/
public int getMinNumGrpcChannels() {
return minNumGrpcChannels;
}

/**
* Copy constructor that updates the minimum number of gRPC channels.
*
* @param minNumGrpcChannels The new minimum number of gRPC channels.
* @return The updated GrpcConfiguration.
*/
public GrpcConfiguration withMinNumGrpcChannels(int minNumGrpcChannels) {
return new GrpcConfiguration(deadline, minNumGrpcChannels);
}
}

0 comments on commit 223a5d5

Please sign in to comment.