Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Nov 12, 2024
1 parent 083deeb commit ab6775e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 36 deletions.
52 changes: 24 additions & 28 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func NewEventIngestionEngine(

// Stop the engine.
func (e *Engine) Stop() {
// todo
e.MarkDone()
<-e.Stopped()
}

// Run the Cadence event ingestion engine.
Expand All @@ -115,30 +116,33 @@ func (e *Engine) Run(ctx context.Context) error {
e.log.Info().Msg("starting ingestion")

e.MarkReady()
defer e.MarkStopped()

for events := range e.subscriber.Subscribe(ctx) {
events := e.subscriber.Subscribe(ctx)

for {
select {
case <-ctx.Done():
case <-e.Done():
// stop the engine
return nil
default:
}

if events.Err != nil {
return fmt.Errorf(
"failure in event subscription with: %w",
events.Err,
)
}

err := e.processEvents(ctx, events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
case events, ok := <-events:
if !ok {
return nil
}
if events.Err != nil {
return fmt.Errorf(
"failure in event subscription with: %w",
events.Err,
)
}

err := e.processEvents(events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
}
}
}

return nil
}

// processEvents converts the events to block and transactions and indexes them.
Expand All @@ -153,7 +157,7 @@ func (e *Engine) Run(ctx context.Context) error {
// https://github.com/onflow/flow-go/blob/master/fvm/evm/types/events.go
//
// Any error is unexpected and fatal.
func (e *Engine) processEvents(ctx context.Context, events *models.CadenceEvents) error {
func (e *Engine) processEvents(events *models.CadenceEvents) error {
e.log.Info().
Uint64("cadence-height", events.CadenceHeight()).
Int("cadence-event-length", events.Length()).
Expand Down Expand Up @@ -258,14 +262,6 @@ func (e *Engine) processEvents(ctx context.Context, events *models.CadenceEvents
}
}

select {
case <-ctx.Done():
// Temporary solution to avoid committing the batch when the DB is closed
// TODO(JanezP): handle this better
return nil
default:
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err)
}
Expand Down
10 changes: 5 additions & 5 deletions storage/pebble/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ func NewMVCCComparer() *pebble.Comparer {
// edge case. Not sure if this is possible, but just in case
return 0
}
if a[0] != registerKeyMarker {
// default comparer
return len(a)
if a[0] == registerKeyMarker {
// special case for registers
return len(a) - 8
}
// special case for registers
return len(a) - 8
// default comparer
return len(a)
}
comparer.Name = "flow.MVCCComparer"

Expand Down
13 changes: 10 additions & 3 deletions storage/register_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ func (r *RegisterDelta) GetUpdates() flow.RegisterEntries {
}

func (r *RegisterDelta) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) {
return allocateSlabIndex(owner, r)

}

// allocateSlabIndex allocates a new slab index for the given owner and key.
// this method only uses the storage get/set methods.
func allocateSlabIndex(owner []byte, storage types.BackendStorage) (atree.SlabIndex, error) {
// get status
address := flow.BytesToAddress(owner)
id := flow.AccountStatusRegisterID(address)
statusBytes, err := r.GetValue(owner, []byte(id.Key))
statusBytes, err := storage.GetValue(owner, []byte(id.Key))
if err != nil {
return atree.SlabIndex{}, fmt.Errorf(
"failed to load account status for the account (%s): %w",
Expand All @@ -100,7 +107,7 @@ func (r *RegisterDelta) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error)
// compute storage size changes)
// this way the getValue would load this value from deltas
key := atree.SlabIndexToLedgerKey(index)
err = r.SetValue(owner, key, []byte{})
err = storage.SetValue(owner, key, []byte{})
if err != nil {
return atree.SlabIndex{}, fmt.Errorf(
"failed to allocate an storage index: %w",
Expand All @@ -110,7 +117,7 @@ func (r *RegisterDelta) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error)
// update the storageIndex bytes
status.SetStorageIndex(newIndexBytes)

err = r.SetValue(owner, []byte(id.Key), status.ToBytes())
err = storage.SetValue(owner, []byte(id.Key), status.ToBytes())
if err != nil {
return atree.SlabIndex{}, fmt.Errorf(
"failed to store the account status for account (%s): %w",
Expand Down

0 comments on commit ab6775e

Please sign in to comment.