Skip to content

Commit

Permalink
worker: fix TestUploadPackedSlab NDF
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Mar 5, 2024
1 parent 85141d3 commit e09f7ca
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 75 deletions.
12 changes: 8 additions & 4 deletions internal/test/tt.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ func (t impl) OKAll(vs ...interface{}) {

func (t impl) Retry(tries int, durationBetweenAttempts time.Duration, fn func() error) {
t.Helper()
for i := 1; i < tries; i++ {
err := fn()
t.OK(Retry(tries, durationBetweenAttempts, fn))
}

func Retry(tries int, durationBetweenAttempts time.Duration, fn func() error) (err error) {
for i := 0; i < tries; i++ {
err = fn()
if err == nil {
return
return nil
}
time.Sleep(durationBetweenAttempts)
}
t.OK(fn())
return
}
23 changes: 6 additions & 17 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra
// upload packed slab
if len(packedSlabs) > 0 {
if err := w.tryUploadPackedSlab(ctx, mem, packedSlabs[0], up.rs, up.contractSet, lockingPriorityBlockedUpload); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
w.logger.Error(err)
}
}
}
Expand Down Expand Up @@ -227,10 +227,6 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe
w.uploadsMu.Unlock()
}()

// upload packed slabs
var mu sync.Mutex
var errs error

// derive a context that we can use as an interrupt in case of an error or shutdown.
interruptCtx, interruptCancel := context.WithCancel(w.shutdownCtx)
defer interruptCancel()
Expand All @@ -246,9 +242,9 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe
// fetch packed slab to upload
packedSlabs, err := w.bus.PackedSlabsForUpload(interruptCtx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1)
if err != nil {
mu.Lock()
errs = errors.Join(errs, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err))
mu.Unlock()
w.logger.Errorf("couldn't fetch packed slabs from bus: %v", err)
mem.Release()
break
}

// no more packed slabs to upload
Expand All @@ -270,21 +266,14 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe

// try to upload a packed slab, if there were no packed slabs left to upload ok is false
if err := w.tryUploadPackedSlab(ctx, mem, ps, rs, contractSet, lockPriority); err != nil {
mu.Lock()
errs = errors.Join(errs, err)
mu.Unlock()
w.logger.Error(err)
interruptCancel() // prevent new uploads from being launched
}
}(packedSlabs[0])
}

// wait for all threads to finish
wg.Wait()

// log errors
if err := errors.Join(errs); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
}
return
}

Expand Down Expand Up @@ -890,7 +879,7 @@ loop:
for slab.numInflight > 0 && !done {
select {
case <-u.shutdownCtx.Done():
return nil, 0, 0, errors.New("upload stopped")
return nil, 0, 0, ErrShuttingDown
case <-ctx.Done():
return nil, 0, 0, ctx.Err()
case resp := <-respChan:
Expand Down
99 changes: 45 additions & 54 deletions worker/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"bytes"
"context"
"errors"
"math"
"fmt"
"testing"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/test"
"go.sia.tech/renterd/object"
"lukechampine.com/frand"
)
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestUploadPackedSlab(t *testing.T) {
w := newTestWorker(t)

// add hosts to worker
w.AddHosts(testRedundancySettings.TotalShards * 2)
w.AddHosts(testRedundancySettings.TotalShards)

// convenience variables
os := w.os
Expand All @@ -140,9 +141,6 @@ func TestUploadPackedSlab(t *testing.T) {
params := testParameters(t.Name())
params.packing = true

// block aysnc packed slab uploads
w.BlockAsyncPackedSlabUploads(params)

// create test data
data := frand.Bytes(128)

Expand Down Expand Up @@ -208,67 +206,60 @@ func TestUploadPackedSlab(t *testing.T) {
t.Fatal("data mismatch")
}

// configure max buffer size
os.setSlabBufferMaxSizeSoft(128)

// upload 2x64 bytes using the worker
params.path = t.Name() + "2"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}
params.path = t.Name() + "3"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
// define a helper that counts packed slabs
packedSlabsCount := func() int {
t.Helper()
os.mu.Lock()
cnt := len(os.partials)
os.mu.Unlock()
return cnt
}

// assert we still have two packed slabs (buffer limit not reached)
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
if err != nil {
t.Fatal(err)
} else if len(pss) != 2 {
t.Fatal("expected 2 packed slab")
// define a helper that uploads data using the worker
var c int
uploadBytes := func(n int) {
t.Helper()
params.path = fmt.Sprintf("%s_%d", t.Name(), c)
_, err := w.upload(context.Background(), bytes.NewReader(frand.Bytes(n)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}
c++
}

// upload one more byte (buffer limit reached)
params.path = t.Name() + "4"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(1)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}
// block aysnc packed slab uploads
w.BlockAsyncPackedSlabUploads(params)

// assert we still have two packed slabs (one got uploaded synchronously)
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
if err != nil {
t.Fatal(err)
} else if len(pss) != 2 {
t.Fatal("expected 2 packed slab")
// configure max buffer size
os.setSlabBufferMaxSizeSoft(128)

// upload 2x64 bytes using the worker and assert we still have two packed
// slabs (buffer limit not reached)
uploadBytes(64)
uploadBytes(64)
if packedSlabsCount() != 2 {
t.Fatal("expected 2 packed slabs")
}

// allow some time for the background thread to realise we blocked async
// packed slab uploads
time.Sleep(time.Second)
// upload one more byte and assert we still have two packed slabs (one got
// uploaded synchronously because buffer limit was reached)
uploadBytes(1)
if packedSlabsCount() != 2 {
t.Fatal("expected 2 packed slabs")
}

// unblock asynchronous uploads
w.UnblockAsyncPackedSlabUploads(params)
uploadBytes(129) // ensure background thread is running

// upload 1 byte using the worker
params.path = t.Name() + "5"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(129)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}

// allow some time for the thread to pick up the packed slabs
time.Sleep(time.Second)

// assert we uploaded all packed slabs
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, 1)
if err != nil {
// assert packed slabs get uploaded asynchronously
if err := test.Retry(100, 100*time.Millisecond, func() error {
if packedSlabsCount() != 0 {
return errors.New("expected 0 packed slabs")
}
return nil
}); err != nil {
t.Fatal(err)
} else if len(pss) != 0 {
t.Fatal("expected 0 packed slab")
}
}

Expand Down

0 comments on commit e09f7ca

Please sign in to comment.