From 21ea4fe6e58c97e9a5bc6bfa79529355091e5245 Mon Sep 17 00:00:00 2001 From: Russ Cam Date: Wed, 4 Oct 2023 08:17:18 +1000 Subject: [PATCH] Expose future stub on Pinecone connection This commit exposes the future stub on Pinecone connection, allowing for non-blocking async calls --- .../java/io/pinecone/PineconeConnection.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/pinecone/PineconeConnection.java b/src/main/java/io/pinecone/PineconeConnection.java index d38d10e5..48ee166a 100644 --- a/src/main/java/io/pinecone/PineconeConnection.java +++ b/src/main/java/io/pinecone/PineconeConnection.java @@ -34,7 +34,7 @@ public class PineconeConnection implements AutoCloseable { */ private VectorServiceGrpc.VectorServiceBlockingStub blockingStub; - private VectorServiceGrpc.VectorServiceStub asyncStub; + private VectorServiceGrpc.VectorServiceFutureStub futureStub; public PineconeConnection(PineconeClientConfig clientConfig, PineconeConnectionConfig connectionConfig) { this.connectionConfig = connectionConfig; @@ -45,13 +45,17 @@ public PineconeConnection(PineconeClientConfig clientConfig, PineconeConnectionC ? connectionConfig.getCustomChannelBuilder().apply(clientConfig, connectionConfig) : buildChannel(clientConfig, connectionConfig); channel.notifyWhenStateChanged(channel.getState(false), this::onConnectivityStateChanged); - Metadata metadata = assembleMetadata(clientConfig, connectionConfig); + Metadata metadata = assembleMetadata(clientConfig); blockingStub = VectorServiceGrpc .newBlockingStub(channel) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); - asyncStub = VectorServiceGrpc - .newStub(channel) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) + .withMaxInboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE) + .withMaxOutboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE); + futureStub = VectorServiceGrpc + .newFutureStub(channel) + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) + .withMaxInboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE) + .withMaxOutboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE); logger.debug("created new PineconeConnection for channel: {}", channel); } @@ -79,8 +83,8 @@ public VectorServiceGrpc.VectorServiceBlockingStub getBlockingStub() { return blockingStub; } - public void setBlockingStub(VectorServiceGrpc.VectorServiceBlockingStub blockingStub) { - this.blockingStub = blockingStub; + public VectorServiceGrpc.VectorServiceFutureStub getFutureStub() { + return futureStub; } private void onConnectivityStateChanged() { @@ -104,20 +108,13 @@ public static ManagedChannel buildChannel(PineconeClientConfig clientConfig, return builder.build(); } - private static Metadata assembleMetadata(PineconeClientConfig clientConfig, - PineconeConnectionConfig connectionConfig) { + private static Metadata assembleMetadata(PineconeClientConfig clientConfig) { Metadata metadata = new Metadata(); metadata.put(Metadata.Key.of("api-key", Metadata.ASCII_STRING_MARSHALLER), clientConfig.getApiKey()); return metadata; } - private VectorServiceGrpc.VectorServiceBlockingStub applyDefaultBlockingStubConfig(VectorServiceGrpc.VectorServiceBlockingStub stub) { - return stub - .withMaxInboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE) - .withMaxOutboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE); - } - static String getEndpoint(PineconeClientConfig clientConfig, PineconeConnectionConfig connectionConfig) { String endpoint = (connectionConfig.getConnectionUrl() != null) ? connectionConfig.getConnectionUrl().replaceFirst("https?://", "") :