Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async client for delete blob or path in S3 Blob Container #16788

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.repositories.RepositoryStats;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.utils.AwsRequestSigner;
import org.opensearch.snapshots.mockstore.BlobStoreWrapper;
import org.opensearch.test.BackgroundIndexer;
Expand Down Expand Up @@ -153,7 +154,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
.put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT)
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.put(super.nodeSettings(nodeOrdinal))
.setSecureSettings(secureSettings);

Expand Down Expand Up @@ -253,22 +253,27 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);

AsyncTransferManager asyncUploadUtils = new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
normalExecutorBuilder.getStreamReader(),
priorityExecutorBuilder.getStreamReader(),
urgentExecutorBuilder.getStreamReader(),
transferSemaphoresHolder
);
return new S3Repository(
metadata,
registry,
service,
clusterService,
recoverySettings,
null,
null,
null,
null,
null,
false,
null,
null,
asyncUploadUtils,
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder,
s3AsyncService,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ,
genericStatsMetricPublisher
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,6 @@

public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {

@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.build();
}

@Override
@Before
@SuppressForbidden(reason = "Need to set system property here for AWS SDK v2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.profiles.ProfileFileSystemSetting;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
Expand Down Expand Up @@ -120,6 +119,7 @@ public AmazonAsyncS3Reference client(
if (existing != null && existing.tryIncRef()) {
return existing;
}

final AmazonAsyncS3Reference clientReference = new AmazonAsyncS3Reference(
buildClient(clientSettings, urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder)
);
Expand Down Expand Up @@ -235,17 +235,17 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
}

static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
RetryPolicy retryPolicy = SocketAccess.doPrivileged(
() -> RetryPolicy.builder()
.numRetries(clientSettings.maxRetries)
.throttlingBackoffStrategy(
clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD) : BackoffStrategy.none()
)
.build()
);

return ClientOverrideConfiguration.builder()
.retryPolicy(
RetryPolicy.builder()
.numRetries(clientSettings.maxRetries)
.throttlingBackoffStrategy(
clientSettings.throttleRetries
? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)
: BackoffStrategy.none()
)
.build()
)
.retryPolicy(retryPolicy)
.apiCallAttemptTimeout(Duration.ofMillis(clientSettings.requestTimeoutMillis))
.build();
}
Expand Down Expand Up @@ -346,12 +346,7 @@ static AwsCredentialsProvider buildCredentials(Logger logger, S3ClientSettings c
// valid paths.
@SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path")
private static void setDefaultAwsProfilePath() {
if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) {
System.setProperty(ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.property(), System.getProperty("opensearch.path.conf"));
}
if (ProfileFileSystemSetting.AWS_CONFIG_FILE.getStringValue().isEmpty()) {
System.setProperty(ProfileFileSystemSetting.AWS_CONFIG_FILE.property(), System.getProperty("opensearch.path.conf"));
}
S3Service.setDefaultAwsProfilePath();
}

private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
Expand Down Expand Up @@ -443,5 +438,6 @@ public AwsCredentials resolveCredentials() {
@Override
public void close() {
releaseCachedClients();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -55,9 +52,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
Expand All @@ -68,7 +63,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
Expand Down Expand Up @@ -101,11 +96,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand Down Expand Up @@ -381,125 +373,17 @@
}

@Override
public DeleteResult delete() throws IOException {
final AtomicLong deletedBlobs = new AtomicLong();
final AtomicLong deletedBytes = new AtomicLong();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
ListObjectsV2Iterable listObjectsIterable = SocketAccess.doPrivileged(
() -> clientReference.get()
.listObjectsV2Paginator(
ListObjectsV2Request.builder()
.bucket(blobStore.bucket())
.prefix(keyPath)
.overrideConfiguration(
o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher)
)
.build()
)
);

Iterator<ListObjectsV2Response> listObjectsResponseIterator = listObjectsIterable.iterator();
while (listObjectsResponseIterator.hasNext()) {
ListObjectsV2Response listObjectsResponse = SocketAccess.doPrivileged(listObjectsResponseIterator::next);
List<String> blobsToDelete = listObjectsResponse.contents().stream().map(s3Object -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3Object.size());

return s3Object.key();
}).collect(Collectors.toList());

if (!listObjectsResponseIterator.hasNext()) {
blobsToDelete.add(keyPath);
}

doDeleteBlobs(blobsToDelete, false);
}
} catch (SdkException e) {
throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
}

return new DeleteResult(deletedBlobs.get(), deletedBytes.get());
public DeleteResult delete() {
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
deleteAsync(future);
return future.actionGet();

Check warning on line 379 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L377-L379

Added lines #L377 - L379 were not covered by tests
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
doDeleteBlobs(blobNames, true);
}

private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOException {
if (blobNames.isEmpty()) {
return;
}
final Set<String> outstanding;
if (relative) {
outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
} else {
outstanding = new HashSet<>(blobNames);
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API allows 1k blobs per delete so we split up the given blobs into requests of bulk size deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String key : outstanding) {
partition.add(key);
if (partition.size() == blobStore.getBulkDeletesSize()) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
SocketAccess.doPrivilegedVoid(() -> {
SdkException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
List<String> keysInRequest = deleteRequest.delete()
.objects()
.stream()
.map(ObjectIdentifier::key)
.collect(Collectors.toList());
try {
DeleteObjectsResponse deleteObjectsResponse = clientReference.get().deleteObjects(deleteRequest);
outstanding.removeAll(keysInRequest);
outstanding.addAll(deleteObjectsResponse.errors().stream().map(S3Error::key).collect(Collectors.toSet()));
if (!deleteObjectsResponse.errors().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
}
} catch (SdkException e) {
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
// remove any keys from the outstanding deletes set.
aex = ExceptionsHelper.useOrSuppress(aex, e);
}
}
if (aex != null) {
throw aex;
}
});
} catch (Exception e) {
throw new IOException("Failed to delete blobs [" + outstanding + "]", e);
}
assert outstanding.isEmpty();
}

private DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Delete.builder()
.objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.build();
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
PlainActionFuture<Void> future = new PlainActionFuture<>();
deleteBlobsAsyncIgnoringIfNotExists(blobNames, future);
future.actionGet();

Check warning on line 386 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L384-L386

Added lines #L384 - L386 were not covered by tests
}

@Override
Expand Down Expand Up @@ -886,7 +770,11 @@
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
.bucket(blobStore.bucket())
.prefix(keyPath)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher))
.build();
ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

AtomicLong deletedBlobs = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,19 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer";

protected final S3Service service;
private final S3AsyncService s3AsyncService;
protected final S3AsyncService s3AsyncService;

private final Path configPath;

private AsyncExecutorContainer urgentExecutorBuilder;
private AsyncExecutorContainer priorityExecutorBuilder;
private AsyncExecutorContainer normalExecutorBuilder;
protected AsyncExecutorContainer urgentExecutorBuilder;
protected AsyncExecutorContainer priorityExecutorBuilder;
protected AsyncExecutorContainer normalExecutorBuilder;
private ExecutorService lowTransferQConsumerService;
private ExecutorService normalTransferQConsumerService;
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private TransferSemaphoresHolder transferSemaphoresHolder;
private GenericStatsMetricPublisher genericStatsMetricPublisher;
protected SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
protected SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
protected TransferSemaphoresHolder transferSemaphoresHolder;
protected GenericStatsMetricPublisher genericStatsMetricPublisher;

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
Expand Down Expand Up @@ -387,5 +387,8 @@ public void reload(Settings settings) {
public void close() throws IOException {
service.close();
s3AsyncService.close();
urgentExecutorBuilder.getAsyncTransferEventLoopGroup().close();
priorityExecutorBuilder.getAsyncTransferEventLoopGroup().close();
normalExecutorBuilder.getAsyncTransferEventLoopGroup().close();
}
}
Loading
Loading