diff --git a/momento-sdk/src/main/java/momento/sdk/PreviewStorageClient.java b/momento-sdk/src/main/java/momento/sdk/PreviewStorageClient.java index 1d01e3ba..bd36359c 100644 --- a/momento-sdk/src/main/java/momento/sdk/PreviewStorageClient.java +++ b/momento-sdk/src/main/java/momento/sdk/PreviewStorageClient.java @@ -26,7 +26,8 @@ public PreviewStorageClient( this.dataClient = new StorageDataClient(credentialProvider, configuration); } - /** Control operations */ + // Control operations + public CompletableFuture createStore(String storeName) { return this.controlClient.createStore(storeName); } @@ -39,7 +40,8 @@ public CompletableFuture listStores() { return this.controlClient.listStores(); } - /** Data operations */ + // Data operations + public CompletableFuture get(String storeName, String key) { return this.dataClient.get(storeName, key); } diff --git a/momento-sdk/src/main/java/momento/sdk/StorageControlGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/StorageControlGrpcStubsManager.java index 1678bd07..c0286b6b 100644 --- a/momento-sdk/src/main/java/momento/sdk/StorageControlGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/StorageControlGrpcStubsManager.java @@ -11,7 +11,7 @@ import javax.annotation.Nonnull; import momento.sdk.auth.CredentialProvider; import momento.sdk.config.StorageConfiguration; -import momento.sdk.config.transport.GrpcConfiguration; +import momento.sdk.config.transport.storage.StorageGrpcConfiguration; import momento.sdk.internal.GrpcChannelOptions; /** @@ -41,7 +41,7 @@ private static ManagedChannel setupConnection( NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 443); // Override grpc config to disable keepalive for control clients - final GrpcConfiguration controlConfig = + final StorageGrpcConfiguration controlConfig = configuration.getTransportStrategy().getGrpcConfiguration().withKeepAliveDisabled(); // set additional channel options (message size, keepalive, auth, etc) diff --git a/momento-sdk/src/main/java/momento/sdk/config/StorageConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/StorageConfiguration.java index 51bada9e..c011d50a 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/StorageConfiguration.java +++ b/momento-sdk/src/main/java/momento/sdk/config/StorageConfiguration.java @@ -1,18 +1,18 @@ package momento.sdk.config; -import momento.sdk.config.transport.TransportStrategy; +import momento.sdk.config.transport.storage.StorageTransportStrategy; /** The contract for SDK configurables. A configuration must have a transport strategy. */ public class StorageConfiguration { - private final TransportStrategy transportStrategy; + private final StorageTransportStrategy transportStrategy; /** * Creates a new configuration object. * * @param transportStrategy Responsible for configuring network tunables. */ - public StorageConfiguration(TransportStrategy transportStrategy) { + public StorageConfiguration(StorageTransportStrategy transportStrategy) { this.transportStrategy = transportStrategy; } @@ -21,7 +21,7 @@ public StorageConfiguration(TransportStrategy transportStrategy) { * * @return The transport strategy */ - public TransportStrategy getTransportStrategy() { + public StorageTransportStrategy getTransportStrategy() { return transportStrategy; } @@ -31,7 +31,8 @@ public TransportStrategy getTransportStrategy() { * @param transportStrategy * @return a new Configuration with the updated transport strategy */ - public StorageConfiguration withTransportStrategy(final TransportStrategy transportStrategy) { + public StorageConfiguration withTransportStrategy( + final StorageTransportStrategy transportStrategy) { return new StorageConfiguration(transportStrategy); } } diff --git a/momento-sdk/src/main/java/momento/sdk/config/StorageConfigurations.java b/momento-sdk/src/main/java/momento/sdk/config/StorageConfigurations.java index 25314cc9..b963943e 100644 --- a/momento-sdk/src/main/java/momento/sdk/config/StorageConfigurations.java +++ b/momento-sdk/src/main/java/momento/sdk/config/StorageConfigurations.java @@ -1,9 +1,9 @@ package momento.sdk.config; import java.time.Duration; -import momento.sdk.config.transport.GrpcConfiguration; -import momento.sdk.config.transport.StaticTransportStrategy; -import momento.sdk.config.transport.TransportStrategy; +import momento.sdk.config.transport.storage.StaticStorageTransportStrategy; +import momento.sdk.config.transport.storage.StorageGrpcConfiguration; +import momento.sdk.config.transport.storage.StorageTransportStrategy; /** Prebuilt {@link StorageConfiguration}s for different environments. */ public class StorageConfigurations { @@ -13,7 +13,7 @@ public class StorageConfigurations { */ public static class Laptop extends StorageConfiguration { - private Laptop(TransportStrategy transportStrategy) { + private Laptop(StorageTransportStrategy transportStrategy) { super(transportStrategy); } @@ -26,9 +26,10 @@ private Laptop(TransportStrategy transportStrategy) { * @return the latest Laptop configuration */ public static StorageConfiguration latest() { - final TransportStrategy transportStrategy = + final StorageTransportStrategy transportStrategy = // TODO - new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(15000))); + new StaticStorageTransportStrategy( + new StorageGrpcConfiguration(Duration.ofMillis(15000))); return new Laptop(transportStrategy); } } @@ -40,7 +41,7 @@ public static StorageConfiguration latest() { */ public static class InRegion extends StorageConfiguration { - private InRegion(TransportStrategy transportStrategy) { + private InRegion(StorageTransportStrategy transportStrategy) { super(transportStrategy); } @@ -53,15 +54,16 @@ private InRegion(TransportStrategy transportStrategy) { * @return the latest in-region configuration */ public static StorageConfiguration latest() { - final TransportStrategy transportStrategy = + final StorageTransportStrategy transportStrategy = // TODO - new StaticTransportStrategy(new GrpcConfiguration(Duration.ofMillis(15000))); + new StaticStorageTransportStrategy( + new StorageGrpcConfiguration(Duration.ofMillis(15000))); return new InRegion(transportStrategy); } } public static class Lambda extends StorageConfiguration { - private Lambda(TransportStrategy transportStrategy) { + private Lambda(StorageTransportStrategy transportStrategy) { super(transportStrategy); } @@ -81,10 +83,11 @@ private Lambda(TransportStrategy transportStrategy) { * @return the latest Lambda configuration */ public static StorageConfiguration latest() { - final GrpcConfiguration grpcConfig = + final StorageGrpcConfiguration grpcConfig = // TODO - new GrpcConfiguration(Duration.ofMillis(15000)).withKeepAliveDisabled(); - final TransportStrategy transportStrategy = new StaticTransportStrategy(grpcConfig); + new StorageGrpcConfiguration(Duration.ofMillis(15000)).withKeepAliveDisabled(); + final StorageTransportStrategy transportStrategy = + new StaticStorageTransportStrategy(grpcConfig); return new Lambda(transportStrategy); } } diff --git a/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StaticStorageTransportStrategy.java b/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StaticStorageTransportStrategy.java new file mode 100644 index 00000000..83e54de1 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StaticStorageTransportStrategy.java @@ -0,0 +1,30 @@ +package momento.sdk.config.transport.storage; + +/** + * The simplest way to configure gRPC for the Momento client. Specifies static values the transport + * options. + */ +public class StaticStorageTransportStrategy implements StorageTransportStrategy { + + private final StorageGrpcConfiguration grpcConfiguration; + + /** + * Constructs a StaticStorageTransportStrategy. + * + * @param grpcConfiguration gRPC tunables. + */ + public StaticStorageTransportStrategy(StorageGrpcConfiguration grpcConfiguration) { + this.grpcConfiguration = grpcConfiguration; + } + + @Override + public StorageGrpcConfiguration getGrpcConfiguration() { + return grpcConfiguration; + } + + @Override + public StorageTransportStrategy withGrpcConfiguration( + StorageGrpcConfiguration grpcConfiguration) { + return new StaticStorageTransportStrategy(grpcConfiguration); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StorageGrpcConfiguration.java b/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StorageGrpcConfiguration.java new file mode 100644 index 00000000..e6251886 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StorageGrpcConfiguration.java @@ -0,0 +1,260 @@ +package momento.sdk.config.transport.storage; + +import static momento.sdk.ValidationUtils.ensureRequestDeadlineValid; + +import java.time.Duration; +import java.util.Optional; +import javax.annotation.Nonnull; +import momento.sdk.internal.GrpcChannelOptions; + +/** Abstracts away the gRPC configuration tunables. */ +public class StorageGrpcConfiguration { + + private final Duration deadline; + private final int minNumGrpcChannels; + private final Optional maxMessageSize; + private final Optional keepAliveWithoutCalls; + private final Optional keepAliveTimeoutMs; + private final Optional keepAliveTimeMs; + + /** + * Constructs a StorageGrpcConfiguration. + * + * @param deadline The maximum duration of a gRPC call. + */ + public StorageGrpcConfiguration(@Nonnull Duration deadline) { + this( + deadline, + 1, + Optional.of(GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE), + Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_WITHOUT_STREAM), + Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT_MS), + Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME_MS)); + } + + /** + * Constructs a StorageGrpcConfiguration. + * + * @param deadline The maximum duration of a gRPC call. + * @param minNumGrpcChannels The minimum number of gRPC channels to keep open at any given time. + * @param maxMessageSize The maximum size of a message (in bytes) that can be received by the + * client. + * @param keepAliveWithoutCalls Whether to send keepalive pings without any active calls. + * @param keepAliveTimeout The time to wait for a keepalive ping response before considering the + * connection dead. + * @param keepAliveTime The time to wait between keepalive pings. + */ + public StorageGrpcConfiguration( + @Nonnull Duration deadline, + int minNumGrpcChannels, + Optional maxMessageSize, + Optional keepAliveWithoutCalls, + Optional keepAliveTimeout, + Optional keepAliveTime) { + ensureRequestDeadlineValid(deadline); + this.deadline = deadline; + this.minNumGrpcChannels = minNumGrpcChannels; + this.maxMessageSize = maxMessageSize; + this.keepAliveWithoutCalls = keepAliveWithoutCalls; + this.keepAliveTimeoutMs = keepAliveTimeout; + this.keepAliveTimeMs = keepAliveTime; + } + + /** + * How long the client will wait for an RPC to complete before it is terminated with {@link + * io.grpc.Status.Code#DEADLINE_EXCEEDED}. + * + * @return the deadline + */ + public Duration getDeadline() { + return deadline; + } + + /** + * Copy constructor that updates the deadline. + * + * @param deadline The new deadline. + * @return The updated StorageGrpcConfiguration. + */ + public StorageGrpcConfiguration withDeadline(Duration deadline) { + return new StorageGrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * The minimum number of gRPC channels to keep open at any given time. + * + * @return the minimum number of gRPC channels. + */ + public int getMinNumGrpcChannels() { + return minNumGrpcChannels; + } + + /** + * Copy constructor that updates the minimum number of gRPC channels. + * + * @param minNumGrpcChannels The new minimum number of gRPC channels. + * @return The updated StorageGrpcConfiguration. + */ + public StorageGrpcConfiguration withMinNumGrpcChannels(int minNumGrpcChannels) { + return new StorageGrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * The maximum size of a message (in bytes) that can be received by the client. + * + * @return the maximum message size. + */ + public Optional getMaxMessageSize() { + return maxMessageSize; + } + + /** + * Copy constructor that updates the maximum message size. + * + * @param maxMessageSize The new maximum message size. + * @return The updated StorageGrpcConfiguration. + */ + public StorageGrpcConfiguration withMaxMessageSize(int maxMessageSize) { + return new StorageGrpcConfiguration( + deadline, + minNumGrpcChannels, + Optional.of(maxMessageSize), + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * Whether keepalive will be performed when there are no outstanding requests on a connection. + * + * @return the boolean indicating whether to send keepalive pings without any active calls. + */ + public Optional getKeepAliveWithoutCalls() { + return keepAliveWithoutCalls; + } + + /** + * Copy constructor that updates whether keepalive will be performed when there are no outstanding + * requests on a connection. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @param keepAliveWithoutCalls The boolean indicating whether to send keepalive pings without any + * active calls. + * @return The updated StorageGrpcConfiguration. + */ + public StorageGrpcConfiguration withKeepAliveWithoutCalls( + Optional keepAliveWithoutCalls) { + return new StorageGrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + keepAliveTimeMs); + } + + /** + * The time to wait for a keepalive ping response before considering the connection dead. + * + * @return the time to wait for a keepalive ping response before considering the connection dead. + */ + public Optional getKeepAliveTimeoutMs() { + return keepAliveTimeoutMs; + } + + /** + * Copy constructor that updates the time to wait for a keepalive ping response before considering + * the connection dead. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @param keepAliveTimeoutMs The new time to wait for a keepalive ping response. + * @return The updated StorageGrpcConfiguration. + */ + public StorageGrpcConfiguration withKeepAliveTimeout(int keepAliveTimeoutMs) { + return new StorageGrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + Optional.of(keepAliveTimeoutMs), + keepAliveTimeMs); + } + + /** + * The time to wait between keepalive pings. + * + * @return the time to wait between keepalive pings. + */ + public Optional getKeepAliveTimeMs() { + return keepAliveTimeMs; + } + + /** + * Copy constructor that updates the time to wait between keepalive pings. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @param keepAliveTimeMs The new time to wait between keepalive pings. + * @return The updated StorageGrpcConfiguration. + */ + public StorageGrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) { + return new StorageGrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + keepAliveWithoutCalls, + keepAliveTimeoutMs, + Optional.of(keepAliveTimeMs)); + } + + /** + * Copy constructor that disables all client-side keepalive settings. + * + *

NOTE: keep-alives are very important for long-lived server environments where there may be + * periods of time when the connection is idle. However, they are very problematic for lambda + * environments where the lambda runtime is continuously frozen and unfrozen, because the lambda + * may be frozen before the "ACK" is received from the server. This can cause the keep-alive to + * timeout even though the connection is completely healthy. Therefore, keep-alives should be + * disabled in lambda and similar environments. + * + * @return The updated StorageGrpcConfiguration. + */ + public StorageGrpcConfiguration withKeepAliveDisabled() { + return new StorageGrpcConfiguration( + deadline, + minNumGrpcChannels, + maxMessageSize, + Optional.empty(), + Optional.empty(), + Optional.empty()); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StorageTransportStrategy.java b/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StorageTransportStrategy.java new file mode 100644 index 00000000..91f85ec3 --- /dev/null +++ b/momento-sdk/src/main/java/momento/sdk/config/transport/storage/StorageTransportStrategy.java @@ -0,0 +1,19 @@ +package momento.sdk.config.transport.storage; + +/** Configuration for network tunables. */ +public interface StorageTransportStrategy { + /** + * Configures the low-level gRPC settings for the client's communication with the Momento server. + * + * @return the gRPC configuration. + */ + StorageGrpcConfiguration getGrpcConfiguration(); + + /** + * Copy constructor that modifies the gRPC configuration. + * + * @param grpcConfiguration low-level gRPC settings. + * @return The modified StorageTransportStrategy. + */ + StorageTransportStrategy withGrpcConfiguration(StorageGrpcConfiguration grpcConfiguration); +} diff --git a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java index fe86e721..081ed50a 100644 --- a/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java +++ b/momento-sdk/src/main/java/momento/sdk/internal/GrpcChannelOptions.java @@ -4,6 +4,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import momento.sdk.config.transport.GrpcConfiguration; +import momento.sdk.config.transport.storage.StorageGrpcConfiguration; public class GrpcChannelOptions { // The default value for max_send_message_length is 4mb. We need to increase this to 5mb in order @@ -42,4 +43,32 @@ public static void applyGrpcConfigurationToChannelBuilder( channelBuilder.keepAliveWithoutCalls(keepAliveWithoutCalls.get()); } } + + public static void applyGrpcConfigurationToChannelBuilder( + StorageGrpcConfiguration grpcConfig, NettyChannelBuilder channelBuilder) { + channelBuilder.useTransportSecurity(); + channelBuilder.disableRetry(); + + final Optional maxMessageSize = grpcConfig.getMaxMessageSize(); + if (maxMessageSize.isPresent()) { + channelBuilder.maxInboundMessageSize(maxMessageSize.get()); + } + + // no equivalent for maxOutboundboundMessageSize + + final Optional keepAliveTimeMs = grpcConfig.getKeepAliveTimeMs(); + if (keepAliveTimeMs.isPresent()) { + channelBuilder.keepAliveTime(keepAliveTimeMs.get(), TimeUnit.MILLISECONDS); + } + + final Optional keepAliveTimeoutMs = grpcConfig.getKeepAliveTimeoutMs(); + if (keepAliveTimeoutMs.isPresent()) { + channelBuilder.keepAliveTimeout(keepAliveTimeoutMs.get(), TimeUnit.MILLISECONDS); + } + + final Optional keepAliveWithoutCalls = grpcConfig.getKeepAliveWithoutCalls(); + if (keepAliveWithoutCalls.isPresent()) { + channelBuilder.keepAliveWithoutCalls(keepAliveWithoutCalls.get()); + } + } } diff --git a/momento-sdk/src/main/java/momento/sdk/responses/storage/GetResponse.java b/momento-sdk/src/main/java/momento/sdk/responses/storage/GetResponse.java index 4958ea50..6c873ac6 100644 --- a/momento-sdk/src/main/java/momento/sdk/responses/storage/GetResponse.java +++ b/momento-sdk/src/main/java/momento/sdk/responses/storage/GetResponse.java @@ -3,7 +3,21 @@ import java.util.Optional; import momento.sdk.exceptions.SdkException; -/** Response for a get operation */ +/** + * Response for a get operation. + * + *

The response can be either a {@link Success} or an {@link Error}. + * + *

To shortcut access to the success response, use {@link #success()}. If the operation was + * successful, the response will be an optional of {@link Success}, otherwise it will be an empty + * optional. + * + *

To handle the response otherwise, use pattern matching or an instanceof to check if the + * response is a {@link Success} or an {@link Error}. + * + *

Upon a success, the value can be retrieved with {@link Success#value()}. If the value was + * found in the store, it will be present, otherwise it will be empty. + */ public interface GetResponse { /** * Returns the success response if the operation was successful, or an empty optional if the @@ -15,7 +29,15 @@ public interface GetResponse { */ Optional success(); - /** A successful get operation. */ + /** + * A successful get operation. + * + *

To access the value, use {@link #value()}. If the value was found in the store, it will be + * present, otherwise it will be empty. + * + *

Use the appropriate type-based accessor on the value to retrieve the value in its + * corresponding type. + */ class Success implements GetResponse { private final Optional value; @@ -43,6 +65,11 @@ public static Success of(double value) { return new Success(Optional.of(StorageValue.of(value))); } + /** + * Returns the value if it exists, or an empty optional if the value does not exist. + * + * @return The value, or an empty optional if the value does not exist. + */ public Optional value() { return value; } diff --git a/momento-sdk/src/main/java/momento/sdk/responses/storage/StorageValue.java b/momento-sdk/src/main/java/momento/sdk/responses/storage/StorageValue.java index 07d99e30..f33f5f75 100644 --- a/momento-sdk/src/main/java/momento/sdk/responses/storage/StorageValue.java +++ b/momento-sdk/src/main/java/momento/sdk/responses/storage/StorageValue.java @@ -2,6 +2,21 @@ import java.util.Optional; +/** + * A value stored in the storage. + * + *

Values can be of the following types: + * + *

    + *
  • byte array + *
  • string + *
  • long + *
  • double + *
+ * + *

Use the appropriate accessor to retrieve the value in its corresponding type. If the + * underlying value is not of the requested type, an empty optional will be returned. + */ public class StorageValue { private final Object value; private final StorageItemType itemType; @@ -27,10 +42,21 @@ public static StorageValue of(double value) { return new StorageValue(value, StorageItemType.DOUBLE); } + /** + * Get the type of the value. + * + * @return the type of the value. + */ public StorageItemType getType() { return itemType; } + /** + * Get the value as a byte array. + * + * @return the value as a byte array. If the value is not a byte array, an empty optional is + * returned. + */ public Optional getByteArray() { if (itemType != StorageItemType.BYTE_ARRAY) { return Optional.empty(); @@ -38,6 +64,11 @@ public Optional getByteArray() { return Optional.of((byte[]) value); } + /** + * Get the value as a string. + * + * @return the value as a string. If the value is not a string, an empty optional is returned. + */ public Optional getString() { if (itemType != StorageItemType.STRING) { return Optional.empty(); @@ -45,6 +76,11 @@ public Optional getString() { return Optional.of((String) value); } + /** + * Get the value as a long. + * + * @return the value as a long. If the value is not a long, an empty optional is returned. + */ public Optional getLong() { if (itemType != StorageItemType.LONG) { return Optional.empty(); @@ -52,6 +88,11 @@ public Optional getLong() { return Optional.of((long) value); } + /** + * Get the value as a double. + * + * @return the value as a double. If the value is not a double, an empty optional is returned. + */ public Optional getDouble() { if (itemType != StorageItemType.DOUBLE) { return Optional.empty();