diff --git a/client/client.go b/client/client.go index 9ccd9cd..ba0a8aa 100644 --- a/client/client.go +++ b/client/client.go @@ -31,23 +31,44 @@ type Client struct { client proto.DelegatedRouting_Client validator record.Validator - provider *Provider - identity crypto.PrivKey + provider *Provider + providerBatchSize int + identity crypto.PrivKey +} + +type Config struct { + Provider *Provider + Identity crypto.PrivKey + ProvideBatchSize int +} + +// Validate validates the current config, and also sets defaults values when needed. +func (c *Config) Validate() error { + if c.Provider != nil && !c.Provider.Peer.ID.MatchesPublicKey(c.Identity.GetPublic()) { + return errors.New("identity does not match provider") + } + + if c.ProvideBatchSize == 0 { + c.ProvideBatchSize = 30000 // this will generate payloads of ~1MB in size + } + + return nil } var _ DelegatedRoutingClient = (*Client)(nil) // NewClient creates a client. // The Provider and identity parameters are option. If they are nil, the `Provide` method will not function. -func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) { - if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) { - return nil, errors.New("identity does not match provider") +func NewClient(c proto.DelegatedRouting_Client, cfg *Config) (*Client, error) { + if err := cfg.Validate(); err != nil { + return nil, err } return &Client{ - client: c, - validator: ipns.Validator{}, - provider: p, - identity: identity, + client: c, + validator: ipns.Validator{}, + provider: cfg.Provider, + providerBatchSize: cfg.ProvideBatchSize, + identity: cfg.Identity, }, nil } diff --git a/client/provide.go b/client/provide.go index 907850b..136122b 100644 --- a/client/provide.go +++ b/client/provide.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "errors" "fmt" + "sync" "time" "github.com/ipfs/go-cid" @@ -272,77 +273,114 @@ type ProvideAsyncResult struct { Err error } -func (fp *Client) Provide(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) { - req := ProvideRequest{ - Key: keys, - Provider: fp.provider, - AdvisoryTTL: ttl, - Timestamp: time.Now().Unix(), +func chunks(slice []cid.Cid, size int) [][]cid.Cid { + if size == 0 { + return [][]cid.Cid{slice} } - if fp.identity != nil { - if err := req.Sign(fp.identity); err != nil { - return 0, err + var chunks [][]cid.Cid + for i := 0; i < len(slice); i += size { + end := i + size + + if end > len(slice) { + end = len(slice) } - } - record, err := fp.ProvideSignedRecord(ctx, &req) - if err != nil { - return 0, err + chunks = append(chunks, slice[i:end]) } + return chunks +} + +func (fp *Client) Provide(ctx context.Context, allKeys []cid.Cid, ttl time.Duration) (time.Duration, error) { + slices := chunks(allKeys, fp.providerBatchSize) + var d time.Duration var set bool - for resp := range record { - if resp.Err == nil { - set = true - if resp.AdvisoryTTL > d { - d = resp.AdvisoryTTL + var finalErr error + for _, keys := range slices { + req := ProvideRequest{ + Key: keys, + Provider: fp.provider, + AdvisoryTTL: ttl, + Timestamp: time.Now().Unix(), + } + + if fp.identity != nil { + if err := req.Sign(fp.identity); err != nil { + return 0, err + } + } + + record, err := fp.ProvideSignedRecord(ctx, &req) + if err != nil { + return 0, err + } + + for resp := range record { + if resp.Err == nil { + set = true + if resp.AdvisoryTTL > d { + d = resp.AdvisoryTTL + } + } else if resp.Err != nil { + finalErr = resp.Err } - } else if resp.Err != nil { - err = resp.Err } } if set { return d, nil - } else if err == nil { + } else if finalErr == nil { return 0, fmt.Errorf("no response") } - return 0, err + + return 0, finalErr } -func (fp *Client) ProvideAsync(ctx context.Context, keys []cid.Cid, ttl time.Duration) (<-chan time.Duration, error) { - req := ProvideRequest{ - Key: keys, - Provider: fp.provider, - AdvisoryTTL: ttl, - Timestamp: time.Now().Unix(), - } +func (fp *Client) ProvideAsync(ctx context.Context, allKeys []cid.Cid, ttl time.Duration) (<-chan time.Duration, error) { + slices := chunks(allKeys, fp.providerBatchSize) + ch := make(chan time.Duration, 1) + var wg sync.WaitGroup + for _, keys := range slices { + wg.Add(1) + req := ProvideRequest{ + Key: keys, + Provider: fp.provider, + AdvisoryTTL: ttl, + Timestamp: time.Now().Unix(), + } - if fp.identity != nil { - if err := req.Sign(fp.identity); err != nil { + if fp.identity != nil { + if err := req.Sign(fp.identity); err != nil { + close(ch) + return ch, err + } + } + + record, err := fp.ProvideSignedRecord(ctx, &req) + if err != nil { close(ch) return ch, err } + go func() { + defer wg.Done() + for resp := range record { + if resp.Err != nil { + logger.Infof("dropping partial provide failure (%v)", err) + } else { + ch <- resp.AdvisoryTTL + } + } + }() } - record, err := fp.ProvideSignedRecord(ctx, &req) - if err != nil { - close(ch) - return ch, err - } go func() { - defer close(ch) - for resp := range record { - if resp.Err != nil { - logger.Infof("dropping partial provide failure (%v)", err) - } else { - ch <- resp.AdvisoryTTL - } - } + wg.Wait() + close(ch) }() + return ch, nil } diff --git a/test/clientserver_test.go b/test/clientserver_test.go index aaea428..0f9efe4 100644 --- a/test/clientserver_test.go +++ b/test/clientserver_test.go @@ -23,7 +23,7 @@ import ( "github.com/multiformats/go-multihash" ) -func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, p *client.Provider, identity crypto.PrivKey) (*client.Client, *httptest.Server) { +func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, cfg *client.Config) (*client.Client, *httptest.Server) { // start a server s := httptest.NewServer(server.DelegatedRoutingAsyncHandler(service)) @@ -32,7 +32,7 @@ func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, if err != nil { t.Fatal(err) } - c, err := client.NewClient(q, p, identity) + c, err := client.NewClient(q, cfg) if err != nil { t.Fatal(err) } @@ -43,7 +43,7 @@ func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, func testClientServer(t *testing.T, numIter int) (avgLatency time.Duration, deltaGo int, deltaMem uint64) { t.Helper() - c, s := createClientAndServer(t, testDelegatedRoutingService{}, nil, nil) + c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{}) defer s.Close() // verify result @@ -188,7 +188,7 @@ func (s testStatistic) DeviatesBy(numStddev float64) bool { func TestCancelContext(t *testing.T) { drService := &hangingDelegatedRoutingService{} - c, s := createClientAndServer(t, drService, nil, nil) + c, s := createClientAndServer(t, drService, &client.Config{}) defer s.Close() ctx, cancel := context.WithCancel(context.Background()) diff --git a/test/fallbacks_test.go b/test/fallbacks_test.go index a6c267c..4657336 100644 --- a/test/fallbacks_test.go +++ b/test/fallbacks_test.go @@ -24,7 +24,7 @@ func TestClientWithServerReturningUnknownValues(t *testing.T) { if err != nil { t.Fatal(err) } - c, err := client.NewClient(q, nil, nil) + c, err := client.NewClient(q, &client.Config{}) if err != nil { t.Fatal(err) } diff --git a/test/provide_test.go b/test/provide_test.go index 55ed696..ffe3abe 100644 --- a/test/provide_test.go +++ b/test/provide_test.go @@ -25,7 +25,7 @@ func TestProvideRoundtrip(t *testing.T) { t.Fatal(err) } - c1, s1 := createClientAndServer(t, testDelegatedRoutingService{}, nil, nil) + c1, s1 := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{}) defer s1.Close() testMH, _ := multihash.Encode([]byte("test"), multihash.IDENTITY) @@ -45,13 +45,74 @@ func TestProvideRoundtrip(t *testing.T) { t.Fatal(err) } - c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Provider{ + prov := &client.Provider{ Peer: peer.AddrInfo{ ID: pID, Addrs: []multiaddr.Multiaddr{ma1, ma2}, }, ProviderProto: []client.TransferProtocol{{Codec: multicodec.TransportBitswap}}, - }, priv) + } + + c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{ + Provider: prov, + Identity: priv, + }) + defer s.Close() + + rc, err := c.Provide(context.Background(), []cid.Cid{testCid}, 2*time.Hour) + if err != nil { + t.Fatal(err) + } + + if rc != time.Hour { + t.Fatal("should have gotten back the the fixed server ttl") + } +} + +func TestProvideRoundtripTwoBatches(t *testing.T) { + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + t.Fatal(err) + } + pID, err := peer.IDFromPrivateKey(priv) + if err != nil { + t.Fatal(err) + } + + c1, s1 := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{ProvideBatchSize: 1}) + defer s1.Close() + + testMH, _ := multihash.Encode([]byte("test"), multihash.IDENTITY) + testCid := cid.NewCidV1(cid.Raw, testMH) + testMH2, _ := multihash.Encode([]byte("test2"), multihash.IDENTITY) + testCid2 := cid.NewCidV1(cid.Raw, testMH2) + + if _, err = c1.Provide(context.Background(), []cid.Cid{testCid, testCid2}, time.Hour); err == nil { + t.Fatal("should get sync error on unsigned provide request.") + } + + ma1, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4001") + if err != nil { + t.Fatal(err) + } + + ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4002") + if err != nil { + t.Fatal(err) + } + + prov := &client.Provider{ + Peer: peer.AddrInfo{ + ID: pID, + Addrs: []multiaddr.Multiaddr{ma1, ma2}, + }, + ProviderProto: []client.TransferProtocol{{Codec: multicodec.TransportBitswap}}, + } + + c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Config{ + Provider: prov, + Identity: priv, + }) defer s.Close() rc, err := c.Provide(context.Background(), []cid.Cid{testCid}, 2*time.Hour) diff --git a/test/servererror_test.go b/test/servererror_test.go index 5062f10..f9afb2c 100644 --- a/test/servererror_test.go +++ b/test/servererror_test.go @@ -23,7 +23,7 @@ func TestClientWithServerReturningErrors(t *testing.T) { if err != nil { t.Fatal(err) } - c, err := client.NewClient(q, nil, nil) + c, err := client.NewClient(q, &client.Config{}) if err != nil { t.Fatal(err) }