From 1e946ca4a45169e3555894dd58b1d523d6d74f0b Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 12 Jan 2024 09:52:46 +0100 Subject: [PATCH 1/4] worker: check context --- worker/upload.go | 2 ++ worker/upload_test.go | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/worker/upload.go b/worker/upload.go index a5ae356a8..5631dd0ea 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -569,6 +569,8 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a select { case <-mgr.shutdownCtx.Done(): return false, "", errWorkerShutDown + case <-ctx.Done(): + return false, "", ctx.Err() case numSlabs = <-numSlabsChan: case res := <-respChan: if res.err != nil { diff --git a/worker/upload_test.go b/worker/upload_test.go index fd4a82907..14cd0d923 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -125,6 +125,14 @@ func TestUploadDownload(t *testing.T) { if !errors.Is(err, errBucketNotFound) { t.Fatal("expected bucket not found error", err) } + + // upload data using a cancelled context - assert we don't hang + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, _, err = ul.Upload(ctx, bytes.NewReader(data), metadatas, params, lockingPriorityUpload) + if err == nil || !errors.Is(err, context.Canceled) { + t.Fatal(err) + } } func testParameters(bucket, path string) uploadParameters { From c2ca9c66c7be317cd6ba5d08f074302ed7cd1ebd Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 12 Jan 2024 10:55:55 +0100 Subject: [PATCH 2/4] worker: add custom error --- worker/upload.go | 3 ++- worker/upload_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index 5631dd0ea..351cf53cb 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -31,6 +31,7 @@ var ( errNoCandidateUploader = errors.New("no candidate uploader found") errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy") errWorkerShutDown = errors.New("worker was shut down") + errUploadInterrupted = errors.New("upload was interrupted") ) type ( @@ -570,7 +571,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a case <-mgr.shutdownCtx.Done(): return false, "", errWorkerShutDown case <-ctx.Done(): - return false, "", ctx.Err() + return false, "", errUploadInterrupted case numSlabs = <-numSlabsChan: case res := <-respChan: if res.err != nil { diff --git a/worker/upload_test.go b/worker/upload_test.go index 14cd0d923..3534d2edf 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -130,7 +130,7 @@ func TestUploadDownload(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() _, _, err = ul.Upload(ctx, bytes.NewReader(data), metadatas, params, lockingPriorityUpload) - if err == nil || !errors.Is(err, context.Canceled) { + if err == nil || !errors.Is(err, errUploadInterrupted) { t.Fatal(err) } } From 25e5049effe11bb46b8acce7d8d78c9138904d82 Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 12 Jan 2024 11:05:16 +0100 Subject: [PATCH 3/4] worker: add regression test and improve upload flow --- worker/mocks_test.go | 7 +- worker/upload.go | 153 +++++++++++++++++++++--------------------- worker/upload_test.go | 55 +++++++++++++++ 3 files changed, 137 insertions(+), 78 deletions(-) diff --git a/worker/mocks_test.go b/worker/mocks_test.go index ba69ff117..13540cd7e 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -48,7 +48,9 @@ type ( } mockMemory struct{} - mockMemoryManager struct{} + mockMemoryManager struct { + memBlockChan chan struct{} + } mockObjectStore struct { mu sync.Mutex @@ -110,6 +112,9 @@ func (mm *mockMemoryManager) Limit(amt uint64) (MemoryManager, error) { } func (mm *mockMemoryManager) Status() api.MemoryStatus { return api.MemoryStatus{} } func (mm *mockMemoryManager) AcquireMemory(ctx context.Context, amt uint64) Memory { + if mm.memBlockChan != nil { + <-mm.memBlockChan + } return &mockMemory{} } diff --git a/worker/upload.go b/worker/upload.go index 351cf53cb..45f4c3ad6 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -104,9 +104,10 @@ type ( } slabUploadResponse struct { - slab object.SlabSlice - index int - err error + slabs []object.SlabSlice + bufferSizeLimitReached bool + index int + err error } sectorUpload struct { @@ -492,17 +493,16 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a // create the response channel respChan := make(chan slabUploadResponse) - // channel to notify main thread of the number of slabs to wait for - numSlabsChan := make(chan int, 1) + // channel to notify main thread of the number of responses to wait for + numResponsesChan := make(chan int, 1) // prepare slab size size := int64(up.rs.MinShards) * rhpv2.SectorSize redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize - var partialSlab []byte // launch uploads in a separate goroutine go func() { - var slabIndex int + var respIndex int for { select { case <-mgr.shutdownCtx.Done(): @@ -511,6 +511,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a return // interrupted default: } + // acquire memory mem := mgr.mm.AcquireMemory(ctx, redundantSize) if mem == nil { @@ -523,15 +524,10 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a if err == io.EOF { mem.Release() - // no more data to upload, notify main thread of the number of - // slabs to wait for - numSlabs := slabIndex - if partialSlab != nil && slabIndex > 0 { - numSlabs-- // don't wait on partial slab - } - numSlabsChan <- numSlabs + // we know for sure how many responses the main thread should expect + numResponsesChan <- respIndex return - } else if err != nil && err != io.ErrUnexpectedEOF { + } else if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { mem.Release() // unexpected error, notify main thread @@ -540,39 +536,26 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a case <-ctx.Done(): } return - } else if up.packing && errors.Is(err, io.ErrUnexpectedEOF) { - mem.Release() - - // uploadPacking is true, we return the partial slab without - // uploading. - partialSlab = data[:length] + } else if errors.Is(err, io.ErrUnexpectedEOF) && up.packing { + go mgr.addPartialSlab(ctx, mem, up.rs, data[:length], up.contractSet, respIndex, respChan) + respIndex++ } else { - // regular upload - go func(rs api.RedundancySettings, data []byte, length, slabIndex int) { - uploadSpeed, overdrivePct := upload.uploadSlab(ctx, rs, data, length, slabIndex, respChan, mgr.candidates(upload.allowed), mem, mgr.maxOverdrive, mgr.overdriveTimeout) - - // track stats - mgr.statsSlabUploadSpeedBytesPerMS.Track(float64(uploadSpeed)) - mgr.statsOverdrivePct.Track(overdrivePct) - - // release memory - mem.Release() - }(up.rs, data, length, slabIndex) + go mgr.addSlab(ctx, mem, up.rs, upload, data, length, respIndex, respChan) + respIndex++ } - slabIndex++ } }() // collect responses var responses []slabUploadResponse - numSlabs := math.MaxInt32 - for len(responses) < numSlabs { + numResponses := math.MaxInt32 + for len(responses) < numResponses { select { case <-mgr.shutdownCtx.Done(): return false, "", errWorkerShutDown case <-ctx.Done(): return false, "", errUploadInterrupted - case numSlabs = <-numSlabsChan: + case numResponses = <-numResponsesChan: case res := <-respChan: if res.err != nil { return false, "", res.err @@ -588,30 +571,20 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a // decorate the object with the slabs for _, resp := range responses { - o.Slabs = append(o.Slabs, resp.slab) + bufferSizeLimitReached = bufferSizeLimitReached || resp.bufferSizeLimitReached + o.Slabs = append(o.Slabs, resp.slabs...) } // calculate the eTag eTag = hr.Hash() - // add partial slabs - if len(partialSlab) > 0 { - var pss []object.SlabSlice - pss, bufferSizeLimitReached, err = mgr.os.AddPartialSlab(ctx, partialSlab, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) - if err != nil { - return false, "", err - } - o.Slabs = append(o.Slabs, pss...) - } - + // persist the upload if up.multipart { - // persist the part err = mgr.os.AddMultipartPart(ctx, up.bucket, up.path, up.contractSet, eTag, up.uploadID, up.partNumber, o.Slabs) if err != nil { return bufferSizeLimitReached, "", fmt.Errorf("couldn't add multi part: %w", err) } } else { - // persist the object err = mgr.os.AddObject(ctx, up.bucket, up.path, up.contractSet, o, api.AddObjectOptions{MimeType: up.mimeType, ETag: eTag, Metadata: up.metadata}) if err != nil { return bufferSizeLimitReached, "", fmt.Errorf("couldn't add object: %w", err) @@ -669,6 +642,60 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc return nil } +func (mgr *uploadManager) addPartialSlab(ctx context.Context, mem Memory, rs api.RedundancySettings, data []byte, contractSet string, respIndex int, respChan chan slabUploadResponse) { + // make sure we release the memory + defer mem.Release() + + // create the response + resp := slabUploadResponse{ + index: respIndex, + } + + // create the response + resp.slabs, resp.bufferSizeLimitReached, resp.err = mgr.os.AddPartialSlab(ctx, data, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet) + + // send the response + select { + case <-ctx.Done(): + case respChan <- resp: + } +} + +func (mgr *uploadManager) addSlab(ctx context.Context, mem Memory, rs api.RedundancySettings, upload *upload, data []byte, length, respIndex int, respChan chan slabUploadResponse) { + // make sure we release the memory + defer mem.Release() + + // create the response + resp := slabUploadResponse{ + slabs: []object.SlabSlice{{ + Slab: object.NewSlab(uint8(rs.MinShards)), + Offset: 0, + Length: uint32(length), + }}, + index: respIndex, + } + + // create the shards + shards := make([][]byte, rs.TotalShards) + resp.slabs[0].Slab.Encode(data, shards) + resp.slabs[0].Slab.Encrypt(shards) + + // upload the shards + var uploadSpeed int64 + var overdrivePct float64 + resp.slabs[0].Slab.Shards, uploadSpeed, overdrivePct, resp.err = upload.uploadShards(ctx, shards, mgr.candidates(upload.allowed), mem, mgr.maxOverdrive, mgr.overdriveTimeout) + + // track stats + mgr.statsSlabUploadSpeedBytesPerMS.Track(float64(uploadSpeed)) + mgr.statsOverdrivePct.Track(overdrivePct) + + // send the response + select { + case <-ctx.Done(): + case respChan <- resp: + } +} + func (mgr *uploadManager) candidates(allowed map[types.PublicKey]struct{}) (candidates []*uploader) { mgr.mu.Lock() defer mgr.mu.Unlock() @@ -805,34 +832,6 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [ }, responseChan } -func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) { - // create the response - resp := slabUploadResponse{ - slab: object.SlabSlice{ - Slab: object.NewSlab(uint8(rs.MinShards)), - Offset: 0, - Length: uint32(length), - }, - index: index, - } - - // create the shards - shards := make([][]byte, rs.TotalShards) - resp.slab.Slab.Encode(data, shards) - resp.slab.Slab.Encrypt(shards) - - // upload the shards - resp.slab.Slab.Shards, uploadSpeed, overdrivePct, resp.err = u.uploadShards(ctx, shards, candidates, mem, maxOverdrive, overdriveTimeout) - - // send the response - select { - case <-ctx.Done(): - case respChan <- resp: - } - - return -} - func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) { start := time.Now() diff --git a/worker/upload_test.go b/worker/upload_test.go index 29b51d888..a42fa802f 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -340,6 +340,61 @@ func TestMigrateShards(t *testing.T) { } } +func TestUploadRegression(t *testing.T) { + // mock worker + w := newMockWorker(testRedundancySettings.TotalShards * 2) + + // convenience variables + ul := w.ul + dl := w.dl + mm := w.mm + os := w.os + + // create test data + data := make([]byte, 128) + if _, err := frand.Read(data); err != nil { + t.Fatal(err) + } + + // create upload params + params := testParameters(t.Name()) + + // make sure the memory manager blocks + mm.memBlockChan = make(chan struct{}) + + // upload data + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, _, err := ul.Upload(ctx, bytes.NewReader(data), w.contracts.values(), params, lockingPriorityUpload) + if !errors.Is(err, errUploadInterrupted) { + t.Fatal(err) + } + + // unblock the memory manager + close(mm.memBlockChan) + + // upload data + _, _, err = ul.Upload(context.Background(), bytes.NewReader(data), w.contracts.values(), params, lockingPriorityUpload) + if err != nil { + t.Fatal(err) + } + + // grab the object + o, err := os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{}) + if err != nil { + t.Fatal(err) + } + + // download data for good measure + var buf bytes.Buffer + err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), w.contracts.values()) + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(data, buf.Bytes()) { + t.Fatal("data mismatch", data, buf.Bytes()) + } +} + func testParameters(path string) uploadParameters { return uploadParameters{ bucket: testBucket, From baf644ef9e153487e216c7b9a3952356af9b0143 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 23 Jan 2024 10:43:50 +0100 Subject: [PATCH 4/4] worker: revert changes to upload --- worker/upload.go | 154 ++++++++++++++++++++++++----------------------- 1 file changed, 78 insertions(+), 76 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index 51006c403..aebd341a4 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -104,10 +104,9 @@ type ( } slabUploadResponse struct { - slabs []object.SlabSlice - bufferSizeLimitReached bool - index int - err error + slab object.SlabSlice + index int + err error } sectorUpload struct { @@ -496,16 +495,17 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a // create the response channel respChan := make(chan slabUploadResponse) - // channel to notify main thread of the number of responses to wait for - numResponsesChan := make(chan int, 1) + // channel to notify main thread of the number of slabs to wait for + numSlabsChan := make(chan int, 1) // prepare slab size size := int64(up.rs.MinShards) * rhpv2.SectorSize redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize + var partialSlab []byte // launch uploads in a separate goroutine go func() { - var respIndex int + var slabIndex int for { select { case <-mgr.shutdownCtx.Done(): @@ -514,7 +514,6 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a return // interrupted default: } - // acquire memory mem := mgr.mm.AcquireMemory(ctx, redundantSize) if mem == nil { @@ -527,10 +526,15 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a if err == io.EOF { mem.Release() - // we know for sure how many responses the main thread should expect - numResponsesChan <- respIndex + // no more data to upload, notify main thread of the number of + // slabs to wait for + numSlabs := slabIndex + if partialSlab != nil && slabIndex > 0 { + numSlabs-- // don't wait on partial slab + } + numSlabsChan <- numSlabs return - } else if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { + } else if err != nil && err != io.ErrUnexpectedEOF { mem.Release() // unexpected error, notify main thread @@ -539,26 +543,40 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a case <-ctx.Done(): } return - } else if errors.Is(err, io.ErrUnexpectedEOF) && up.packing { - go mgr.addPartialSlab(ctx, mem, up.rs, data[:length], up.contractSet, respIndex, respChan) - respIndex++ + } else if up.packing && errors.Is(err, io.ErrUnexpectedEOF) { + mem.Release() + + // uploadPacking is true, we return the partial slab without + // uploading. + partialSlab = data[:length] } else { - go mgr.addSlab(ctx, mem, up.rs, upload, data, length, respIndex, respChan) - respIndex++ + // regular upload + go func(rs api.RedundancySettings, data []byte, length, slabIndex int) { + uploadSpeed, overdrivePct := upload.uploadSlab(ctx, rs, data, length, slabIndex, respChan, mgr.candidates(upload.allowed), mem, mgr.maxOverdrive, mgr.overdriveTimeout) + + // track stats + mgr.statsSlabUploadSpeedBytesPerMS.Track(float64(uploadSpeed)) + mgr.statsOverdrivePct.Track(overdrivePct) + + // release memory + mem.Release() + }(up.rs, data, length, slabIndex) } + + slabIndex++ } }() // collect responses var responses []slabUploadResponse - numResponses := math.MaxInt32 - for len(responses) < numResponses { + numSlabs := math.MaxInt32 + for len(responses) < numSlabs { select { case <-mgr.shutdownCtx.Done(): return false, "", errWorkerShutDown case <-ctx.Done(): return false, "", errUploadInterrupted - case numResponses = <-numResponsesChan: + case numSlabs = <-numSlabsChan: case res := <-respChan: if res.err != nil { return false, "", res.err @@ -574,20 +592,30 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a // decorate the object with the slabs for _, resp := range responses { - bufferSizeLimitReached = bufferSizeLimitReached || resp.bufferSizeLimitReached - o.Slabs = append(o.Slabs, resp.slabs...) + o.Slabs = append(o.Slabs, resp.slab) } // calculate the eTag eTag = hr.Hash() - // persist the upload + // add partial slabs + if len(partialSlab) > 0 { + var pss []object.SlabSlice + pss, bufferSizeLimitReached, err = mgr.os.AddPartialSlab(ctx, partialSlab, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) + if err != nil { + return false, "", err + } + o.Slabs = append(o.Slabs, pss...) + } + if up.multipart { + // persist the part err = mgr.os.AddMultipartPart(ctx, up.bucket, up.path, up.contractSet, eTag, up.uploadID, up.partNumber, o.Slabs) if err != nil { return bufferSizeLimitReached, "", fmt.Errorf("couldn't add multi part: %w", err) } } else { + // persist the object err = mgr.os.AddObject(ctx, up.bucket, up.path, up.contractSet, o, api.AddObjectOptions{MimeType: up.mimeType, ETag: eTag, Metadata: up.metadata}) if err != nil { return bufferSizeLimitReached, "", fmt.Errorf("couldn't add object: %w", err) @@ -645,60 +673,6 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc return nil } -func (mgr *uploadManager) addPartialSlab(ctx context.Context, mem Memory, rs api.RedundancySettings, data []byte, contractSet string, respIndex int, respChan chan slabUploadResponse) { - // make sure we release the memory - defer mem.Release() - - // create the response - resp := slabUploadResponse{ - index: respIndex, - } - - // create the response - resp.slabs, resp.bufferSizeLimitReached, resp.err = mgr.os.AddPartialSlab(ctx, data, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet) - - // send the response - select { - case <-ctx.Done(): - case respChan <- resp: - } -} - -func (mgr *uploadManager) addSlab(ctx context.Context, mem Memory, rs api.RedundancySettings, upload *upload, data []byte, length, respIndex int, respChan chan slabUploadResponse) { - // make sure we release the memory - defer mem.Release() - - // create the response - resp := slabUploadResponse{ - slabs: []object.SlabSlice{{ - Slab: object.NewSlab(uint8(rs.MinShards)), - Offset: 0, - Length: uint32(length), - }}, - index: respIndex, - } - - // create the shards - shards := make([][]byte, rs.TotalShards) - resp.slabs[0].Slab.Encode(data, shards) - resp.slabs[0].Slab.Encrypt(shards) - - // upload the shards - var uploadSpeed int64 - var overdrivePct float64 - resp.slabs[0].Slab.Shards, uploadSpeed, overdrivePct, resp.err = upload.uploadShards(ctx, shards, mgr.candidates(upload.allowed), mem, mgr.maxOverdrive, mgr.overdriveTimeout) - - // track stats - mgr.statsSlabUploadSpeedBytesPerMS.Track(float64(uploadSpeed)) - mgr.statsOverdrivePct.Track(overdrivePct) - - // send the response - select { - case <-ctx.Done(): - case respChan <- resp: - } -} - func (mgr *uploadManager) candidates(allowed map[types.PublicKey]struct{}) (candidates []*uploader) { mgr.mu.Lock() defer mgr.mu.Unlock() @@ -833,6 +807,34 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [ }, responseChan } +func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) { + // create the response + resp := slabUploadResponse{ + slab: object.SlabSlice{ + Slab: object.NewSlab(uint8(rs.MinShards)), + Offset: 0, + Length: uint32(length), + }, + index: index, + } + + // create the shards + shards := make([][]byte, rs.TotalShards) + resp.slab.Slab.Encode(data, shards) + resp.slab.Slab.Encrypt(shards) + + // upload the shards + resp.slab.Slab.Shards, uploadSpeed, overdrivePct, resp.err = u.uploadShards(ctx, shards, candidates, mem, maxOverdrive, overdriveTimeout) + + // send the response + select { + case <-ctx.Done(): + case respChan <- resp: + } + + return +} + func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) { start := time.Now()