Skip to content

Commit

Permalink
Add collections operations to PineconeControlPlaneClient with integ…
Browse files Browse the repository at this point in the history
…ration tests (#65)

## Problem
The Java SDK is currently missing collections functionality. Now that
we've added generated code for the OpenAPI spec, we can hook up
collections operations in `PineconeControlPlaneClient`.

We'd also like to add integration tests to cover collections.

## Solution
- Update `PineconeControlPlaneClient` to include collections operations
for create, list, delete, and describe.
- Add new integration test files: `CollectionTest` and
`CollectionErrorTest`.
- Add new helpers to `IndexManager` for creating and connecting to an
index by `indexName`, creating a collection and waiting for it to be
ready, and polling until an index is ready.
- Add new helper to `BuildUpsertRequest` for generating vectors by
dimension.

Bonuses:

- I added some logging configs in `build.gradle` for `test` and the
`integrationTest` task. This was primarily to help me debug things in CI
and log a bit more info to the console. We can tweak as needed, but I
think having something like this will be really helpful as opposed to
the generic output.
- Set `max-parallel: 1` in the `integration-test` job in the `pr.yml`
workflow. Because of the way `findIndexWithDimensionAndType` works we
cannot run integration tests in parallel without flakiness.
- Fixed an issue in `AssertRetry`. We were incrementing `delay` at the
class level which is a static variable, meaning if you re-used the
function in one place it would continue increasing the delay for all
subsequent calls based on the `backOff`.

## Type of Change
- [X] New feature (non-breaking change which adds functionality)

## Test Plan
Run integration tests to make sure `CollectionTest` and
`CollectionErrorTest` pass as expected. Integration tests should also be
passing in CI barring any flakiness.
  • Loading branch information
austin-denoble authored Feb 21, 2024
1 parent 051b696 commit 20a38ac
Show file tree
Hide file tree
Showing 15 changed files with 577 additions and 42 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
{ java: 8, gradle: 6.8 },
{ java: 17, gradle: 7.3.1 }
]
max-parallel: 1
steps:
- uses: actions/checkout@v4

Expand All @@ -83,5 +84,5 @@ jobs:
env:
PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }}
PINECONE_ENVIRONMENT: ${{ secrets.PINECONE_ENVIRONMENT }}


21 changes: 15 additions & 6 deletions src/integration/java/io/pinecone/helpers/AssertRetry.java
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
package io.pinecone.helpers;

import io.pinecone.exceptions.PineconeException;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class AssertRetry {
private static final int maxRetry = 4;
private static int delay = 1500;
private static final int delay = 1500;

public static void assertWithRetry(AssertionRunnable assertionRunnable) throws InterruptedException {
public static void assertWithRetry(AssertionRunnable assertionRunnable) throws InterruptedException, PineconeException {
assertWithRetry(assertionRunnable, 2);
}

public static void assertWithRetry(AssertionRunnable assertionRunnable, int backOff) throws InterruptedException {
public static void assertWithRetry(AssertionRunnable assertionRunnable, int backOff) throws AssertionError, InterruptedException {
int retryCount = 0;
int delayCount = delay;
boolean success = false;
String errorMessage = null;

while (retryCount < maxRetry && !success) {
try {
assertionRunnable.run();
success = true;
} catch (AssertionError | ExecutionException | IOException e) {
errorMessage = e.getLocalizedMessage();
retryCount++;
delay*=backOff;
Thread.sleep(delay);
delayCount*=backOff;
Thread.sleep(delayCount);
}
}

if (!success) {
throw new AssertionError(errorMessage);
}
}

@FunctionalInterface
public interface AssertionRunnable {
void run() throws AssertionError, ExecutionException, InterruptedException, IOException;
void run() throws AssertionError, ExecutionException, InterruptedException, IOException, PineconeException;
}
}
33 changes: 29 additions & 4 deletions src/integration/java/io/pinecone/helpers/BuildUpsertRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import io.pinecone.proto.UpsertRequest;
import io.pinecone.proto.Vector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.*;

public class BuildUpsertRequest {
private static final float[][] upsertData = {{1.0F, 2.0F, 3.0F}, {4.0F, 5.0F, 6.0F}, {7.0F, 8.0F, 9.0F}};
Expand Down Expand Up @@ -44,6 +41,23 @@ public static UpsertRequest buildRequiredUpsertRequest(List<String> upsertIds, S
.build();
}

public static UpsertRequest buildRequiredUpsertRequestByDimension(List<String> upsertIds, int dimension, String namespace) {
if (upsertIds.isEmpty()) upsertIds = Arrays.asList("v1", "v2", "v3");

List<Vector> upsertVectors = new ArrayList<>();
for (String upsertId : upsertIds) {
upsertVectors.add(Vector.newBuilder()
.addAllValues(generateVectorValuesByDimension(dimension))
.setId(upsertId)
.build());
}

return UpsertRequest.newBuilder()
.addAllVectors(upsertVectors)
.setNamespace(namespace)
.build();
}

public static UpsertRequest buildOptionalUpsertRequest() {
return buildOptionalUpsertRequest(new ArrayList<>(), "");
}
Expand Down Expand Up @@ -108,4 +122,15 @@ public static HashMap<String, List<String>> createAndGetMetadataMap() {

return metadataMap;
}

public static ArrayList<Float> generateVectorValuesByDimension(int dimension) {
ArrayList<Float> values = new ArrayList<>();
Random random = new Random();

for (int i = 0; i < dimension; i++) {
values.add(random.nextFloat());
}

return values;
}
}
84 changes: 81 additions & 3 deletions src/integration/java/io/pinecone/helpers/IndexManager.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package io.pinecone.helpers;

import io.pinecone.*;
import io.pinecone.exceptions.PineconeException;
import org.openapitools.client.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

import static io.pinecone.helpers.AssertRetry.assertWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

public class IndexManager {
private static PineconeClientConfig config;
private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);

public static PineconeConnection createIndexIfNotExistsDataPlane(int dimension, String indexType) throws IOException, InterruptedException {
String apiKey = System.getenv("PINECONE_API_KEY");
Expand Down Expand Up @@ -45,9 +51,11 @@ private static String findIndexWithDimensionAndType(IndexList indexList, int dim
List<IndexModel> indexModels = indexList.getIndexes();
while (i < indexModels.size()) {
IndexModel indexModel = isIndexReady(indexModels.get(i).getName(), controlPlaneClient);
// ToDo: add pod type support
if (indexModel.getDimension() == dimension
&& ((indexType.equalsIgnoreCase(IndexModelSpec.SERIALIZED_NAME_POD) && indexModel.getSpec().getPod() != null && indexModel.getSpec().getPod().getReplicas() == 1 && indexModel.getSpec().getPod().getPodType().equalsIgnoreCase("p1.x1"))
&& ((indexType.equalsIgnoreCase(IndexModelSpec.SERIALIZED_NAME_POD)
&& indexModel.getSpec().getPod() != null
&& indexModel.getSpec().getPod().getReplicas() == 1
&& indexModel.getSpec().getPod().getPodType().equalsIgnoreCase("p1.x1"))
|| (indexType.equalsIgnoreCase(IndexModelSpec.SERIALIZED_NAME_SERVERLESS)))) {
return indexModel.getName();
}
Expand Down Expand Up @@ -79,13 +87,83 @@ private static String createNewIndex(PineconeControlPlaneClient controlPlaneClie
return indexName;
}

public static IndexModel waitUntilIndexIsReady(PineconeControlPlaneClient controlPlaneClient, String indexName, Integer totalMsToWait) throws InterruptedException {
IndexModel index = controlPlaneClient.describeIndex(indexName);
int waitedTimeMs = 0;
int intervalMs = 1500;

while (!index.getStatus().getReady()) {
index = controlPlaneClient.describeIndex(indexName);
if (waitedTimeMs >= totalMsToWait) {
logger.info("Index " + indexName + " not ready after " + waitedTimeMs + "ms");
break;
}
if (index.getStatus().getReady()) {
logger.info("Index " + indexName + " is ready after " + waitedTimeMs + "ms");
break;
}
Thread.sleep(intervalMs);
waitedTimeMs += intervalMs;
}
return index;
}

public static IndexModel waitUntilIndexIsReady(PineconeControlPlaneClient controlPlaneClient, String indexName) throws InterruptedException {
return waitUntilIndexIsReady(controlPlaneClient, indexName, 120000);
}

public static PineconeConnection createNewIndexAndConnect(PineconeControlPlaneClient controlPlaneClient, String indexName, int dimension, IndexMetric metric, CreateIndexRequestSpec spec) throws InterruptedException, PineconeException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest().name(indexName).dimension(dimension).metric(metric).spec(spec);
controlPlaneClient.createIndex(createIndexRequest);

// Wait until index is ready
waitUntilIndexIsReady(controlPlaneClient, indexName, 200000);
// wait a bit more before we connect...
Thread.sleep(15000);

String host = controlPlaneClient.describeIndex(indexName).getHost();

PineconeClientConfig specificConfig = new PineconeClientConfig().withApiKey(System.getenv("PINECONE_API_KEY"));
PineconeClient dataPlaneClient = new PineconeClient(specificConfig);

return dataPlaneClient.connect(
new PineconeConnectionConfig()
.withConnectionUrl("https://" + host));
}

public static CollectionModel createCollection(PineconeControlPlaneClient controlPlaneClient, String collectionName, String indexName, boolean waitUntilReady) throws InterruptedException {
CreateCollectionRequest createCollectionRequest = new CreateCollectionRequest().name(collectionName).source(indexName);
CollectionModel collection = controlPlaneClient.createCollection(createCollectionRequest);

assertEquals(collection.getStatus(), CollectionModel.StatusEnum.INITIALIZING);

// Wait until collection is ready
if (waitUntilReady) {
int timeWaited = 0;
CollectionModel.StatusEnum collectionReady = collection.getStatus();
while (collectionReady != CollectionModel.StatusEnum.READY && timeWaited < 120000) {
logger.info("Waiting for collection" + collectionName + " to be ready. Waited " + timeWaited + " milliseconds...");
Thread.sleep(5000);
timeWaited += 5000;
collection = controlPlaneClient.describeCollection(collectionName);
collectionReady = collection.getStatus();
}

if (timeWaited > 120000) {
fail("Collection: " + collectionName + " is not ready after 120 seconds");
}
}

return collection;
}

public static IndexModel isIndexReady(String indexName, PineconeControlPlaneClient controlPlaneClient)
throws InterruptedException {
final IndexModel[] indexModels = new IndexModel[1];
assertWithRetry(() -> {
indexModels[0] = controlPlaneClient.describeIndex(indexName);
assert (indexModels[0].getStatus().getReady());
}, 1);
}, 4);

return indexModels[0];
}
Expand Down
Loading

0 comments on commit 20a38ac

Please sign in to comment.