Skip to content

Commit

Permalink
merge: fix: issues discovered in kubo v0.21.0-rc1 (#851)
Browse files Browse the repository at this point in the history
Bring back fixes into master
  • Loading branch information
Jorropo authored Jun 15, 2023
2 parents 3c568a7 + 5bbf6ca commit a497df1
Show file tree
Hide file tree
Showing 18 changed files with 438 additions and 462 deletions.
256 changes: 139 additions & 117 deletions dht.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self)
d.peerFound(ctx, d1.self)
d.peerFound(d5.self)
d.peerFound(d1.self)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
Expand Down
5 changes: 0 additions & 5 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

// a peer has queried us, let's add it to RT. A new go routine is required
// because we can't block the stream handler until the remote peer answers
// our query.
go dht.peerFound(dht.ctx, mPeer)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
Expand Down
9 changes: 9 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ func Resiliency(beta int) Option {
}
}

// LookupInterval configures maximal number of go routines that can be used to
// perform a lookup check operation, before adding a new node to the routing table.
func LookupCheckConcurrency(n int) Option {
return func(c *dhtcfg.Config) error {
c.LookupCheckConcurrency = n
return nil
}
}

// MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
Expand Down
69 changes: 29 additions & 40 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Fatal("peers setup incorrectly: no local address")
}

a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL)
pi := peer.AddrInfo{ID: idB}
if err := a.host.Connect(ctx, pi); err != nil {
if err := a.host.Connect(ctx, peer.AddrInfo{ID: idB, Addrs: addrB}); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -298,8 +296,6 @@ func TestValueGetSet(t *testing.T) {
t.Fatalf("Expected 'world' got '%s'", string(val))
}

// late connect

connect(t, ctx, dhts[2], dhts[0])
connect(t, ctx, dhts[2], dhts[1])

Expand Down Expand Up @@ -612,40 +608,41 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i
// test "well-formed-ness" (>= minPeers peers in every routing table)
t.Helper()

checkTables := func() bool {
totalPeers := 0
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
totalPeers += rtlen
if minPeers > 0 && rtlen < minPeers {
// t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
return false
}
}
actualAvgPeers := totalPeers / len(dhts)
t.Logf("avg rt size: %d", actualAvgPeers)
if avgPeers > 0 && actualAvgPeers < avgPeers {
t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
return false
}
return true
}

timeoutA := time.After(timeout)
for {
select {
case <-timeoutA:
t.Errorf("failed to reach well-formed routing tables after %s", timeout)
return
case <-time.After(5 * time.Millisecond):
if checkTables() {
if checkForWellFormedTablesOnce(t, dhts, minPeers, avgPeers) {
// succeeded
return
}
}
}
}

func checkForWellFormedTablesOnce(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int) bool {
t.Helper()
totalPeers := 0
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
totalPeers += rtlen
if minPeers > 0 && rtlen < minPeers {
//t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
return false
}
}
actualAvgPeers := totalPeers / len(dhts)
t.Logf("avg rt size: %d", actualAvgPeers)
if avgPeers > 0 && actualAvgPeers < avgPeers {
t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
return false
}
return true
}

func printRoutingTables(dhts []*IpfsDHT) {
// the routing tables should be full now. let's inspect them.
fmt.Printf("checking routing table of %d\n", len(dhts))
Expand Down Expand Up @@ -681,24 +678,16 @@ func TestRefresh(t *testing.T) {
<-time.After(100 * time.Millisecond)
// bootstrap a few times until we get good tables.
t.Logf("bootstrapping them so they find each other %d", nDHTs)
ctxT, cancelT := context.WithTimeout(ctx, 5*time.Second)
defer cancelT()

for ctxT.Err() == nil {
bootstrap(t, ctxT, dhts)
for {
bootstrap(t, ctx, dhts)

// wait a bit.
select {
case <-time.After(50 * time.Millisecond):
continue // being explicit
case <-ctxT.Done():
return
if checkForWellFormedTablesOnce(t, dhts, 7, 10) {
break
}
}

waitForWellFormedTables(t, dhts, 7, 10, 10*time.Second)

cancelT()
time.Sleep(time.Microsecond * 50)
}

if u.Debug {
// the routing tables should be full now. let's inspect them.
Expand Down Expand Up @@ -2123,7 +2112,7 @@ func TestBootstrapPeersFunc(t *testing.T) {
bootstrapPeersB = []peer.AddrInfo{addrA}
lock.Unlock()

dhtB.fixLowPeers(ctx)
dhtB.fixLowPeers()
require.NotEqual(t, 0, len(dhtB.host.Network().Peers()))
}

Expand Down
5 changes: 2 additions & 3 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
ctx, cancel := context.WithCancel(context.Background())

self := h.ID()
pm, err := providers.NewProviderManager(ctx, self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
pm, err := providers.NewProviderManager(self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
if err != nil {
cancel()
return nil, err
Expand Down Expand Up @@ -355,9 +355,8 @@ func (dht *FullRT) runCrawler(ctx context.Context) {

func (dht *FullRT) Close() error {
dht.cancel()
err := dht.ProviderManager.Process().Close()
dht.wg.Wait()
return err
return dht.ProviderManager.Close()
}

func (dht *FullRT) Bootstrap(ctx context.Context) error {
Expand Down
38 changes: 19 additions & 19 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ require (
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-log v1.0.5
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p v0.27.5
github.com/libp2p/go-libp2p-kbucket v0.6.1
github.com/libp2p/go-libp2p v0.27.6
github.com/libp2p/go-libp2p-kbucket v0.6.3
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.0
github.com/libp2p/go-libp2p-testing v0.12.0
Expand All @@ -32,12 +31,13 @@ require (
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
gonum.org/v1/gonum v0.13.0
)

require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand All @@ -55,28 +55,29 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b // indirect
github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/huin/goupnp v1.1.0 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipld/go-ipld-prime v0.20.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.16.4 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
github.com/libp2p/go-reuseport v0.3.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.53 // indirect
github.com/miekg/dns v1.1.54 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand All @@ -86,37 +87,36 @@ require (
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.9.2 // indirect
github.com/onsi/ginkgo/v2 v2.9.7 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
github.com/quic-go/quic-go v0.33.0 // indirect
github.com/quic-go/webtransport-go v0.5.2 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.19.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/tools v0.9.1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
Expand Down
Loading

0 comments on commit a497df1

Please sign in to comment.