Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compute the stake by quorums #37

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions core/mock/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func NewChainDataMock(numOperators core.OperatorIndex) (*ChainDataMock, error) {
}

func (d *ChainDataMock) GetTotalOperatorState(ctx context.Context, blockNumber uint) *PrivateOperatorState {
return d.GetTotalOperatorStateWithQuorums(ctx, blockNumber, []core.QuorumID{})
}

func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, blockNumber uint, quorums []core.QuorumID) *PrivateOperatorState {
indexedOperators := make(map[core.OperatorID]*core.IndexedOperatorInfo, d.NumOperators)
storedOperators := make(map[core.OperatorID]*core.OperatorInfo)
privateOperators := make(map[core.OperatorID]*PrivateOperatorInfo, d.NumOperators)
Expand All @@ -66,7 +70,6 @@ func (d *ChainDataMock) GetTotalOperatorState(ctx context.Context, blockNumber u
quorumStake := 0

for ind := core.OperatorIndex(0); ind < d.NumOperators; ind++ {

if ind == 0 {
key := d.KeyPairs[ind].GetPubKeyG1()
aggPubKey = key.Deserialize(key.Serialize())
Expand Down Expand Up @@ -124,12 +127,30 @@ func (d *ChainDataMock) GetTotalOperatorState(ctx context.Context, blockNumber u
},
}

if len(quorums) > 0 {
totals = make(map[core.QuorumID]*core.OperatorInfo)
for _, id := range quorums {
totals[id] = &core.OperatorInfo{
Stake: big.NewInt(int64(quorumStake)),
Index: d.NumOperators,
}
}
}

operators := map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
0: storedOperators,
1: storedOperators,
2: storedOperators,
}
if len(quorums) > 0 {
operators = make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
for _, id := range quorums {
operators[id] = storedOperators
}
}

operatorState := &core.OperatorState{
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
0: storedOperators,
1: storedOperators,
2: storedOperators,
},
Operators: operators,
Totals: totals,
BlockNumber: blockNumber,
}
Expand All @@ -156,7 +177,7 @@ func (d *ChainDataMock) GetTotalOperatorState(ctx context.Context, blockNumber u

func (d *ChainDataMock) GetOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.OperatorState, error) {

state := d.GetTotalOperatorState(ctx, blockNumber)
state := d.GetTotalOperatorStateWithQuorums(ctx, blockNumber, quorums)

return state.OperatorState, nil

Expand All @@ -172,7 +193,7 @@ func (d *ChainDataMock) GetOperatorStateByOperator(ctx context.Context, blockNum

func (d *ChainDataMock) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) {

state := d.GetTotalOperatorState(ctx, blockNumber)
state := d.GetTotalOperatorStateWithQuorums(ctx, blockNumber, quorums)

return state.IndexedOperatorState, nil

Expand Down
62 changes: 10 additions & 52 deletions disperser/dataapi/metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"math/big"
"time"

"github.com/gammazero/workerpool"
"github.com/Layr-Labs/eigenda/core"
)

const (
Expand All @@ -17,19 +17,21 @@ const (
)

func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64, limit int) (*Metric, error) {
operators, err := s.subgraphClient.QueryOperatorsWithLimit(ctx, limit)
Copy link
Contributor Author

@jianoaix jianoaix Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not only inefficient, but also incorrect: if we have more than 10 operators (the default limit), we will not get the total stake.

if err != nil {
return nil, err
}

blockNumber, err := s.transactor.GetCurrentBlockNumber(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get current block number: %w", err)
}
totalStake, err := s.calculateTotalStake(operators, blockNumber)
operatorState, err := s.chainState.GetOperatorState(ctx, uint(blockNumber), []core.QuorumID{core.QuorumID(0)})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gpsanant we are counting on this to get the stakes onchain, let me know if it's not reliable, we'd otherwise merge this PR

if err != nil {
return nil, err
}
if len(operatorState.Operators) != 1 {
return nil, fmt.Errorf("Requesting for one quorum (quorumID=0), but got %v", operatorState.Operators)
}
totalStake := big.NewInt(0)
for _, op := range operatorState.Operators[0] {
totalStake.Add(totalStake, op.Stake)
}

result, err := s.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
if err != nil {
Expand All @@ -56,7 +58,7 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64,
return &Metric{
Throughput: troughput,
CostInWei: costInWei,
TotalStake: uint64(totalStake),
TotalStake: totalStake.Uint64(),
}, nil
}

Expand All @@ -73,50 +75,6 @@ func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*
return calculateAverageThroughput(result.Values, avgThroughputWindowSize), nil
}

func (s *server) calculateTotalStake(operators []*Operator, blockNumber uint32) (int64, error) {
var (
totalStakeByOperatorChan = make(chan *big.Int, len(operators))
pool = workerpool.New(maxWorkersGetOperatorState)
)

s.logger.Debug("Number of operators to calculate stake:", len(operators))
for _, o := range operators {
operatorId, err := ConvertHexadecimalToBytes(o.OperatorId)
if err != nil {
s.logger.Error("Failed to convert operator id to hex string: ", "operatorId", operatorId, "err", err)
return 0, err
}

pool.Submit(func() {
operatorState, err := s.chainState.GetOperatorStateByOperator(context.Background(), uint(blockNumber), operatorId)
if err != nil {
s.logger.Error("Failed to get operator state: ", "operatorId", operatorId, "blockNumber", blockNumber, "err", err)
totalStakeByOperatorChan <- big.NewInt(-1)
return
}
totalStake := big.NewInt(0)
s.logger.Debug("Operator state:", "operatorId", operatorId, "num quorums", len(operatorState.Totals))
for quorumId, total := range operatorState.Totals {
s.logger.Debug("Operator stake:", "operatorId", operatorId, "quorum", quorumId, "stake", (*total.Stake).Int64())
totalStake.Add(totalStake, total.Stake)
}
totalStakeByOperatorChan <- totalStake
})
}

pool.StopWait()
close(totalStakeByOperatorChan)

totalStake := big.NewInt(0)
for total := range totalStakeByOperatorChan {
if total.Int64() == -1 {
return 0, errors.New("error getting operator state")
}
totalStake.Add(totalStake, total)
}
return totalStake.Int64(), nil
}

func (s *server) calculateTotalCostGasUsedInWei(ctx context.Context) (uint64, error) {
batches, err := s.subgraphClient.QueryBatchesWithLimit(ctx, 1, 0)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestFetchMetricsHandler(t *testing.T) {
assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, 16555.555555555555, response.Throughput)
assert.Equal(t, uint64(85144853442), response.CostInWei)
assert.Equal(t, uint64(6), response.TotalStake)
assert.Equal(t, uint64(1), response.TotalStake)
}

func TestFetchMetricsTroughputHandler(t *testing.T) {
Expand Down
Loading