Skip to content

Commit

Permalink
Add configurable auto-checksum (#1990)
Browse files Browse the repository at this point in the history
Allow configuring the automatic checksum added to uploads, and test multipart uploads with all hash types.

Adds test for minio/minio#20231 as well.
  • Loading branch information
klauspost authored Aug 21, 2024
1 parent e337e77 commit 21381fc
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 86 deletions.
10 changes: 5 additions & 5 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"encoding/hex"
"encoding/xml"
"fmt"
"hash/crc32"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -87,7 +86,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Initiate a new multipart upload.
Expand Down Expand Up @@ -116,7 +115,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()
for partNumber <= totalPartsCount {
length, rErr := readFull(reader, buf)
if rErr == io.EOF && partNumber > 1 {
Expand Down Expand Up @@ -154,7 +153,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -202,12 +201,13 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
sort.Sort(completedParts(complMultipartUpload.Parts))
opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.Key(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down
39 changes: 22 additions & 17 deletions api-put-object-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"encoding/base64"
"fmt"
"hash/crc32"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -115,7 +114,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}
// Initiate a new multipart upload.
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
Expand Down Expand Up @@ -195,10 +194,10 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
trailer := make(http.Header, 1)
if withChecksum {
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
crc := opts.AutoChecksum.Hasher()
trailer.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(crc.Sum(nil)))
sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) {
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
trailer.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(hash))
})
}

Expand Down Expand Up @@ -271,17 +270,18 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if withChecksum {
// Add hash of hashes.
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()
for _, part := range complMultipartUpload.Parts {
cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C)
cs, err := base64.StdEncoding.DecodeString(part.Checksum(opts.AutoChecksum))
if err == nil {
crc.Write(cs)
}
}
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
Expand All @@ -308,7 +308,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Calculate the optimal parts info for a given size.
Expand Down Expand Up @@ -337,7 +337,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()
md5Hash := c.md5Hasher()
defer md5Hash.Close()

Expand Down Expand Up @@ -381,7 +381,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.KeyCapitalized(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -433,12 +433,13 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down Expand Up @@ -467,7 +468,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Cancel all when an error occurs.
Expand Down Expand Up @@ -500,7 +501,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()

// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64
Expand Down Expand Up @@ -558,7 +559,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -639,12 +640,13 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down Expand Up @@ -765,7 +767,10 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,
contentMD5Base64: md5Base64,
contentSHA256Hex: sha256Hex,
streamSha256: !opts.DisableContentSha256,
addCrc: addCrc,
}
if addCrc {
opts.AutoChecksum.SetDefault(ChecksumCRC32C)
reqMetadata.addCrc = &opts.AutoChecksum
}
if opts.Internal.SourceVersionID != "" {
if opts.Internal.SourceVersionID != nullVersionID {
Expand Down
16 changes: 11 additions & 5 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"net/http"
"sort"
Expand Down Expand Up @@ -90,6 +89,11 @@ type PutObjectOptions struct {
DisableContentSha256 bool
DisableMultipart bool

// AutoChecksum is the type of checksum that will be added if no other checksum is added,
// like MD5 or SHA256 streaming checksum, and it is feasible for the upload type.
// If none is specified CRC32C is used, since it is generally the fastest.
AutoChecksum ChecksumType

// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
// fill them serially and upload them in parallel.
// This can be used for faster uploads on non-seekable or slow-to-seek input.
Expand Down Expand Up @@ -300,6 +304,7 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
if size > int64(maxMultipartPutObjectSize) {
return UploadInfo{}, errEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
}
opts.AutoChecksum.SetDefault(ChecksumCRC32C)

// NOTE: Streaming signature is not supported by GCS.
if s3utils.IsGoogleEndpoint(*c.endpointURL) {
Expand Down Expand Up @@ -361,7 +366,7 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Initiate a new multipart upload.
Expand Down Expand Up @@ -390,7 +395,7 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()

for partNumber <= totalPartsCount {
length, rerr := readFull(reader, buf)
Expand All @@ -413,7 +418,7 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -466,12 +471,13 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions api-s3-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,22 @@ type CompletePart struct {
ChecksumSHA256 string `xml:"ChecksumSHA256,omitempty"`
}

// Checksum will return the checksum for the given type.
// Will return the empty string if not set.
func (c CompletePart) Checksum(t ChecksumType) string {
switch {
case t.Is(ChecksumCRC32C):
return c.ChecksumCRC32C
case t.Is(ChecksumCRC32):
return c.ChecksumCRC32
case t.Is(ChecksumSHA1):
return c.ChecksumSHA1
case t.Is(ChecksumSHA256):
return c.ChecksumSHA256
}
return ""
}

// completeMultipartUpload container for completing multipart upload.
type completeMultipartUpload struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload" json:"-"`
Expand Down
11 changes: 5 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -471,7 +470,7 @@ type requestMetadata struct {
contentMD5Base64 string // carries base64 encoded md5sum
contentSHA256Hex string // carries hex encoded sha256sum
streamSha256 bool
addCrc bool
addCrc *ChecksumType
trailer http.Header // (http.Request).Trailer. Requires v4 signature.
}

Expand Down Expand Up @@ -616,16 +615,16 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
}
}

if metadata.addCrc && metadata.contentLength > 0 {
if metadata.addCrc != nil && metadata.contentLength > 0 {
if metadata.trailer == nil {
metadata.trailer = make(http.Header, 1)
}
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := metadata.addCrc.Hasher()
metadata.contentBody = newHashReaderWrapper(metadata.contentBody, crc, func(hash []byte) {
// Update trailer when done.
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
metadata.trailer.Set(metadata.addCrc.Key(), base64.StdEncoding.EncodeToString(hash))
})
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
metadata.trailer.Set(metadata.addCrc.Key(), base64.StdEncoding.EncodeToString(crc.Sum(nil)))
}

// Create cancel context to control 'newRetryTimer' go routine.
Expand Down
13 changes: 13 additions & 0 deletions checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"hash/crc32"
"io"
"math/bits"
"net/http"
)

// ChecksumType contains information about the checksum type.
Expand Down Expand Up @@ -78,6 +79,11 @@ func (c ChecksumType) Key() string {
return ""
}

// KeyCapitalized returns the capitalized key as used in HTTP headers.
func (c ChecksumType) KeyCapitalized() string {
return http.CanonicalHeaderKey(c.Key())
}

// RawByteLen returns the size of the un-encoded checksum.
func (c ChecksumType) RawByteLen() int {
switch c & checksumMask {
Expand Down Expand Up @@ -112,6 +118,13 @@ func (c ChecksumType) IsSet() bool {
return bits.OnesCount32(uint32(c)) == 1
}

// SetDefault will set the checksum if not already set.
func (c *ChecksumType) SetDefault(t ChecksumType) {
if !c.IsSet() {
*c = t
}
}

// String returns the type as a string.
// CRC32, CRC32C, SHA1, and SHA256 for valid values.
// Empty string for unset and "<invalid>" if not valid.
Expand Down
Loading

0 comments on commit 21381fc

Please sign in to comment.