Skip to content

Commit

Permalink
add support for bulk puts
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Apr 22, 2021
1 parent a217754 commit 1c799d7
Showing 1 changed file with 60 additions and 17 deletions.
77 changes: 60 additions & 17 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type FullRT struct {
waitFrac float64
timeoutPerOp time.Duration

provideManyParallelism int
bulkSendParallelism int
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
Expand Down Expand Up @@ -163,7 +163,7 @@ func NewFullRT(ctx context.Context, h host.Host, protocolPrefix protocol.ID, opt

crawlerInterval: time.Minute * 60,

provideManyParallelism: 10,
bulkSendParallelism: 10,
}

go rt.runCrawler(ctx)
Expand Down Expand Up @@ -835,14 +835,6 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
return routing.ErrNotSupported
}

keysAsPeerIDs := make([]peer.ID, 0, len(keys))
for _, k := range keys {
keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k))
}
sortedKeys := kb.SortClosestPeers(keysAsPeerIDs, kb.ID(make([]byte, 32)))

var anyProvidesSuccessful uint64 = 0

// Compute addresses once for all provides
pi := peer.AddrInfo{
ID: dht.h.ID(),
Expand All @@ -856,7 +848,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
return fmt.Errorf("no known addresses for self, cannot put provider")
}

fn := func(k peer.ID) error {
fn := func(ctx context.Context, k peer.ID) error {
peers, err := dht.GetClosestPeers(ctx, string(k))
if err != nil {
return err
Expand All @@ -873,10 +865,61 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
return nil
}

keysAsPeerIDs := make([]peer.ID, 0, len(keys))
for _, k := range keys {
keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k))
}

return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn)
}

func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
if !dht.enableValues {
return routing.ErrNotSupported
}

if len(keys) != len(values) {
return fmt.Errorf("number of keys does not match the number of values")
}

keysAsPeerIDs := make([]peer.ID, 0, len(keys))
keyRecMap := make(map[string][]byte)
for i, k := range keys {
keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k))
keyRecMap[k] = values[i]
}

if len(keys) != len(keyRecMap) {
return fmt.Errorf("does not support duplicate keys")
}

fn := func(ctx context.Context, k peer.ID) error {
peers, err := dht.GetClosestPeers(ctx, string(k))
if err != nil {
return err
}
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
keyStr := string(k)
return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
}, peers)
if successes == 0 {
return fmt.Errorf("no successful puts")
}
return nil
}

return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn)
}

func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, k peer.ID) error) error {
sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32)))

var anySendsSuccessful uint64 = 0

wg := sync.WaitGroup{}
wg.Add(dht.provideManyParallelism)
chunkSize := len(sortedKeys) / dht.provideManyParallelism
for i := 0; i < dht.provideManyParallelism; i++ {
wg.Add(dht.bulkSendParallelism)
chunkSize := len(sortedKeys) / dht.bulkSendParallelism
for i := 0; i < dht.bulkSendParallelism; i++ {
var chunk []peer.ID
end := (i + 1) * chunkSize
if end > len(sortedKeys) {
Expand All @@ -895,17 +938,17 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
logger.Infof("reprovide goroutine: %v pct done - %d/%d done - %d total", (ki*100)/len(chunk), ki, len(chunk), len(sortedKeys))
}
}
if err := fn(key); err != nil {
if err := fn(ctx, key); err != nil {
logger.Infof("failed to complete provide of key :%v. %v", internal.LoggableProviderRecordBytes(key), err)
} else {
atomic.CompareAndSwapUint64(&anyProvidesSuccessful, 0, 1)
atomic.CompareAndSwapUint64(&anySendsSuccessful, 0, 1)
}
}
}()
}
wg.Wait()

if anyProvidesSuccessful == 0 {
if anySendsSuccessful == 0 {
return fmt.Errorf("failed to complete provides")
}

Expand Down

0 comments on commit 1c799d7

Please sign in to comment.