Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

feat: Call provide endpoint in batches. #64

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,44 @@ type Client struct {
client proto.DelegatedRouting_Client
validator record.Validator

provider *Provider
identity crypto.PrivKey
provider *Provider
providerBatchSize int
identity crypto.PrivKey
}

type Config struct {
Provider *Provider
Identity crypto.PrivKey
ProvideBatchSize int
}

// Validate validates the current config, and also sets defaults values when needed.
func (c *Config) Validate() error {
if c.Provider != nil && !c.Provider.Peer.ID.MatchesPublicKey(c.Identity.GetPublic()) {
return errors.New("identity does not match provider")
}

if c.ProvideBatchSize == 0 {
c.ProvideBatchSize = 30000 // this will generate payloads of ~1MB in size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (1): this comes with various assumptions about average size of a single item, which will quickly get outdated when we have WebTransport enabled by default (/webtransport multiaddrs with two /certhash segments will baloon the size of the batch beyond initial estimate), or add more transports in the future.

In other places, such as UnixFS autosharding, we've moved away from ballpark counting items assuming they are of some arbitrary average size, and switched to calculating the total size of the final block.

Thoughts on swithcing to byte size, or having two limits? ProvideBatchSize (current one) and ProvideBatchByteSize (new one)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (1): this comes with various assumptions about average size of a single item, which will quickly get outdated when we have WebTransport enabled by default (/webtransport multiaddrs with two /certhash segments will baloon the size of the batch beyond initial estimate), or add more transports in the future.

Then we need to add a specific limit on the amount of multiaddrs allowed. But that is not the problem right now.

Checking the raw byte size of the payload will make the code hard to read and understand. In my opinion, is not worth it (have a payload of ~2Mb instead of ~900Kb because we added more multiaddresses)

}

return nil
}

var _ DelegatedRoutingClient = (*Client)(nil)

// NewClient creates a client.
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) {
if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
return nil, errors.New("identity does not match provider")
func NewClient(c proto.DelegatedRouting_Client, cfg *Config) (*Client, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

return &Client{
client: c,
validator: ipns.Validator{},
provider: p,
identity: identity,
client: c,
validator: ipns.Validator{},
provider: cfg.Provider,
providerBatchSize: cfg.ProvideBatchSize,
identity: cfg.Identity,
}, nil
}
126 changes: 82 additions & 44 deletions client/provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/sha256"
"errors"
"fmt"
"sync"
"time"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -272,77 +273,114 @@ type ProvideAsyncResult struct {
Err error
}

func (fp *Client) Provide(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
req := ProvideRequest{
Key: keys,
Provider: fp.provider,
AdvisoryTTL: ttl,
Timestamp: time.Now().Unix(),
func chunks(slice []cid.Cid, size int) [][]cid.Cid {
if size == 0 {
return [][]cid.Cid{slice}
}

if fp.identity != nil {
if err := req.Sign(fp.identity); err != nil {
return 0, err
var chunks [][]cid.Cid
for i := 0; i < len(slice); i += size {
end := i + size

if end > len(slice) {
end = len(slice)
}
}

record, err := fp.ProvideSignedRecord(ctx, &req)
if err != nil {
return 0, err
chunks = append(chunks, slice[i:end])
}

return chunks
}

func (fp *Client) Provide(ctx context.Context, allKeys []cid.Cid, ttl time.Duration) (time.Duration, error) {
slices := chunks(allKeys, fp.providerBatchSize)

var d time.Duration
var set bool
for resp := range record {
if resp.Err == nil {
set = true
if resp.AdvisoryTTL > d {
d = resp.AdvisoryTTL
var finalErr error
for _, keys := range slices {
req := ProvideRequest{
Key: keys,
Provider: fp.provider,
AdvisoryTTL: ttl,
Timestamp: time.Now().Unix(),
}

if fp.identity != nil {
if err := req.Sign(fp.identity); err != nil {
return 0, err
}
}

record, err := fp.ProvideSignedRecord(ctx, &req)
if err != nil {
return 0, err
}

for resp := range record {
if resp.Err == nil {
set = true
if resp.AdvisoryTTL > d {
d = resp.AdvisoryTTL
}
} else if resp.Err != nil {
finalErr = resp.Err
}
} else if resp.Err != nil {
err = resp.Err
}
}

if set {
return d, nil
} else if err == nil {
} else if finalErr == nil {
return 0, fmt.Errorf("no response")
}
return 0, err

return 0, finalErr
}

func (fp *Client) ProvideAsync(ctx context.Context, keys []cid.Cid, ttl time.Duration) (<-chan time.Duration, error) {
req := ProvideRequest{
Key: keys,
Provider: fp.provider,
AdvisoryTTL: ttl,
Timestamp: time.Now().Unix(),
}
func (fp *Client) ProvideAsync(ctx context.Context, allKeys []cid.Cid, ttl time.Duration) (<-chan time.Duration, error) {
slices := chunks(allKeys, fp.providerBatchSize)

ch := make(chan time.Duration, 1)
var wg sync.WaitGroup
for _, keys := range slices {
wg.Add(1)
req := ProvideRequest{
Key: keys,
Provider: fp.provider,
AdvisoryTTL: ttl,
Timestamp: time.Now().Unix(),
}

if fp.identity != nil {
if err := req.Sign(fp.identity); err != nil {
if fp.identity != nil {
if err := req.Sign(fp.identity); err != nil {
close(ch)
return ch, err
}
}

record, err := fp.ProvideSignedRecord(ctx, &req)
if err != nil {
close(ch)
return ch, err
}
go func() {
defer wg.Done()
for resp := range record {
if resp.Err != nil {
logger.Infof("dropping partial provide failure (%v)", err)
} else {
ch <- resp.AdvisoryTTL
}
}
}()
}

record, err := fp.ProvideSignedRecord(ctx, &req)
if err != nil {
close(ch)
return ch, err
}
go func() {
defer close(ch)
for resp := range record {
if resp.Err != nil {
logger.Infof("dropping partial provide failure (%v)", err)
} else {
ch <- resp.AdvisoryTTL
}
}
wg.Wait()
close(ch)
}()

return ch, nil
}

Expand Down
8 changes: 4 additions & 4 deletions test/clientserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/multiformats/go-multihash"
)

func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, p *client.Provider, identity crypto.PrivKey) (*client.Client, *httptest.Server) {
func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, cfg *client.Config) (*client.Client, *httptest.Server) {
// start a server
s := httptest.NewServer(server.DelegatedRoutingAsyncHandler(service))

Expand All @@ -32,7 +32,7 @@ func createClientAndServer(t *testing.T, service server.DelegatedRoutingService,
if err != nil {
t.Fatal(err)
}
c, err := client.NewClient(q, p, identity)
c, err := client.NewClient(q, cfg)
if err != nil {
t.Fatal(err)
}
Expand All @@ -43,7 +43,7 @@ func createClientAndServer(t *testing.T, service server.DelegatedRoutingService,
func testClientServer(t *testing.T, numIter int) (avgLatency time.Duration, deltaGo int, deltaMem uint64) {
t.Helper()

c, s := createClientAndServer(t, testDelegatedRoutingService{}, nil, nil)
c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{})
defer s.Close()

// verify result
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s testStatistic) DeviatesBy(numStddev float64) bool {

func TestCancelContext(t *testing.T) {
drService := &hangingDelegatedRoutingService{}
c, s := createClientAndServer(t, drService, nil, nil)
c, s := createClientAndServer(t, drService, &client.Config{})
defer s.Close()

ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion test/fallbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestClientWithServerReturningUnknownValues(t *testing.T) {
if err != nil {
t.Fatal(err)
}
c, err := client.NewClient(q, nil, nil)
c, err := client.NewClient(q, &client.Config{})
if err != nil {
t.Fatal(err)
}
Expand Down
67 changes: 64 additions & 3 deletions test/provide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestProvideRoundtrip(t *testing.T) {
t.Fatal(err)
}

c1, s1 := createClientAndServer(t, testDelegatedRoutingService{}, nil, nil)
c1, s1 := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{})
defer s1.Close()

testMH, _ := multihash.Encode([]byte("test"), multihash.IDENTITY)
Expand All @@ -45,13 +45,74 @@ func TestProvideRoundtrip(t *testing.T) {
t.Fatal(err)
}

c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Provider{
prov := &client.Provider{
Peer: peer.AddrInfo{
ID: pID,
Addrs: []multiaddr.Multiaddr{ma1, ma2},
},
ProviderProto: []client.TransferProtocol{{Codec: multicodec.TransportBitswap}},
}, priv)
}

c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{
Provider: prov,
Identity: priv,
})
defer s.Close()

rc, err := c.Provide(context.Background(), []cid.Cid{testCid}, 2*time.Hour)
if err != nil {
t.Fatal(err)
}

if rc != time.Hour {
t.Fatal("should have gotten back the the fixed server ttl")
}
}

func TestProvideRoundtripTwoBatches(t *testing.T) {
priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
t.Fatal(err)
}
pID, err := peer.IDFromPrivateKey(priv)
if err != nil {
t.Fatal(err)
}

c1, s1 := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{ProvideBatchSize: 1})
defer s1.Close()

testMH, _ := multihash.Encode([]byte("test"), multihash.IDENTITY)
testCid := cid.NewCidV1(cid.Raw, testMH)
testMH2, _ := multihash.Encode([]byte("test2"), multihash.IDENTITY)
testCid2 := cid.NewCidV1(cid.Raw, testMH2)

if _, err = c1.Provide(context.Background(), []cid.Cid{testCid, testCid2}, time.Hour); err == nil {
t.Fatal("should get sync error on unsigned provide request.")
}

ma1, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4001")
if err != nil {
t.Fatal(err)
}

ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4002")
if err != nil {
t.Fatal(err)
}

prov := &client.Provider{
Peer: peer.AddrInfo{
ID: pID,
Addrs: []multiaddr.Multiaddr{ma1, ma2},
},
ProviderProto: []client.TransferProtocol{{Codec: multicodec.TransportBitswap}},
}

c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{
Provider: prov,
Identity: priv,
})
defer s.Close()

rc, err := c.Provide(context.Background(), []cid.Cid{testCid}, 2*time.Hour)
Expand Down
2 changes: 1 addition & 1 deletion test/servererror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestClientWithServerReturningErrors(t *testing.T) {
if err != nil {
t.Fatal(err)
}
c, err := client.NewClient(q, nil, nil)
c, err := client.NewClient(q, &client.Config{})
if err != nil {
t.Fatal(err)
}
Expand Down