Skip to content

Commit

Permalink
First pass at adapting graphsync to http
Browse files Browse the repository at this point in the history
  • Loading branch information
willscott authored and rvagg committed Apr 28, 2023
1 parent b6ac6bf commit c22d369
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/libp2p/go-libp2p-testing v0.12.0
github.com/mitchellh/go-server-timing v1.0.1
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multicodec v0.8.1
github.com/multiformats/go-multicodec v0.8.2-0.20230419141826-bd7ef45ca89b
github.com/multiformats/go-multihash v0.2.1
github.com/prometheus/client_golang v1.14.0
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,8 @@ github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/g
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multibase v0.1.1 h1:3ASCDsuLX8+j4kx58qnJ4YFq/JWTJpCyDW27ztsVTOI=
github.com/multiformats/go-multibase v0.1.1/go.mod h1:ZEjHE+IsUrgp5mhlEAYjMtZwK1k4haNkcaPg9aoe1a8=
github.com/multiformats/go-multicodec v0.8.1 h1:ycepHwavHafh3grIbR1jIXnKCsFm0fqsfEOsJ8NtKE8=
github.com/multiformats/go-multicodec v0.8.1/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k=
github.com/multiformats/go-multicodec v0.8.2-0.20230419141826-bd7ef45ca89b h1:MEoPnsX6gSJh8Esy/sYKIkLlToWF0XWtYBlsr/lDG9U=
github.com/multiformats/go-multicodec v0.8.2-0.20230419141826-bd7ef45ca89b/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.9/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
Expand Down
3 changes: 3 additions & 0 deletions pkg/lassie/lassie.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lassie

import (
"context"
"net/http"
"time"

"github.com/filecoin-project/lassie/pkg/indexerlookup"
Expand Down Expand Up @@ -107,6 +108,8 @@ func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error
TempDir: cfg.TempDir,
Concurrency: cfg.BitswapConcurrency,
})
case multicodec.TransportIpfsGatewayHttp:
protocolRetrievers[protocol] = retriever.NewHTTPRetriever(session.GetStorageProviderTimeout, *http.DefaultClient)
}
}

Expand Down
260 changes: 260 additions & 0 deletions pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package retriever

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/filecoin-project/lassie/pkg/events"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipni/go-libipni/maurl"
"go.uber.org/multierr"
)

type HTTPRetriever struct {
Client http.Client
Clock clock.Clock
}

func NewHTTPRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client http.Client) *HTTPRetriever {
return &HTTPRetriever{
Client: client,
}
}

type httpRetrieval struct {
*HTTPRetriever
ctx context.Context
request types.RetrievalRequest
eventChan chan retrievalResult
eventsCallback func(types.RetrievalEvent)
}

func (cfg *HTTPRetriever) Retrieve(
ctx context.Context,
retrievalRequest types.RetrievalRequest,
eventsCallback func(types.RetrievalEvent),
) types.CandidateRetrieval {

if cfg == nil {
cfg = &HTTPRetriever{}
}
if eventsCallback == nil {
eventsCallback = func(re types.RetrievalEvent) {}
}

// state local to this CID's retrieval
return &httpRetrieval{
HTTPRetriever: cfg,
ctx: ctx,
request: retrievalRequest,
eventChan: make(chan retrievalResult),
eventsCallback: eventsCallback,
}
}

func (r *httpRetrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsyncCandidates) (*types.RetrievalStats, error) {
ctx, cancelCtx := context.WithCancel(r.ctx)

// start retrievals
phaseStartTime := r.Clock.Now()
var waitGroup sync.WaitGroup
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
for {
hasCandidates, candidates, err := asyncCandidates.Next(ctx)
if !hasCandidates || err != nil {
return
}
for _, candidate := range candidates {
// Start the retrieval with the candidate
waitGroup.Add(1)
localCandidate := candidate
go func() {
defer waitGroup.Done()
r.doHTTPDownload(ctx, phaseStartTime, localCandidate)
}()
}
}
}()

finishAll := make(chan struct{}, 1)
go func() {
waitGroup.Wait()
close(r.eventChan)
finishAll <- struct{}{}
}()

stats, err := r.collectHTTPResults(ctx)
cancelCtx()
// optimistically try to wait for all routines to finish
select {
case <-finishAll:
case <-time.After(100 * time.Millisecond):
log.Warn("Unable to successfully cancel all retrieval attempts withing 100ms")
}
return stats, err
}

func (r *httpRetrieval) collectHTTPResults(ctx context.Context) (*types.RetrievalStats, error) {
var retrievalErrors error
for {
select {
case result, ok := <-r.eventChan:
// have we got all responses but no success?
if !ok {
// we failed, and got only retrieval errors
retrievalErrors = multierr.Append(retrievalErrors, ErrAllRetrievalsFailed)
return nil, retrievalErrors
}

if result.Event != nil {
r.eventsCallback(*result.Event)
break
}
if result.Err != nil {
retrievalErrors = multierr.Append(retrievalErrors, result.Err)
}
if result.Stats != nil {
return result.Stats, nil
}
case <-ctx.Done():
return nil, context.Canceled
}
}
}

func (r *httpRetrieval) doHTTPDownload(
ctx context.Context,
phaseStartTime time.Time,
candidate types.RetrievalCandidate,
) {
var stats *types.RetrievalStats
var retrievalErr error

r.sendEvent(events.Started(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate))

var candidateURL *url.URL
var err error
for _, addr := range candidate.MinerPeer.Addrs {
candidateURL, err = maurl.ToURL(addr)
if err == nil {
break
}
}
if err != nil {
log.Warnf("Couldn't construct a url for miner %s: %v", candidate.MinerPeer.ID, err)
retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err)
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
return
}

reqURL := fmt.Sprintf("%s/ipfs/%s%s", candidateURL, r.request.Cid, r.request.Path)
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
log.Warnf("Couldn't construct a http request %s: %v", candidate.MinerPeer.ID, err)
retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err)
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
return
}
if r.request.Scope == types.CarScopeBlock {
req.Header.Add("Accept", "application/vnd.ipld.block")
} else {
req.Header.Add("Accept", "application/vnd.ipld.car")
}

resp, err := r.Client.Do(req)

if err != nil {
if ctx.Err() == nil { // not cancelled, maybe timed out though
log.Warnf("Failed to connect to http %s: %v", candidate.MinerPeer.ID, err)
retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err)
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
}
} else {
r.sendEvent(events.Connected(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate))

if r.canSendResult() { // move on to retrieval phase
r.sendEvent(events.FirstByte(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, candidate))
}
}
defer resp.Body.Close()

cbr, err := car.NewBlockReader(resp.Body)
if err != nil {
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
return
}

for {
blk, err := cbr.Next()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
return
}
w, d, err := r.request.LinkSystem.StorageWriteOpener(ipld.LinkContext{})
if err != nil {
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
return
}
_, err = w.Write(blk.RawData())
if err != nil {
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
return
}
err = d(cidlink.Link{Cid: blk.Cid()})
if err != nil {
r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error()))
return
}
}

r.sendEvent(events.Success(
r.HTTPRetriever.Clock.Now(),
r.request.RetrievalID,
phaseStartTime,
candidate,
stats.Size,
stats.Blocks,
stats.Duration,
stats.TotalPayment,
0,
),
)

if r.canSendResult() {
r.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Stats: stats})
}
}

// sendResult will only send a result to the parent goroutine if a retrieval has
// finished (likely by a success), otherwise it will send the result
func (retrieval *httpRetrieval) sendResult(result retrievalResult) bool {
select {
case <-retrieval.ctx.Done():
return false
case retrieval.eventChan <- result:
}
return true
}

func (retrieval *httpRetrieval) sendEvent(event types.RetrievalEvent) {
retrieval.sendResult(retrievalResult{PeerID: event.StorageProviderId(), Event: &event})
}

func (retrieval *httpRetrieval) canSendResult() bool {
return retrieval.ctx.Err() != nil
}
4 changes: 4 additions & 0 deletions pkg/types/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type RetrievalRequest struct {
Cid cid.Cid
LinkSystem ipld.LinkSystem
Selector ipld.Node
Path string
Scope CarScope
Protocols []multicodec.Code
PreloadLinkSystem ipld.LinkSystem
MaxBlocks uint64
Expand Down Expand Up @@ -97,6 +99,8 @@ func NewRequestForPath(store ipldstorage.WritableStorage, cid cid.Cid, path stri
RetrievalID: retrievalId,
Cid: cid,
Selector: selector,
Path: fmt.Sprintf("%s?car-scope=%s", path, carScope),
Scope: carScope,
LinkSystem: linkSystem,
}, nil
}
Expand Down

0 comments on commit c22d369

Please sign in to comment.