Skip to content

Commit

Permalink
Merge pull request #954 from SiaFoundation/chris/uploader-downloader-…
Browse files Browse the repository at this point in the history
…stopped

Fail upload/download request if uploader/downloader was already stopped
  • Loading branch information
ChrisSchinnerl authored Feb 14, 2024
2 parents 7ac03a4 + 645f93a commit ba41276
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 5 deletions.
21 changes: 18 additions & 3 deletions worker/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ const (
maxConcurrentSectorsPerHost = 3
)

var (
errDownloaderStopped = errors.New("downloader was stopped")
)

type (
downloader struct {
hk types.PublicKey
host Host

statsDownloadSpeedBytesPerMS *stats.DataPoints // keep track of this separately for stats (no decay is applied)
Expand All @@ -33,6 +36,7 @@ type (
consecutiveFailures uint64
numDownloads uint64
queue []*sectorDownloadReq
stopped bool
}
)

Expand All @@ -55,13 +59,17 @@ func (d *downloader) PublicKey() types.PublicKey {
}

func (d *downloader) Stop() {
d.mu.Lock()
d.stopped = true
d.mu.Unlock()

for {
download := d.pop()
if download == nil {
break
}
if !download.done() {
download.fail(errors.New("downloader stopped"))
download.fail(errDownloaderStopped)
}
}
}
Expand All @@ -80,8 +88,15 @@ func (d *downloader) fillBatch() (batch []*sectorDownloadReq) {
}

func (d *downloader) enqueue(download *sectorDownloadReq) {
// enqueue the job
d.mu.Lock()
// check for stopped
if d.stopped {
d.mu.Unlock()
go download.fail(errDownloaderStopped) // don't block the caller
return
}

// enqueue the job
d.queue = append(d.queue, download)
d.mu.Unlock()

Expand Down
32 changes: 32 additions & 0 deletions worker/downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package worker

import (
"errors"
"testing"
"time"
)

func TestDownloaderStopped(t *testing.T) {
w := newMockWorker()
h := w.addHost()
w.dl.refreshDownloaders(w.contracts())

dl := w.dl.downloaders[h.PublicKey()]
dl.Stop()

req := sectorDownloadReq{
resps: &sectorResponses{
c: make(chan struct{}),
},
}
dl.enqueue(&req)

select {
case <-req.resps.c:
if err := req.resps.responses[0].err; !errors.Is(err, errDownloaderStopped) {
t.Fatal("unexpected error response", err)
}
case <-time.After(10 * time.Second):
t.Fatal("no response")
}
}
20 changes: 18 additions & 2 deletions worker/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const (
sectorUploadTimeout = 60 * time.Second
)

var (
errUploaderStopped = errors.New("uploader was stopped")
)

type (
uploader struct {
os ObjectStore
Expand All @@ -36,6 +40,7 @@ type (
fcid types.FileContractID
host Host
queue []*sectorUploadReq
stopped bool

// stats related field
consecutiveFailures uint64
Expand Down Expand Up @@ -136,6 +141,10 @@ outer:
}

func (u *uploader) Stop(err error) {
u.mu.Lock()
u.stopped = true
u.mu.Unlock()

for {
upload := u.pop()
if upload == nil {
Expand All @@ -148,12 +157,19 @@ func (u *uploader) Stop(err error) {
}

func (u *uploader) enqueue(req *sectorUploadReq) {
u.mu.Lock()
// check for stopped
if u.stopped {
u.mu.Unlock()
go req.fail(errUploaderStopped) // don't block the caller
return
}

// decorate the request
req.fcid = u.ContractID()
req.fcid = u.fcid
req.hk = u.hk

// enqueue the request
u.mu.Lock()
u.queue = append(u.queue, req)
u.mu.Unlock()

Expand Down
32 changes: 32 additions & 0 deletions worker/uploader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package worker

import (
"context"
"errors"
"testing"
"time"
)

func TestUploaderStopped(t *testing.T) {
w := newMockWorker()
w.addHost()
w.ul.refreshUploaders(w.contracts(), 1)

ul := w.ul.uploaders[0]
ul.Stop(errors.New("test"))

req := sectorUploadReq{
responseChan: make(chan sectorUploadResp),
sector: &sectorUpload{ctx: context.Background()},
}
ul.enqueue(&req)

select {
case res := <-req.responseChan:
if !errors.Is(res.err, errUploaderStopped) {
t.Fatal("expected error response")
}
case <-time.After(10 * time.Second):
t.Fatal("no response")
}
}

0 comments on commit ba41276

Please sign in to comment.