diff --git a/api/stream.go b/api/stream.go index cce81105..80acfce0 100644 --- a/api/stream.go +++ b/api/stream.go @@ -11,7 +11,6 @@ import ( "github.com/onflow/go-ethereum/eth/filters" "github.com/onflow/go-ethereum/rpc" "github.com/rs/zerolog" - "github.com/sethvargo/go-limiter" "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/models" @@ -25,10 +24,9 @@ type StreamAPI struct { blocks storage.BlockIndexer transactions storage.TransactionIndexer receipts storage.ReceiptIndexer - blocksPublisher *models.Publisher - transactionsPublisher *models.Publisher - logsPublisher *models.Publisher - ratelimiter limiter.Store + blocksPublisher *models.Publisher[*models.Block] + transactionsPublisher *models.Publisher[*gethTypes.Transaction] + logsPublisher *models.Publisher[[]*gethTypes.Log] } func NewStreamAPI( @@ -37,10 +35,9 @@ func NewStreamAPI( blocks storage.BlockIndexer, transactions storage.TransactionIndexer, receipts storage.ReceiptIndexer, - blocksPublisher *models.Publisher, - transactionsPublisher *models.Publisher, - logsPublisher *models.Publisher, - ratelimiter limiter.Store, + blocksPublisher *models.Publisher[*models.Block], + transactionsPublisher *models.Publisher[*gethTypes.Transaction], + logsPublisher *models.Publisher[[]*gethTypes.Log], ) *StreamAPI { return &StreamAPI{ logger: logger, @@ -51,22 +48,17 @@ func NewStreamAPI( blocksPublisher: blocksPublisher, transactionsPublisher: transactionsPublisher, logsPublisher: logsPublisher, - ratelimiter: ratelimiter, } } // NewHeads send a notification each time a new block is appended to the chain. func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { - return s.newSubscription( + return newSubscription( ctx, + s.logger, s.blocksPublisher, - func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error { - return func(data any) error { - block, ok := data.(*models.Block) - if !ok { - return fmt.Errorf("invalid data sent to block subscription: %s", sub.ID) - } - + func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error { + return func(block *models.Block) error { h, err := block.Hash() if err != nil { return err @@ -93,16 +85,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { // transaction enters the transaction pool. If fullTx is true the full tx is // sent to the client, otherwise the hash is sent. func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { - return s.newSubscription( + return newSubscription( ctx, + s.logger, s.transactionsPublisher, - func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error { - return func(data any) error { - tx, ok := data.(*gethTypes.Transaction) - if !ok { - return fmt.Errorf("invalid data sent to pending transaction subscription: %s", sub.ID) - } - + func(notifier *rpc.Notifier, sub *rpc.Subscription) func(*gethTypes.Transaction) error { + return func(tx *gethTypes.Transaction) error { if fullTx != nil && *fullTx { return notifier.Notify(sub.ID, tx) } @@ -120,16 +108,12 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( return nil, fmt.Errorf("failed to create log subscription filter: %w", err) } - return s.newSubscription( + return newSubscription( ctx, + s.logger, s.logsPublisher, - func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error { - return func(data any) error { - allLogs, ok := data.([]*gethTypes.Log) - if !ok { - return fmt.Errorf("invalid data sent to log subscription: %s", sub.ID) - } - + func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*gethTypes.Log) error { + return func(allLogs []*gethTypes.Log) error { for _, log := range allLogs { // todo we could optimize this matching for cases where we have multiple subscriptions // using the same filter criteria, we could only filter once and stream to all subscribers @@ -148,10 +132,11 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( ) } -func (s *StreamAPI) newSubscription( +func newSubscription[T any]( ctx context.Context, - publisher *models.Publisher, - callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error, + logger zerolog.Logger, + publisher *models.Publisher[T], + callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(T) error, ) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -162,8 +147,10 @@ func (s *StreamAPI) newSubscription( subs := models.NewSubscription(callback(notifier, rpcSub)) - rpcSub.ID = rpc.ID(subs.ID().String()) - l := s.logger.With().Str("subscription-id", subs.ID().String()).Logger() + l := logger.With(). + Str("gateway-subscription-id", fmt.Sprintf("%p", subs)). + Str("ethereum-subscription-id", string(rpcSub.ID)). + Logger() publisher.Subscribe(subs) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 05bfbb91..c524fb1c 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go-sdk/access" "github.com/onflow/flow-go-sdk/access/grpc" "github.com/onflow/flow-go-sdk/crypto" + gethTypes "github.com/onflow/go-ethereum/core/types" "github.com/rs/zerolog" "github.com/sethvargo/go-limiter/memorystore" grpcOpts "google.golang.org/grpc" @@ -36,9 +37,9 @@ type Storages struct { } type Publishers struct { - Block *models.Publisher - Transaction *models.Publisher - Logs *models.Publisher + Block *models.Publisher[*models.Block] + Transaction *models.Publisher[*gethTypes.Transaction] + Logs *models.Publisher[[]*gethTypes.Log] } type Bootstrap struct { @@ -72,9 +73,9 @@ func New(config *config.Config) (*Bootstrap, error) { return &Bootstrap{ publishers: &Publishers{ - Block: models.NewPublisher(), - Transaction: models.NewPublisher(), - Logs: models.NewPublisher(), + Block: models.NewPublisher[*models.Block](), + Transaction: models.NewPublisher[*gethTypes.Transaction](), + Logs: models.NewPublisher[[]*gethTypes.Log](), }, storages: storages, logger: logger, @@ -209,7 +210,11 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { } // create transaction pool - txPool := requester.NewTxPool(b.client, b.publishers.Transaction, b.logger) + txPool := requester.NewTxPool( + b.client, + b.publishers.Transaction, + b.logger, + ) evm, err := requester.NewEVM( b.client, @@ -260,7 +265,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { b.publishers.Block, b.publishers.Transaction, b.publishers.Logs, - ratelimiter, ) pullAPI := api.NewPullAPI( diff --git a/go.mod b/go.mod index d1fcac7f..81a30134 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( cloud.google.com/go/storage v1.36.0 github.com/cockroachdb/pebble v1.1.1 github.com/goccy/go-json v0.10.2 - github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/onflow/atree v0.8.0-rc.6 github.com/onflow/cadence v1.0.0 @@ -82,6 +81,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/s2a-go v0.1.7 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect diff --git a/models/stream.go b/models/stream.go index c2befa13..6af69a20 100644 --- a/models/stream.go +++ b/models/stream.go @@ -1,76 +1,74 @@ package models import ( + "fmt" "sync" - - "github.com/google/uuid" ) -type Publisher struct { +type Publisher[T any] struct { mux sync.RWMutex - subscribers map[uuid.UUID]Subscriber + subscribers map[Subscriber[T]]struct{} } -func NewPublisher() *Publisher { - return &Publisher{ +func NewPublisher[T any]() *Publisher[T] { + return &Publisher[T]{ mux: sync.RWMutex{}, - subscribers: make(map[uuid.UUID]Subscriber), + subscribers: make(map[Subscriber[T]]struct{}), } } -func (p *Publisher) Publish(data any) { +func (p *Publisher[T]) Publish(data T) { p.mux.RLock() defer p.mux.RUnlock() - for _, s := range p.subscribers { + for s := range p.subscribers { s.Notify(data) } } -func (p *Publisher) Subscribe(s Subscriber) { +func (p *Publisher[T]) Subscribe(s Subscriber[T]) { p.mux.Lock() defer p.mux.Unlock() - p.subscribers[s.ID()] = s + p.subscribers[s] = struct{}{} } -func (p *Publisher) Unsubscribe(s Subscriber) { +func (p *Publisher[T]) Unsubscribe(s Subscriber[T]) { p.mux.Lock() defer p.mux.Unlock() - delete(p.subscribers, s.ID()) + delete(p.subscribers, s) } -type Subscriber interface { - ID() uuid.UUID - Notify(data any) +type Subscriber[T any] interface { + Notify(data T) Error() <-chan error } -type Subscription struct { +type Subscription[T any] struct { err chan error - callback func(data any) error - uuid uuid.UUID + callback func(data T) error } -func NewSubscription(callback func(any) error) *Subscription { - return &Subscription{ +func NewSubscription[T any](callback func(T) error) *Subscription[T] { + return &Subscription[T]{ callback: callback, - uuid: uuid.New(), + err: make(chan error), } } -func (b *Subscription) Notify(data any) { +func (b *Subscription[T]) Notify(data T) { err := b.callback(data) if err != nil { - b.err <- err + select { + case b.err <- err: + default: + // TODO: handle this better! + panic(fmt.Sprintf("failed to send error to subscription %v", err)) + } } } -func (b *Subscription) ID() uuid.UUID { - return b.uuid -} - -func (b *Subscription) Error() <-chan error { +func (b *Subscription[T]) Error() <-chan error { return b.err } diff --git a/models/stream_test.go b/models/stream_test.go new file mode 100644 index 00000000..8864bc44 --- /dev/null +++ b/models/stream_test.go @@ -0,0 +1,165 @@ +package models_test + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/onflow/flow-evm-gateway/models" + "github.com/stretchr/testify/require" +) + +func Test_Stream(t *testing.T) { + + t.Run("unsubscribe before subscribing", func(t *testing.T) { + p := newMockPublisher() + s := newMockSubscription() + + require.NotPanics(t, func() { + p.Unsubscribe(s) + }) + }) + + t.Run("subscribe, publish, unsubscribe, publish", func(t *testing.T) { + p := newMockPublisher() + s1 := newMockSubscription() + s2 := newMockSubscription() + + p.Subscribe(s1) + p.Subscribe(s2) + + p.Publish(mockData{}) + + require.Equal(t, uint64(1), s1.CallCount()) + require.Equal(t, uint64(1), s2.CallCount()) + + p.Unsubscribe(s1) + + p.Publish(mockData{}) + + require.Equal(t, uint64(1), s1.CallCount()) + require.Equal(t, uint64(2), s2.CallCount()) + }) + + t.Run("concurrent subscribe, publish, unsubscribe, publish", func(t *testing.T) { + + p := newMockPublisher() + + stopPublishing := make(chan struct{}) + + published := make(chan struct{}) + + // publishing + go func() { + for { + select { + case <-stopPublishing: + return + case <-time.After(time.Millisecond * 1): + p.Publish(mockData{}) + + select { + case published <- struct{}{}: + default: + } + } + } + }() + + waitAllSubscribed := sync.WaitGroup{} + waitAllUnsubscribed := sync.WaitGroup{} + + // 10 goroutines adding 10 subscribers each + // and then unsubscribe all + waitAllSubscribed.Add(10) + waitAllUnsubscribed.Add(10) + for i := 0; i < 10; i++ { + go func() { + subscriptions := make([]*mockSubscription, 10) + + for j := 0; j < 10; j++ { + s := newMockSubscription() + subscriptions[j] = s + p.Subscribe(s) + + } + waitAllSubscribed.Done() + waitAllSubscribed.Wait() + + // wait for all subscribers to receive data + for i := 0; i < 10; i++ { + <-published + } + + for _, s := range subscriptions { + p.Unsubscribe(s) + } + + // there should be at least 1 call + for j := 0; j < 10; j++ { + require.GreaterOrEqual(t, subscriptions[j].CallCount(), uint64(10)) + } + + waitAllUnsubscribed.Done() + }() + } + + waitAllUnsubscribed.Wait() + close(stopPublishing) + }) + + t.Run("error handling", func(t *testing.T) { + p := newMockPublisher() + s := &mockSubscription{} + errContent := fmt.Errorf("failed to process data") + + s.Subscription = models.NewSubscription[mockData](func(data mockData) error { + s.callCount.Add(1) + return errContent + }) + + p.Subscribe(s) + + shouldReceiveError := make(chan struct{}) + ready := make(chan struct{}) + go func() { + close(ready) + select { + case err := <-s.Error(): + require.ErrorIs(t, err, errContent) + case <-shouldReceiveError: + require.Fail(t, "should have received error") + } + }() + <-ready + + p.Publish(mockData{}) + close(shouldReceiveError) + }) +} + +type mockData struct{} + +type mockSubscription struct { + *models.Subscription[mockData] + callCount atomic.Uint64 +} + +func newMockSubscription() *mockSubscription { + s := &mockSubscription{} + s.Subscription = models.NewSubscription[mockData](func(data mockData) error { + s.callCount.Add(1) + return nil + }) + return s +} + +func (s *mockSubscription) CallCount() uint64 { + return s.callCount.Load() +} + +func newMockPublisher() *models.Publisher[mockData] { + return models.NewPublisher[mockData]() +} diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index df951d9e..37aeae0b 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -6,6 +6,7 @@ import ( pebbleDB "github.com/cockroachdb/pebble" "github.com/onflow/flow-go-sdk" + gethTypes "github.com/onflow/go-ethereum/core/types" "github.com/rs/zerolog" "github.com/onflow/flow-evm-gateway/metrics" @@ -41,8 +42,8 @@ type Engine struct { accounts storage.AccountIndexer log zerolog.Logger evmLastHeight *models.SequentialHeight - blocksPublisher *models.Publisher - logsPublisher *models.Publisher + blocksPublisher *models.Publisher[*models.Block] + logsPublisher *models.Publisher[[]*gethTypes.Log] collector metrics.Collector } @@ -53,8 +54,8 @@ func NewEventIngestionEngine( receipts storage.ReceiptIndexer, transactions storage.TransactionIndexer, accounts storage.AccountIndexer, - blocksPublisher *models.Publisher, - logsPublisher *models.Publisher, + blocksPublisher *models.Publisher[*models.Block], + logsPublisher *models.Publisher[[]*gethTypes.Log], log zerolog.Logger, collector metrics.Collector, ) *Engine { diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index 443a1c12..c7f6a77b 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -68,8 +68,8 @@ func TestSerialBlockIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -148,8 +148,8 @@ func TestSerialBlockIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -210,11 +210,9 @@ func TestSerialBlockIngestion(t *testing.T) { close(eventsChan) <-waitErr }) - } func TestBlockAndTransactionIngestion(t *testing.T) { - t.Run("successfully ingest transaction and block", func(t *testing.T) { receipts := &storageMock.ReceiptIndexer{} transactions := &storageMock.TransactionIndexer{} @@ -265,8 +263,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -368,8 +366,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -417,7 +415,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { { Type: string(blockEvent.Etype), Value: blockCdc, - }}, + }, + }, Height: nextHeight, }) @@ -463,8 +462,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) diff --git a/services/requester/pool.go b/services/requester/pool.go index 4c91a3c9..bb19551a 100644 --- a/services/requester/pool.go +++ b/services/requester/pool.go @@ -29,13 +29,13 @@ type TxPool struct { logger zerolog.Logger client *CrossSporkClient pool *sync.Map - txPublisher *models.Publisher + txPublisher *models.Publisher[*gethTypes.Transaction] // todo add methods to inspect transaction pool state } func NewTxPool( client *CrossSporkClient, - transactionsPublisher *models.Publisher, + transactionsPublisher *models.Publisher[*gethTypes.Transaction], logger zerolog.Logger, ) *TxPool { return &TxPool{ diff --git a/services/traces/engine.go b/services/traces/engine.go index fd51dfec..0f089f0e 100644 --- a/services/traces/engine.go +++ b/services/traces/engine.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/google/uuid" "github.com/onflow/flow-go-sdk" gethCommon "github.com/onflow/go-ethereum/common" "github.com/rs/zerolog" @@ -30,7 +29,7 @@ type Engine struct { *models.EngineStatus logger zerolog.Logger - blocksPublisher *models.Publisher + blocksPublisher *models.Publisher[*models.Block] blocks storage.BlockIndexer traces storage.TraceIndexer downloader Downloader @@ -39,7 +38,7 @@ type Engine struct { // NewTracesIngestionEngine creates a new instance of the engine. func NewTracesIngestionEngine( - blocksPublisher *models.Publisher, + blocksPublisher *models.Publisher[*models.Block], blocks storage.BlockIndexer, traces storage.TraceIndexer, downloader Downloader, @@ -70,13 +69,7 @@ func (e *Engine) Run(ctx context.Context) error { // Notify is a handler that is being used to subscribe for new EVM block notifications. // This method should be non-blocking. -func (e *Engine) Notify(data any) { - block, ok := data.(*models.Block) - if !ok { - e.logger.Error().Msg("invalid event type sent to trace ingestion") - return - } - +func (e *Engine) Notify(block *models.Block) { // If the block has no transactions, we simply return early // as there are no transaction traces to index. if len(block.TransactionHashes) == 0 { @@ -126,7 +119,6 @@ func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Id return e.traces.StoreTransaction(h, trace, nil) }) - if err != nil { e.collector.TraceDownloadFailed() l.Error().Err(err).Msg("failed to download trace") @@ -139,12 +131,6 @@ func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Id wg.Wait() } -// ID is required by the publisher interface and we return a random uuid since the -// subscription will only happen once by this engine -func (e *Engine) ID() uuid.UUID { - return uuid.New() -} - // Error is required by the publisher, and we just return a nil, // since the errors are handled gracefully in the indexBlockTraces func (e *Engine) Error() <-chan error { diff --git a/services/traces/engine_test.go b/services/traces/engine_test.go index 6bd18711..89473afc 100644 --- a/services/traces/engine_test.go +++ b/services/traces/engine_test.go @@ -27,7 +27,7 @@ import ( // downloaded and stored. func TestTraceIngestion(t *testing.T) { t.Run("successful single block ingestion", func(t *testing.T) { - blockPublisher := models.NewPublisher() + blockPublisher := models.NewPublisher[*models.Block]() blocks := &storageMock.BlockIndexer{} trace := &storageMock.TraceIndexer{} downloader := &mocks.Downloader{} @@ -113,7 +113,7 @@ func TestTraceIngestion(t *testing.T) { }) t.Run("successful multiple blocks ingestion", func(t *testing.T) { - blocksPublisher := models.NewPublisher() + blocksPublisher := models.NewPublisher[*models.Block]() blocks := &storageMock.BlockIndexer{} trace := &storageMock.TraceIndexer{} downloader := &mocks.Downloader{} @@ -230,7 +230,7 @@ func TestTraceIngestion(t *testing.T) { }) t.Run("failed download retries", func(t *testing.T) { - blockBroadcaster := models.NewPublisher() + blockBroadcaster := models.NewPublisher[*models.Block]() blocks := &storageMock.BlockIndexer{} downloader := &mocks.Downloader{} trace := &storageMock.TraceIndexer{}