Skip to content

Commit

Permalink
clenup of event subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Oct 29, 2024
1 parent e95b413 commit 2b52189
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 66 deletions.
2 changes: 1 addition & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
Msg("indexing cadence height information")

// create event subscriber
subscriber := ingestion.NewRPCSubscriber(
subscriber := ingestion.NewRPCEventSubscriber(
b.client,
b.config.HeartbeatInterval,
b.config.FlowNetworkID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,40 @@ type EventSubscriber interface {
Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents
}

var _ EventSubscriber = &RPCSubscriber{}
var _ EventSubscriber = &RPCEventSubscriber{}

type RPCSubscriber struct {
client *requester.CrossSporkClient
chain flowGo.ChainID
heartbeatInterval uint64
logger zerolog.Logger
type RPCEventSubscriberConfig struct {
HeartbeatInterval uint64
}

type RPCEventSubscriber struct {
RPCEventSubscriberConfig

logger zerolog.Logger

client *requester.CrossSporkClient
chain flowGo.ChainID

recovery bool
recoveredEvents []flow.Event
}

func NewRPCSubscriber(
func NewRPCEventSubscriber(
client *requester.CrossSporkClient,
heartbeatInterval uint64,
chainID flowGo.ChainID,
logger zerolog.Logger,
) *RPCSubscriber {
) *RPCEventSubscriber {
logger = logger.With().Str("component", "subscriber").Logger()
return &RPCSubscriber{
client: client,
heartbeatInterval: heartbeatInterval,
chain: chainID,
logger: logger,
return &RPCEventSubscriber{
RPCEventSubscriberConfig: RPCEventSubscriberConfig{
HeartbeatInterval: heartbeatInterval,
},

logger: logger,

client: client,
chain: chainID,
}
}

Expand All @@ -59,23 +69,23 @@ func NewRPCSubscriber(
// to listen all new events in the current spork.
//
// If error is encountered during backfill the subscription will end and the response chanel will be closed.
func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
events := make(chan models.BlockEvents)
func (r *RPCEventSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
defer func() {
close(events)
close(eventsChan)
}()

// if the height is from the previous spork, backfill all the events from previous sporks first
// if the height is from the previous spork, backfill all the eventsChan from previous sporks first
if r.client.IsPastSpork(height) {
r.logger.Info().
Uint64("height", height).
Msg("height found in previous spork, starting to backfill")

// backfill all the missed events, handling of context cancellation is done by the producer
// backfill all the missed eventsChan, handling of context cancellation is done by the producer
for ev := range r.backfill(ctx, height) {
events <- ev
eventsChan <- ev

if ev.Err != nil {
return
Expand All @@ -96,21 +106,22 @@ func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan mod
Msg("backfilling done, subscribe for live data")

// subscribe in the current spork, handling of context cancellation is done by the producer
for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.heartbeatInterval)) {
events <- ev
// TODO(JanezP): I think the heartbeat interval should always be 1 here
for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.HeartbeatInterval)) {
eventsChan <- ev
}

r.logger.Warn().Msg("ended subscription for events")
}()

return events
return eventsChan
}

// subscribe to events by the provided height and handle any errors.
//
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents {
func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

_, err := r.client.GetBlockHeaderByHeight(ctx, height)
Expand All @@ -120,7 +131,7 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac
return eventsChan
}

eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, r.blocksFilter(), opts...)
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, blocksFilter(r.chain), opts...)
if err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
Expand Down Expand Up @@ -187,12 +198,12 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac
// and check for each event it receives whether we reached the end, if we reach the end it will increase
// the height by one (next height), and check if we are still in previous sporks, if so repeat everything,
// otherwise return.
func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
events := make(chan models.BlockEvents)
func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
defer func() {
close(events)
close(eventsChan)
}()

for {
Expand All @@ -207,7 +218,7 @@ func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan mode

latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height)
if err != nil {
events <- models.NewBlockEventsError(err)
eventsChan <- models.NewBlockEventsError(err)
return
}

Expand All @@ -217,7 +228,7 @@ func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan mode
Msg("backfilling spork")

for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(1)) {
events <- ev
eventsChan <- ev

if ev.Err != nil {
return
Expand All @@ -238,48 +249,22 @@ func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan mode
}
}()

return events
}

// blockFilter define events we subscribe to:
// A.{evm}.EVM.BlockExecuted and A.{evm}.EVM.TransactionExecuted,
// where {evm} is EVM deployed contract address, which depends on the chain ID we configure.
func (r *RPCSubscriber) blocksFilter() flow.EventFilter {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)

blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

return flow.EventFilter{
EventTypes: []string{
blockExecutedEvent,
transactionExecutedEvent,
},
}
return eventsChan
}

// fetchMissingData is used as a backup mechanism for fetching EVM-related
// events, when the event streaming API returns an inconsistent response.
// An inconsistent response could be an EVM block that references EVM
// transactions which are not present in the response. It falls back
// to using grpc requests instead of streaming.
func (r *RPCSubscriber) fetchMissingData(
func (r *RPCEventSubscriber) fetchMissingData(
ctx context.Context,
blockEvents flow.BlockEvents,
) models.BlockEvents {
// remove existing events
blockEvents.Events = nil

for _, eventType := range r.blocksFilter().EventTypes {
for _, eventType := range blocksFilter(r.chain).EventTypes {
recoveredEvents, err := r.client.GetEventsForHeightRange(
ctx,
eventType,
Expand Down Expand Up @@ -309,7 +294,7 @@ func (r *RPCSubscriber) fetchMissingData(
// accumulateEventsMissingBlock will keep receiving transaction events until it can produce a valid
// EVM block event containing a block and transactions. At that point it will reset the recovery mode
// and return the valid block events.
func (r *RPCSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
func (r *RPCEventSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
r.recoveredEvents = append(r.recoveredEvents, events.Events...)
events.Events = r.recoveredEvents

Expand All @@ -329,7 +314,7 @@ func (r *RPCSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) mo
// in which case we might miss one of the events (missing transaction), or it can be
// due to a failure from the system transaction which commits an EVM block, which results
// in missing EVM block event but present transactions.
func (r *RPCSubscriber) recover(
func (r *RPCEventSubscriber) recover(
ctx context.Context,
events flow.BlockEvents,
err error,
Expand All @@ -349,3 +334,29 @@ func (r *RPCSubscriber) recover(

return models.NewBlockEventsError(err)
}

// blockFilter define events we subscribe to:
// A.{evm}.EVM.BlockExecuted and A.{evm}.EVM.TransactionExecuted,
// where {evm} is EVM deployed contract address, which depends on the chain ID we configure.
func blocksFilter(chainId flowGo.ChainID) flow.EventFilter {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(chainId).EVMContract.Address)

blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

return flow.EventFilter{
EventTypes: []string{
blockExecutedEvent,
transactionExecutedEvent,
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_Subscribing(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())

events := subscriber.Subscribe(context.Background(), 1)

Expand Down Expand Up @@ -83,7 +83,7 @@ func Test_MissingBlockEvent(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())

events := subscriber.Subscribe(context.Background(), 1)

Expand Down Expand Up @@ -185,7 +185,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())

events := subscriber.Subscribe(context.Background(), 1)

Expand Down Expand Up @@ -248,7 +248,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())

events := subscriber.Subscribe(context.Background(), 1)

Expand Down Expand Up @@ -310,7 +310,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())

events := subscriber.Subscribe(context.Background(), 1)

Expand Down

0 comments on commit 2b52189

Please sign in to comment.