Skip to content

Commit

Permalink
stores: improve locking for deleting multipart uploads in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Mar 4, 2024
1 parent a10da6f commit cce9d22
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions stores/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,26 +274,32 @@ func (s *SQLStore) MultipartUploadParts(ctx context.Context, bucket, object stri

func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) error {
return s.retryTransaction(func(tx *gorm.DB) error {
// Find multipart upload.
var mu dbMultipartUpload
err := tx.Where("upload_id = ?", uploadID).
Preload("Parts").
// delete multipart upload optimistically
res := tx.Where("upload_id", uploadID).
Where("object_id", path).
Where("DBBucket.name", bucket).
Joins("DBBucket").
Take(&mu).
Error
if err != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", err)
}
if mu.ObjectID != path {
// Check object id.
return fmt.Errorf("object id mismatch: %v != %v: %w", mu.ObjectID, path, api.ErrObjectNotFound)
} else if mu.DBBucket.Name != bucket {
// Check bucket name.
return fmt.Errorf("bucket name mismatch: %v != %v: %w", mu.DBBucket.Name, bucket, api.ErrBucketNotFound)
}
err = tx.Delete(&mu).Error
if err != nil {
return fmt.Errorf("failed to delete multipart upload: %w", err)
Delete(&dbMultipartUpload{})
if res.Error != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", res.Error)
}
// if the upload wasn't found, find out why
if res.RowsAffected == 0 {
var mu dbMultipartUpload
err := tx.Where("upload_id = ?", uploadID).
Joins("DBBucket").
Take(&mu).
Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return api.ErrMultipartUploadNotFound
} else if err != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", err)
} else if mu.ObjectID != path {
return fmt.Errorf("object id mismatch: %v != %v: %w", mu.ObjectID, path, api.ErrObjectNotFound)
} else if mu.DBBucket.Name != bucket {
return fmt.Errorf("bucket name mismatch: %v != %v: %w", mu.DBBucket.Name, bucket, api.ErrBucketNotFound)
}
return errors.New("failed to delete multipart upload for unknown reason")
}
// Prune the slabs.
if err := pruneSlabs(tx); err != nil {
Expand Down

0 comments on commit cce9d22

Please sign in to comment.