diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index ab7a01c7..47997039 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -93,7 +93,8 @@ func NewEventIngestionEngine( // Stop the engine. func (e *Engine) Stop() { - // todo + e.MarkDone() + <-e.Stopped() } // Run the Cadence event ingestion engine. @@ -115,30 +116,33 @@ func (e *Engine) Run(ctx context.Context) error { e.log.Info().Msg("starting ingestion") e.MarkReady() + defer e.MarkStopped() - for events := range e.subscriber.Subscribe(ctx) { + events := e.subscriber.Subscribe(ctx) + + for { select { - case <-ctx.Done(): + case <-e.Done(): // stop the engine return nil - default: - } - - if events.Err != nil { - return fmt.Errorf( - "failure in event subscription with: %w", - events.Err, - ) - } - - err := e.processEvents(ctx, events.Events) - if err != nil { - e.log.Error().Err(err).Msg("failed to process EVM events") - return err + case events, ok := <-events: + if !ok { + return nil + } + if events.Err != nil { + return fmt.Errorf( + "failure in event subscription with: %w", + events.Err, + ) + } + + err := e.processEvents(events.Events) + if err != nil { + e.log.Error().Err(err).Msg("failed to process EVM events") + return err + } } } - - return nil } // processEvents converts the events to block and transactions and indexes them. @@ -153,7 +157,7 @@ func (e *Engine) Run(ctx context.Context) error { // https://github.com/onflow/flow-go/blob/master/fvm/evm/types/events.go // // Any error is unexpected and fatal. -func (e *Engine) processEvents(ctx context.Context, events *models.CadenceEvents) error { +func (e *Engine) processEvents(events *models.CadenceEvents) error { e.log.Info(). Uint64("cadence-height", events.CadenceHeight()). Int("cadence-event-length", events.Length()). @@ -258,14 +262,6 @@ func (e *Engine) processEvents(ctx context.Context, events *models.CadenceEvents } } - select { - case <-ctx.Done(): - // Temporary solution to avoid committing the batch when the DB is closed - // TODO(JanezP): handle this better - return nil - default: - } - if err := batch.Commit(pebbleDB.Sync); err != nil { return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err) } diff --git a/storage/pebble/keys.go b/storage/pebble/keys.go index f52eb0c4..aa46b61a 100644 --- a/storage/pebble/keys.go +++ b/storage/pebble/keys.go @@ -67,12 +67,12 @@ func NewMVCCComparer() *pebble.Comparer { // edge case. Not sure if this is possible, but just in case return 0 } - if a[0] != registerKeyMarker { - // default comparer - return len(a) + if a[0] == registerKeyMarker { + // special case for registers + return len(a) - 8 } - // special case for registers - return len(a) - 8 + // default comparer + return len(a) } comparer.Name = "flow.MVCCComparer" diff --git a/storage/register_delta.go b/storage/register_delta.go index 4aba9fc3..e7c5b85e 100644 --- a/storage/register_delta.go +++ b/storage/register_delta.go @@ -73,10 +73,17 @@ func (r *RegisterDelta) GetUpdates() flow.RegisterEntries { } func (r *RegisterDelta) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) { + return allocateSlabIndex(owner, r) + +} + +// allocateSlabIndex allocates a new slab index for the given owner and key. +// this method only uses the storage get/set methods. +func allocateSlabIndex(owner []byte, storage types.BackendStorage) (atree.SlabIndex, error) { // get status address := flow.BytesToAddress(owner) id := flow.AccountStatusRegisterID(address) - statusBytes, err := r.GetValue(owner, []byte(id.Key)) + statusBytes, err := storage.GetValue(owner, []byte(id.Key)) if err != nil { return atree.SlabIndex{}, fmt.Errorf( "failed to load account status for the account (%s): %w", @@ -100,7 +107,7 @@ func (r *RegisterDelta) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) // compute storage size changes) // this way the getValue would load this value from deltas key := atree.SlabIndexToLedgerKey(index) - err = r.SetValue(owner, key, []byte{}) + err = storage.SetValue(owner, key, []byte{}) if err != nil { return atree.SlabIndex{}, fmt.Errorf( "failed to allocate an storage index: %w", @@ -110,7 +117,7 @@ func (r *RegisterDelta) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) // update the storageIndex bytes status.SetStorageIndex(newIndexBytes) - err = r.SetValue(owner, []byte(id.Key), status.ToBytes()) + err = storage.SetValue(owner, []byte(id.Key), status.ToBytes()) if err != nil { return atree.SlabIndex{}, fmt.Errorf( "failed to store the account status for account (%s): %w",