Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventhandler: implement handleOperatorRemoved #1796

Open
wants to merge 5 commits into
base: stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions eth/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (
"math/big"
"time"

"github.com/ssvlabs/ssv/ekm"

"github.com/attestantio/go-eth2-client/spec/phase0"
ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/ekm"
"github.com/ssvlabs/ssv/eth/contract"
"github.com/ssvlabs/ssv/eth/eventparser"
"github.com/ssvlabs/ssv/eth/executionclient"
Expand Down
225 changes: 112 additions & 113 deletions eth/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,119 +256,6 @@ func TestHandleBlockEventsStream(t *testing.T) {
require.Equal(t, len(ops), len(operators))
})
})
t.Run("test OperatorRemoved event handle", func(t *testing.T) {

// Should return MalformedEventError and no changes to the state
t.Run("test OperatorRemoved incorrect operator ID", func(t *testing.T) {
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 100500)
require.NoError(t, err)
sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is 1 registered operator
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle the event
lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// Check if the operator wasn't removed successfully
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))
})

// Receive event, unmarshall, parse, check parse event is not nil or with error, operator id is correct
// TODO: fix this test. It checks nothing, due the handleOperatorRemoved method is no-op currently
t.Run("test OperatorRemoved happy flow", func(t *testing.T) {
// Prepare a new operator to remove it later in this test
op, err := createOperators(1, operatorsCount)
require.NoError(t, err)
operatorsCount++

encodedPubKey, err := op[0].privateKey.Public().Base64()
require.NoError(t, err)

// Call the contract method
packedOperatorPubKey, err := eventparser.PackOperatorPublicKey(encodedPubKey)
require.NoError(t, err)
_, err = boundContract.SimcontractTransactor.RegisterOperator(auth, packedOperatorPubKey, big.NewInt(100_000_000))
require.NoError(t, err)

sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0xd839f31c14bd632f424e307b36abff63ca33684f77f28e35dc13718ef338f7f4"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is no registered operators
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle OperatorAdded event
lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++
// Check storage for the new operator
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Now start the OperatorRemoved event handling
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 4)
require.NoError(t, err)
sim.Commit()

block = <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh = make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Handle OperatorRemoved event
lastProcessedBlock, err = eh.HandleBlockEventsStream(eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// TODO: this should be adjusted when eth/eventhandler/handlers.go#L109 is resolved
// Check if the operator was removed successfully
//operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
//require.NoError(t, err)
//require.Equal(t, len(ops), len(operators))
})
})

// Receive event, unmarshall, parse, check parse event is not nil or with an error,
// public key is correct, owner is correct, operator ids are correct, shares are correct
Expand Down Expand Up @@ -1346,6 +1233,118 @@ func TestHandleBlockEventsStream(t *testing.T) {
require.False(t, share.Liquidated)
})
})

t.Run("test OperatorRemoved event handle", func(t *testing.T) {

// Should return MalformedEventError and no changes to the state
t.Run("test OperatorRemoved incorrect operator ID", func(t *testing.T) {
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 100500)
require.NoError(t, err)
sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is 1 registered operator
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle the event
lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// Check if the operator wasn't removed successfully
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))
})

// Receive event, unmarshall, parse, check parse event is not nil or with error, operator id is correct
t.Run("test OperatorRemoved happy flow", func(t *testing.T) {
// Prepare a new operator to remove it later in this test
op, err := createOperators(1, operatorsCount)
require.NoError(t, err)
operatorsCount++

encodedPubKey, err := op[0].privateKey.Public().Base64()
require.NoError(t, err)

// Call the contract method
packedOperatorPubKey, err := eventparser.PackOperatorPublicKey(encodedPubKey)
require.NoError(t, err)
_, err = boundContract.SimcontractTransactor.RegisterOperator(auth, packedOperatorPubKey, big.NewInt(100_000_000))
require.NoError(t, err)

sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0xd839f31c14bd632f424e307b36abff63ca33684f77f28e35dc13718ef338f7f4"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is no registered operators
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle OperatorAdded event
lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++
// Check storage for the new operator
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Now start the OperatorRemoved event handling
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 4)
require.NoError(t, err)
sim.Commit()

block = <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh = make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Handle OperatorRemoved event
lastProcessedBlock, err = eh.HandleBlockEventsStream(eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// Check if the operator was removed successfully
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))
})
})
}

func setupEventHandler(t *testing.T, ctx context.Context, logger *zap.Logger, network *networkconfig.NetworkConfig, operator *testOperator, useMockCtrl bool) (*EventHandler, *mocks.MockController, error) {
Expand Down
52 changes: 51 additions & 1 deletion eth/eventhandler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont
if err != nil {
return fmt.Errorf("could not get operator data by public key: %w", err)
}

if pubkeyExists {
logger.Warn("malformed event: operator public key already exists",
fields.OperatorPubKey(operatorData.PublicKey))
Expand All @@ -79,11 +80,46 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont
if err != nil {
return fmt.Errorf("save operator data: %w", err)
}

if exists {
logger.Debug("operator data already exists")
return nil
}

var modifiedShares []*ssvtypes.SSVShare
for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) {
if !share.Liquidated {
continue
}

var existingOperatorsCount int
for _, shareMember := range share.Committee {
if shareMember.Signer == event.OperatorId {
existingOperatorsCount++
}

_, ok, err := eh.nodeStorage.GetOperatorData(txn, shareMember.Signer)
if err != nil {
return fmt.Errorf("get operator data: %w", err)
}

if ok {
existingOperatorsCount++
}
}

if existingOperatorsCount == len(share.Committee) {
share.Liquidated = false
modifiedShares = append(modifiedShares, share)
}
}

if len(modifiedShares) > 0 {
if err := eh.nodeStorage.Shares().Save(txn, modifiedShares...); err != nil {
return fmt.Errorf("save shares: %w", err)
}
}

if bytes.Equal(event.PublicKey, eh.operatorDataStore.GetOperatorData().PublicKey) {
eh.operatorDataStore.SetOperatorData(od)
logger = logger.With(zap.Bool("own_operator", true))
Expand Down Expand Up @@ -117,7 +153,21 @@ func (eh *EventHandler) handleOperatorRemoved(txn basedb.Txn, event *contract.Co
fields.Owner(od.OwnerAddress),
)

// This function is currently no-op and it will do nothing. Operator removed event is not used in the current implementation.
if err := eh.nodeStorage.DeleteOperatorData(txn, od.ID); err != nil {
return fmt.Errorf("delete operator data: %w", err)
}

var modifiedShares []*ssvtypes.SSVShare
for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) {
share.Liquidated = true
modifiedShares = append(modifiedShares, share)
}

if len(modifiedShares) > 0 {
if err := eh.nodeStorage.Shares().Save(txn, modifiedShares...); err != nil {
return fmt.Errorf("save shares: %w", err)
}
}

logger.Debug("processed event")
return nil
Expand Down
Loading