Commit 919b6d5: merge alan/no-fork
guy muroch committed Jul 16, 2024
2 parents 5413f09 + d124eff
Showing 116 changed files with 5,926 additions and 2,026 deletions.
6 changes: 3 additions & 3 deletions .gitlab-ci.yml
@@ -40,7 +40,7 @@ Build stage Docker image:
- docker tag $IMAGE_NAME:$CI_COMMIT_SHA $DOCKER_REPO_INFRA_STAGE:$CI_COMMIT_SHA
- $DOCKER_LOGIN_TO_INFRA_STAGE_REPO && docker push $DOCKER_REPO_INFRA_STAGE:$CI_COMMIT_SHA
only:
-      - gm/exporter-alan-merge
+      - alan/no-fork


# +---------------------+
@@ -84,7 +84,7 @@ Deploy nodes to hetzner stage:
- .k8/hetzner-stage/scripts/deploy-cluster-65--68.sh $DOCKER_REPO_INFRA_STAGE $CI_COMMIT_SHA ssv $APP_REPLICAS_INFRA_STAGE hetzner.stage.k8s.local hetzner.stage.k8s.local stage.ssv.network $K8S_API_VERSION $STAGE_HEALTH_CHECK_IMAGE $SSV_NODES_CPU_LIMIT $SSV_NODES_MEM_LIMIT
- .k8/hetzner-stage/scripts/deploy-cluster-69--72.sh $DOCKER_REPO_INFRA_STAGE $CI_COMMIT_SHA ssv $APP_REPLICAS_INFRA_STAGE hetzner.stage.k8s.local hetzner.stage.k8s.local stage.ssv.network $K8S_API_VERSION $STAGE_HEALTH_CHECK_IMAGE $SSV_NODES_CPU_LIMIT $SSV_NODES_MEM_LIMIT
only:
-      - stage
+      - alan/no-fork

Deploy exporter to hetzner stage:
stage: deploy
@@ -101,7 +101,7 @@ Deploy exporter to hetzner stage:
- kubectl config get-contexts
- .k8/hetzner-stage/scripts/deploy-holesky-exporters.sh $DOCKER_REPO_INFRA_STAGE $CI_COMMIT_SHA ssv $APP_REPLICAS_INFRA_STAGE hetzner.stage.k8s.local hetzner.stage.k8s.local stage.ssv.network $K8S_API_VERSION $SSV_EXPORTER_CPU_LIMIT $SSV_EXPORTER_MEM_LIMIT
only:
-      - gm/exporter-alan-merge
+      - alan/no-fork

# +---------------+
# | Prod |
1 change: 1 addition & 0 deletions Dockerfile
@@ -66,3 +66,4 @@ EXPOSE 5678 5000 4000/udp
ENV GODEBUG="netdns=go"

#ENTRYPOINT ["/go/bin/ssvnode"]
+
27 changes: 13 additions & 14 deletions cli/operator/node.go
@@ -4,7 +4,6 @@ import (
"context"
"encoding/base64"
"fmt"
"github.com/ssvlabs/ssv/exporter/exporter_message"
"log"
"math/big"
"net/http"
@@ -30,6 +29,7 @@ import (
"github.com/ssvlabs/ssv/eth/localevents"
exporterapi "github.com/ssvlabs/ssv/exporter/api"
"github.com/ssvlabs/ssv/exporter/api/decided"
"github.com/ssvlabs/ssv/exporter/convert"
ibftstorage "github.com/ssvlabs/ssv/ibft/storage"
ssv_identity "github.com/ssvlabs/ssv/identity"
"github.com/ssvlabs/ssv/logging"
@@ -262,6 +262,7 @@ var StartNodeCmd = &cobra.Command{
cfg.SSVOptions.ValidatorOptions.Beacon = consensusClient
cfg.SSVOptions.ValidatorOptions.BeaconSigner = keyManager
cfg.SSVOptions.ValidatorOptions.ValidatorsMap = validatorsMap
+	cfg.SSVOptions.ValidatorOptions.NetworkConfig = networkConfig

cfg.SSVOptions.ValidatorOptions.OperatorDataStore = operatorDataStore
cfg.SSVOptions.ValidatorOptions.RegistryStorage = nodeStorage
@@ -272,20 +273,20 @@ var StartNodeCmd = &cobra.Command{
ws := exporterapi.NewWsServer(cmd.Context(), nil, http.NewServeMux(), cfg.WithPing)
cfg.SSVOptions.WS = ws
cfg.SSVOptions.WsAPIPort = cfg.WsAPIPort
-	cfg.SSVOptions.ValidatorOptions.NewDecidedHandler = decided.NewStreamPublisher(logger, ws, cfg.SSVOptions.ValidatorOptions.UseNewExporterAPI)
+	cfg.SSVOptions.ValidatorOptions.NewDecidedHandler = decided.NewStreamPublisher(logger, ws)

Check failure on line 276 in cli/operator/node.go (GitHub Actions: lint ×2, test ×1): not enough arguments in call to decided.NewStreamPublisher
}

cfg.SSVOptions.ValidatorOptions.DutyRoles = []spectypes.BeaconRole{spectypes.BNRoleAttester} // TODO could be better to set in other place

-	storageRoles := []exporter_message.RunnerRole{
-		exporter_message.RoleCommittee,
-		exporter_message.RoleAttester,
-		exporter_message.RoleProposer,
-		exporter_message.RoleSyncCommittee,
-		exporter_message.RoleAggregator,
-		exporter_message.RoleSyncCommitteeContribution,
-		exporter_message.RoleValidatorRegistration,
-		exporter_message.RoleVoluntaryExit,
+	storageRoles := []convert.RunnerRole{
+		convert.RoleCommittee,
+		convert.RoleAttester,
+		convert.RoleProposer,
+		convert.RoleSyncCommittee,
+		convert.RoleAggregator,
+		convert.RoleSyncCommitteeContribution,
+		convert.RoleValidatorRegistration,
+		convert.RoleVoluntaryExit,
}

storageMap := ibftstorage.NewStores()
@@ -304,7 +305,7 @@ var StartNodeCmd = &cobra.Command{
cfg.SSVOptions.ValidatorController = validatorCtrl
cfg.SSVOptions.ValidatorStore = validatorStore

-	operatorNode = operator.New(logger, cfg.SSVOptions, slotTickerProvider)
+	operatorNode = operator.New(logger, cfg.SSVOptions, slotTickerProvider, storageMap)

if cfg.MetricsAPIPort > 0 {
go startMetricsHandler(cmd.Context(), logger, db, metricsReporter, cfg.MetricsAPIPort, cfg.EnableProfile)
@@ -559,8 +560,6 @@ func setupSSVNetwork(logger *zap.Logger) (networkconfig.NetworkConfig, error) {
return networkconfig.NetworkConfig{}, err
}

-	types.SetDefaultDomain(networkConfig.Domain)

nodeType := "light"
if cfg.SSVOptions.ValidatorOptions.FullNode {
nodeType = "full"
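Note: the recurring change in cli/operator/node.go is the rename of the exporter_message package to convert, plus a per-role store map (ibftstorage.NewStores()) that is now also handed to operator.New. A minimal, self-contained sketch of that role-keyed store pattern, using illustrative names rather than the repo's actual ibftstorage API:

```go
package main

import "fmt"

// RunnerRole mirrors the role enum the diff iterates over.
type RunnerRole int

const (
	RoleCommittee RunnerRole = iota
	RoleAttester
	RoleProposer
)

// Store stands in for a role-scoped QBFT store.
type Store struct{ role RunnerRole }

// Stores is a role-keyed registry, analogous to what ibftstorage.NewStores()
// appears to provide in the diff: one store per duty role, retrievable with
// Get and iterable with Each.
type Stores struct{ m map[RunnerRole]*Store }

func NewStores() *Stores { return &Stores{m: make(map[RunnerRole]*Store)} }

func (s *Stores) Add(r RunnerRole)        { s.m[r] = &Store{role: r} }
func (s *Stores) Get(r RunnerRole) *Store { return s.m[r] }

func (s *Stores) Each(fn func(RunnerRole, *Store) error) error {
	for r, st := range s.m {
		if err := fn(r, st); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	stores := NewStores()
	for _, r := range []RunnerRole{RoleCommittee, RoleAttester, RoleProposer} {
		stores.Add(r)
	}
	_ = stores.Each(func(r RunnerRole, st *Store) error {
		fmt.Printf("role %d has store: %v\n", r, st != nil)
		return nil
	})
}
```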
45 changes: 34 additions & 11 deletions e2e/logs_catcher/matcher_bls.go
@@ -19,6 +19,27 @@ import (
"github.com/ssvlabs/ssv/e2e/logs_catcher/docker"
)

+const (
+	targetContainer = "ssv-node-1"
+
+	verifySignatureErr           = "failed processing consensus message: could not process msg: invalid signed message: msg signature invalid: failed to verify signature"
+	reconstructSignatureErr      = "could not reconstruct post consensus signature: could not reconstruct beacon sig: failed to verify reconstruct signature: could not reconstruct a valid signature"
+	pastRoundErr                 = "failed processing consensus message: could not process msg: invalid signed message: past round"
+	reconstructSignaturesSuccess = "reconstructed partial signatures"
+	submittedAttSuccess          = "✅ successfully submitted attestation"
+	gotDutiesSuccess             = "🗂 got duties"
+
+	msgHeightField        = "\"msg_height\":%d"
+	msgRoundField         = "\"msg_round\":%d"
+	msgTypeField          = "\"msg_type\":\"%s\""
+	consensusMsgTypeField = "\"consensus_msg_type\":%d"
+	signersField          = "\"signers\":[%d]"
+	errorField            = "\"error\":\"%s\""
+	dutyIDField           = "\"duty_id\":\"%s\""
+	roleField             = "\"role\":\"%s\""
+	slotField             = "\"slot\":%d"
+)

type logCondition struct {
role string
slot phase0.Slot
@@ -40,7 +61,7 @@ func VerifyBLSSignature(pctx context.Context, logger *zap.Logger, cli DockerCLI,
defer startc()

validatorIndex := fmt.Sprintf("v%d", share.ValidatorIndex)
-	conditionLog, err := StartCondition(startctx, logger, []string{gotDutiesSuccess, validatorIndex}, ssvNodesContainers[0], cli)
+	conditionLog, err := StartCondition(startctx, logger, []string{gotDutiesSuccess, validatorIndex}, targetContainer, cli)
if err != nil {
return fmt.Errorf("failed to start condition: %w", err)
}
@@ -60,7 +81,7 @@
leader := DetermineLeader(dutySlot, committee)
logger.Debug("Leader: ", zap.Uint64("leader", leader))

-	_, err = StartCondition(startctx, logger, []string{submittedAttSuccess, share.ValidatorPubKey}, ssvNodesContainers[0], cli)
+	_, err = StartCondition(startctx, logger, []string{submittedAttSuccess, share.ValidatorPubKey}, targetContainer, cli)
if err != nil {
return fmt.Errorf("failed to start condition: %w", err)
}
@@ -91,7 +112,10 @@ func ParseAndExtractDutyInfo(conditionLog string, corruptedValidatorIndex string
}

func DetermineLeader(dutySlot phase0.Slot, committee []*types.CommitteeMember) types.OperatorID {
-	leader := qbft.RoundRobinProposer(&qbft.State{Height: qbft.Height(dutySlot), CommitteeMember: committee[0]}, qbft.FirstRound)
+	share := &types.Operator{
+		Committee: committee,
+	}
+	leader := qbft.RoundRobinProposer(&qbft.State{Height: qbft.Height(dutySlot), Share: share}, qbft.FirstRound)

return leader
}
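Note: DetermineLeader now wraps the full committee in a types.Operator and passes it as the state's Share instead of a single CommitteeMember. Round-robin proposer selection is a modular walk over the committee keyed on height and round; a minimal sketch of the idea (ssv-spec's exact indexing may differ):

```go
package main

import "fmt"

type OperatorID uint64

// roundRobinProposer picks a leader by rotating through the committee as the
// height and round advance. This is the general technique; ssv-spec's
// RoundRobinProposer may order or offset the walk differently.
func roundRobinProposer(height, round uint64, committee []OperatorID) OperatorID {
	idx := (height + round - 1) % uint64(len(committee)) // rounds start at 1
	return committee[idx]
}

func main() {
	committee := []OperatorID{1, 2, 3, 4}
	for slot := uint64(0); slot < 4; slot++ {
		fmt.Printf("slot %d -> leader %d\n", slot, roundRobinProposer(slot, 1, committee))
	}
}
```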
@@ -139,7 +163,7 @@ func processNonCorruptedOperatorLogs(ctx context.Context, logger *zap.Logger, cl
msgType: types.SSVConsensusMsgType,
consensusMsgType: qbft.ProposalMsgType,
signer: corruptedOperator,
-		error: failedVerifySigLog,
+		error: verifySignatureErr,
},
{
role: types.BNRoleAttester.String(),
@@ -157,7 +181,7 @@ func processNonCorruptedOperatorLogs(ctx context.Context, logger *zap.Logger, cl
msgType: types.SSVConsensusMsgType,
consensusMsgType: qbft.RoundChangeMsgType,
signer: corruptedOperator,
-		error: failedVerifySigLog,
+		error: verifySignatureErr,
},
{
role: types.BNRoleAttester.String(),
@@ -166,7 +190,7 @@ func processNonCorruptedOperatorLogs(ctx context.Context, logger *zap.Logger, cl
msgType: types.SSVConsensusMsgType,
consensusMsgType: qbft.PrepareMsgType,
signer: corruptedOperator,
-		error: failedVerifySigLog,
+		error: verifySignatureErr,
},
// TODO: handle decided failed signature
}
@@ -179,7 +203,7 @@ func processNonCorruptedOperatorLogs(ctx context.Context, logger *zap.Logger, cl
msgType: types.SSVConsensusMsgType,
consensusMsgType: qbft.PrepareMsgType,
signer: corruptedOperator,
-		error: failedVerifySigLog,
+		error: verifySignatureErr,
},
{
role: types.BNRoleAttester.String(),
@@ -188,7 +212,7 @@ func processNonCorruptedOperatorLogs(ctx context.Context, logger *zap.Logger, cl
msgType: types.SSVConsensusMsgType,
consensusMsgType: qbft.CommitMsgType,
signer: corruptedOperator,
-		error: failedVerifySigLog,
+		error: verifySignatureErr,
},
// TODO: handle decided failed signature
}
@@ -222,11 +246,10 @@ func matchSingleConditionLog(ctx context.Context, logger *zap.Logger, cli Docker
}

filteredLogs := res.Grep(first)
-
logger.Info("matched", zap.Int("count", len(filteredLogs)), zap.String("target", target), zap.Strings("match_string", first))

if len(filteredLogs) != 1 {
return fmt.Errorf("found non matching messages on (1) %v, want %v got %v", target, 1, len(filteredLogs))
return fmt.Errorf("found non matching messages on %v, want %v got %v", target, 1, len(filteredLogs))
}

return nil
@@ -265,7 +288,7 @@ func matchDualConditionLog(ctx context.Context, logger *zap.Logger, cli DockerCL
logger.Info("matched", zap.Int("count", len(filteredLogs)), zap.String("target", target), zap.Strings("match_string", fail))

if len(filteredLogs) != 1 {
return fmt.Errorf("found non matching messages on (3) %v, want %v got %v", target, 1, len(filteredLogs))
return fmt.Errorf("found non matching messages on %v, want %v got %v", target, 1, len(filteredLogs))
}
}

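Note: the constants added at the top of this file are fmt patterns matching fields of structured zap log lines. A small sketch of how such patterns are presumably filled and then substring-matched against container logs (the matcher's real call sites are outside the shown hunks):

```go
package main

import "fmt"

// Copies of three of the patterns added in this diff.
const (
	msgHeightField = "\"msg_height\":%d"
	signersField   = "\"signers\":[%d]"
	roleField      = "\"role\":\"%s\""
)

func main() {
	// Filling a pattern yields a JSON fragment that can be grepped for in a
	// node's log output.
	fmt.Printf(msgHeightField+"\n", 42)       // "msg_height":42
	fmt.Printf(signersField+"\n", 3)          // "signers":[3]
	fmt.Printf(roleField+"\n", "ATTESTER")    // "role":"ATTESTER"
}
```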
2 changes: 2 additions & 0 deletions e2e/run.sh
@@ -52,6 +52,8 @@ save_logs() {
export BEACON_NODE_URL=http://prod-standalone-holesky.bloxinfra.com:5052
export EXECUTION_NODE_URL=ws://prod-standalone-holesky.bloxinfra.com:8548/ws

+export BEACON_NODE_URL=http://bn-h-2.stage.bloxinfra.com:3502/
+export EXECUTION_NODE_URL=ws://bn-h-2.stage.bloxinfra.com:8557/ws
# Step 1: Start the beacon_proxy and ssv-node services
docker compose up -d --build beacon_proxy ssv-node-1 ssv-node-2 ssv-node-3 ssv-node-4

15 changes: 8 additions & 7 deletions eth/eventhandler/handlers.go
@@ -5,7 +5,8 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ssvlabs/ssv/exporter/exporter_message"

"github.com/ssvlabs/ssv/exporter/convert"

"github.com/attestantio/go-eth2-client/spec/phase0"
ethcommon "github.com/ethereum/go-ethereum/common"
@@ -373,8 +374,8 @@ func (eh *EventHandler) handleValidatorRemoved(txn basedb.Txn, event *contract.C
return emptyPK, &MalformedEventError{Err: ErrShareBelongsToDifferentOwner}
}

-	removeDecidedMessages := func(role exporter_message.RunnerRole, store qbftstorage.QBFTStore) error {
-		messageID := exporter_message.NewMsgID(eh.networkConfig.Domain, share.ValidatorPubKey[:], role)
+	removeDecidedMessages := func(role convert.RunnerRole, store qbftstorage.QBFTStore) error {
+		messageID := convert.NewMsgID(eh.networkConfig.Domain, share.ValidatorPubKey[:], role)
return store.CleanAllInstances(logger, messageID[:])
}
err := eh.storageMap.Each(removeDecidedMessages)
Expand Down Expand Up @@ -511,10 +512,6 @@ func (eh *EventHandler) handleValidatorExited(txn basedb.Txn, event *contract.Co
return nil, &MalformedEventError{Err: ErrShareBelongsToDifferentOwner}
}

-	if !share.BelongsToOperator(eh.operatorDataStore.GetOperatorID()) {
-		return nil, nil
-	}
-
if share.BeaconMetadata == nil {
return nil, nil
}
@@ -523,10 +520,14 @@ func (eh *EventHandler) handleValidatorExited(txn basedb.Txn, event *contract.Co
copy(pk[:], share.ValidatorPubKey[:])

ed := &duties.ExitDescriptor{
+		OwnValidator: false,
PubKey: pk,
ValidatorIndex: share.BeaconMetadata.Index,
BlockNumber: event.Raw.BlockNumber,
}
+	if share.BelongsToOperator(eh.operatorDataStore.GetOperatorID()) {
+		ed.OwnValidator = true
+	}

return ed, nil
}
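Note: handleValidatorExited no longer returns early for validators that belong to other operators; it now always builds an ExitDescriptor and flags ownership via OwnValidator. The added if block is equivalent to assigning the boolean directly. A condensed, self-contained analogue of the new flow (types reduced to the fields this hunk touches):

```go
package main

import "fmt"

// ExitDescriptor and share are stand-ins for the repo's types.
type ExitDescriptor struct {
	OwnValidator bool
	BlockNumber  uint64
}

type share struct{ operatorID uint64 }

func (s share) BelongsToOperator(id uint64) bool { return s.operatorID == id }

func describeExit(s share, ownOperatorID, block uint64) *ExitDescriptor {
	ed := &ExitDescriptor{BlockNumber: block}
	// Equivalent to the diff's `if share.BelongsToOperator(...) { ed.OwnValidator = true }`.
	ed.OwnValidator = s.BelongsToOperator(ownOperatorID)
	return ed // now returned for every validator, not only the operator's own
}

func main() {
	fmt.Println(describeExit(share{operatorID: 7}, 7, 100).OwnValidator) // true
	fmt.Println(describeExit(share{operatorID: 8}, 7, 100).OwnValidator) // false
}
```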
3 changes: 0 additions & 3 deletions eth/executionclient/execution_client.go
@@ -221,9 +221,6 @@ func (ec *ExecutionClient) StreamLogs(ctx context.Context, fromBlock uint64) <-c

// Healthy returns if execution client is currently healthy: responds to requests and not in the syncing state.
func (ec *ExecutionClient) Healthy(ctx context.Context) error {
-	// TODO ALAN: revert
-	return nil
-
if ec.isClosed() {
return ErrClosed
}
14 changes: 7 additions & 7 deletions exporter/api/msg.go
@@ -40,24 +40,24 @@ type ParticipantsAPI struct {
}

// NewParticipantsAPIMsg creates a new message in a new format from the given message.
-func NewParticipantsAPIMsg(msgs ...qbftstorage.ParticipantsRangeEntry) Message {
-	data, err := ParticipantsAPIData(msgs...)
+func NewParticipantsAPIMsg(msg qbftstorage.ParticipantsRangeEntry) Message {
+	data, err := ParticipantsAPIData(msg)
if err != nil {
return Message{
Type: TypeParticipants,
Data: []string{},
}
}
-	identifier := specqbft.ControllerIdToMessageID(msgs[0].Identifier[:])
+	identifier := specqbft.ControllerIdToMessageID(msg.Identifier[:])
pkv := identifier.GetDutyExecutorID()

return Message{
Type: TypeDecided,
Filter: MessageFilter{
PublicKey: hex.EncodeToString(pkv),
-			From: uint64(msgs[0].Slot),
-			To: uint64(msgs[len(msgs)-1].Slot),
-			Role: msgs[0].Identifier.GetRoleType().String(),
+			From: uint64(msg.Slot),
+			To: uint64(msg.Slot),
+			Role: msg.Identifier.GetRoleType().String(),
},
Data: data,
}
@@ -100,7 +100,7 @@ type MessageFilter struct {
To uint64 `json:"to"`
// Role is the duty type, optional as it's relevant for IBFT data
Role string `json:"role,omitempty"`
-	// PublicKeys is optional, used for fetching decided messages or information about specific validator/operator
+	// PublicKey is optional, used for fetching decided messages or information about specific validator/operator
PublicKey string `json:"publicKey,omitempty"`
}

16 changes: 8 additions & 8 deletions exporter/api/query_handlers.go
@@ -3,15 +3,15 @@ package api
import (
	"encoding/hex"
	"fmt"

	"github.com/attestantio/go-eth2-client/spec/phase0"
	spectypes "github.com/ssvlabs/ssv-spec/types"
-	"github.com/ssvlabs/ssv/exporter/exporter_message"
-	"github.com/ssvlabs/ssv/logging/fields"
-	"github.com/ssvlabs/ssv/protocol/v2/message"
-	"github.com/ssvlabs/ssv/protocol/v2/types"
	"go.uber.org/zap"

+	"github.com/ssvlabs/ssv/exporter/convert"
	"github.com/ssvlabs/ssv/ibft/storage"
+	"github.com/ssvlabs/ssv/logging/fields"
+	"github.com/ssvlabs/ssv/protocol/v2/message"
)

const (
@@ -47,7 +47,7 @@ func HandleUnknownQuery(logger *zap.Logger, nm *NetworkMessage) {
}

// HandleParticipantsQuery handles TypeParticipants queries.
-func HandleParticipantsQuery(logger *zap.Logger, qbftStorage *storage.QBFTStores, nm *NetworkMessage) {
+func HandleParticipantsQuery(logger *zap.Logger, qbftStorage *storage.QBFTStores, nm *NetworkMessage, domain spectypes.DomainType) {
logger.Debug("handles query request",
zap.Uint64("from", nm.Msg.Filter.From),
zap.Uint64("to", nm.Msg.Filter.To),
@@ -72,16 +72,16 @@ func HandleParticipantsQuery(logger *zap.Logger, qbftStorage *storage.QBFTStores
nm.Msg = res
return
}
-	runnerRole := exporter_message.RunnerRole(beaconRole)
+	runnerRole := convert.RunnerRole(beaconRole)
roleStorage := qbftStorage.Get(runnerRole)
if roleStorage == nil {
logger.Warn("role storage doesn't exist", fields.Role(spectypes.RunnerRole(runnerRole)))
logger.Warn("role storage doesn't exist", fields.ExporterRole(runnerRole))
res.Data = []string{"internal error - role storage doesn't exist", beaconRole.String()}
nm.Msg = res
return
}

-	msgID := exporter_message.NewMsgID(types.GetDefaultDomain(), pkRaw, runnerRole)
+	msgID := convert.NewMsgID(domain, pkRaw, runnerRole)
from := phase0.Slot(nm.Msg.Filter.From)
to := phase0.Slot(nm.Msg.Filter.To)
participantsList, err := roleStorage.GetParticipantsInRange(msgID, from, to)
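Note: HandleParticipantsQuery now receives the domain explicitly instead of reading the package-global default that setupSSVNetwork used to set via types.SetDefaultDomain. A minimal analogue of that dependency-injection change (names illustrative, not the actual convert API):

```go
package main

import "fmt"

type DomainType [4]byte

// Before: an ID built from mutable package-level state.
var defaultDomain DomainType

func newMsgIDGlobal(pk []byte, role int) string {
	return fmt.Sprintf("%x/%x/%d", defaultDomain, pk, role)
}

// After: the caller threads the domain through, as the new
// `domain spectypes.DomainType` parameter does in the diff.
func newMsgID(domain DomainType, pk []byte, role int) string {
	return fmt.Sprintf("%x/%x/%d", domain, pk, role)
}

func main() {
	domain := DomainType{0x00, 0x00, 0x50, 0x69} // placeholder bytes, not a real domain constant
	fmt.Println(newMsgID(domain, []byte{0xaa}, 2))
}
```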
(Diff truncated: the remaining 107 of 116 changed files are not shown.)
