Skip to content

Commit

Permalink
optimization: (ValidatorStore) share updates (#1588)
Browse files Browse the repository at this point in the history
* implement bulk update of shares

* improve design for inserting to indexes

* reduce memory allocations and added a benchmark

* fix the test

* fix review comments

* refactors

* refactors

---------

Co-authored-by: moshe-blox <[email protected]>
  • Loading branch information
MatusKysel and moshe-blox authored Aug 11, 2024
1 parent 4c1ff53 commit 062d280
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 25 deletions.
12 changes: 9 additions & 3 deletions registry/storage/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,24 @@ func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVSha
return err
}

updateShares := make([]*types.SSVShare, 0, len(shares))
addShares := make([]*types.SSVShare, 0, len(shares))

for _, share := range shares {
key := hex.EncodeToString(share.ValidatorPubKey[:])

// Update validatorStore indices.
if _, ok := s.shares[key]; ok {
s.validatorStore.handleShareUpdated(share)
updateShares = append(updateShares, share)
} else {
s.validatorStore.handleSharesAdded(share)
addShares = append(addShares, share)
}

s.shares[key] = share
}

s.validatorStore.handleSharesUpdated(updateShares...)
s.validatorStore.handleSharesAdded(addShares...)

return nil
}

Expand Down
44 changes: 23 additions & 21 deletions registry/storage/validatorstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,35 +296,37 @@ func (c *validatorStore) handleShareRemoved(pk spectypes.ValidatorPK) {
}
}

func (c *validatorStore) handleShareUpdated(share *types.SSVShare) {
func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) {
c.mu.Lock()
defer c.mu.Unlock()

// Update byValidatorIndex
if share.HasBeaconMetadata() {
c.byValidatorIndex[share.BeaconMetadata.Index] = share
}
for _, share := range shares {

// Update byCommitteeID
for _, committee := range c.byCommitteeID {
if committee.ID != share.CommitteeID() {
continue
// Update byValidatorIndex
if share.HasBeaconMetadata() {
c.byValidatorIndex[share.BeaconMetadata.Index] = share
}
for i, validator := range committee.Validators {
if validator.ValidatorPubKey == share.ValidatorPubKey {
committee.Validators[i] = share
committee.Indices[i] = share.ValidatorIndex
break

// Update byCommitteeID
committee, ok := c.byCommitteeID[share.CommitteeID()]
if ok {
for i, validator := range committee.Validators {
if validator.ValidatorPubKey == share.ValidatorPubKey {
committee.Validators[i] = share
committee.Indices[i] = share.ValidatorIndex
break
}
}
}
}

// Update byOperatorID
for _, data := range c.byOperatorID {
for i, s := range data.shares {
if s.ValidatorPubKey == share.ValidatorPubKey {
data.shares[i] = share
break
// Update byOperatorID
for _, shareMember := range share.Committee {
data := c.byOperatorID[shareMember.Signer]
for i, s := range data.shares {
if s.ValidatorPubKey == share.ValidatorPubKey {
data.shares[i] = share
break
}
}
}
}
Expand Down
103 changes: 102 additions & 1 deletion registry/storage/validatorstore_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package storage

import (
"encoding/binary"
"fmt"
"math/rand"
"os"
"sync"
"sync/atomic"
"testing"

"github.com/aquasecurity/table"
eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -156,7 +162,7 @@ func TestValidatorStore(t *testing.T) {
})

shareMap[share2.ValidatorPubKey] = updatedShare2
store.handleShareUpdated(updatedShare2)
store.handleSharesUpdated(updatedShare2)

// TODO: updatedShare2 now only changes Quorum field, which doesn't affect any indices. If handleShareUpdated expects to receive shares where indexes field are updated, this needs to be tested.

Expand Down Expand Up @@ -339,3 +345,98 @@ func TestSelfValidatorStore_NilOperatorID(t *testing.T) {
require.Nil(t, selfStore.SelfParticipatingCommittees(99))
require.Nil(t, selfStore.SelfParticipatingCommittees(201))
}

func BenchmarkValidatorStore_Update(b *testing.B) {
shares := map[spectypes.ValidatorPK]*ssvtypes.SSVShare{}

const (
totalOperators = 500
totalValidators = 50_000
)

var validatorIndex atomic.Int64
createShare := func(operators []spectypes.OperatorID) *ssvtypes.SSVShare {
index := validatorIndex.Add(1)

var pk spectypes.ValidatorPK
binary.LittleEndian.PutUint64(pk[:], uint64(index))

var committee []*spectypes.ShareMember
for _, signer := range operators {
committee = append(committee, &spectypes.ShareMember{Signer: signer})
}

return &ssvtypes.SSVShare{
Metadata: ssvtypes.Metadata{
BeaconMetadata: &beaconprotocol.ValidatorMetadata{
Index: phase0.ValidatorIndex(index),
},
},
Share: spectypes.Share{
ValidatorIndex: phase0.ValidatorIndex(index),
ValidatorPubKey: pk,
SharePubKey: pk[:],
Committee: committee,
FeeRecipientAddress: [20]byte{10, 20, 30},
Graffiti: []byte("example"),
},
}
}

for i := 0; i < totalValidators; i++ {
committee := make([]spectypes.OperatorID, 4)
if rand.Float64() < 0.02 {
// 2% chance of a purely random committee.
for i, id := range rand.Perm(totalOperators)[:4] {
committee[i] = spectypes.OperatorID(id)
}
} else {
// 98% chance to form big committees.
first := rand.Intn(totalOperators * 0.2) // 20% of the operators.
for i := range committee {
committee[i] = spectypes.OperatorID((first + i) % totalOperators)
}
}
share := createShare(committee)
shares[share.ValidatorPubKey] = share
}

// Print table of committees and validator counts for debugging.
committees := map[[4]spectypes.OperatorID]int{}
for _, share := range shares {
committee := [4]spectypes.OperatorID{}
for i, member := range share.Committee {
committee[i] = member.Signer
}
committees[committee]++
}
tbl := table.New(os.Stdout)
tbl.SetHeaders("Committee", "Validators")
for committee, count := range committees {
tbl.AddRow(fmt.Sprintf("%v", committee), fmt.Sprintf("%d", count))
}
// tbl.Render() // Uncomment to print.

b.Logf("Total committees: %d", len(committees))

store := newValidatorStore(
func() []*ssvtypes.SSVShare { return maps.Values(shares) },
func(pubKey []byte) *ssvtypes.SSVShare {
return shares[spectypes.ValidatorPK(pubKey)]
},
)
store.handleSharesAdded(maps.Values(shares)...)

pubKeys := maps.Keys(shares)

b.ResetTimer()
for i := 0; i < b.N; i++ {
randomShares := make([]*ssvtypes.SSVShare, 500)
first := rand.Intn(len(pubKeys))
for j := 0; j < 500; j++ {
randomShares[j] = shares[pubKeys[(first+j)%len(pubKeys)]]
}

store.handleSharesUpdated(randomShares...)
}
}

0 comments on commit 062d280

Please sign in to comment.