Ingestion Performance improvements #653
Conversation
Force-pushed from 1823011 to ab6775e.
Force-pushed from c9af370 to 6c193ad.
Is there a problem with the streaming part on the AN, or some configuration problem? 50 b/s seems super low.
@bluesign The streaming API has a heartbeat rate of 50/s. We could increase that by an order of magnitude or two, but the batch request will still be faster, because the rate limit is 100/s (this seems to hold up in practice as well) and you can request 250 blocks. With the change here we don't hit the 2.5 kb/s limit, though. There is now a different bottleneck, and I think it might be pebble disk IO. A future optimization might be to not commit a pebble batch until it contains at least X transactions/blocks.
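A minimal sketch of what that batching optimization could look like, assuming writes go through cockroachdb/pebble as the discussion implies; `flushThreshold`, `batchingWriter`, and the method names are illustrative, not the gateway's actual API:

```go
package ingestion

import "github.com/cockroachdb/pebble"

// flushThreshold is a hypothetical cutoff: keep appending writes to one
// pebble batch and only commit once it covers this many blocks.
const flushThreshold = 250

type batchingWriter struct {
	db      *pebble.DB
	batch   *pebble.Batch
	pending int
}

// writeBlock stages a single key/value write and commits the accumulated
// batch once enough blocks are pending, amortizing disk syncs.
func (w *batchingWriter) writeBlock(key, value []byte) error {
	if w.batch == nil {
		w.batch = w.db.NewBatch()
	}
	if err := w.batch.Set(key, value, nil); err != nil {
		return err
	}
	w.pending++
	if w.pending >= flushThreshold {
		return w.flush()
	}
	return nil
}

// flush commits whatever has been staged so far in a single synced write.
func (w *batchingWriter) flush() error {
	if w.batch == nil {
		return nil
	}
	commitErr := w.batch.Commit(pebble.Sync)
	closeErr := w.batch.Close()
	w.batch = nil
	w.pending = 0
	if commitErr != nil {
		return commitErr
	}
	return closeErr
}
```

The trade-off is that a crash loses the uncommitted tail, so the ingestion loop would still need to flush on shutdown and on heartbeats.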
Force-pushed from d493e6c to fd3f08d.
```diff
@@ -40,13 +40,15 @@ var Cmd = &cobra.Command{
 		os.Exit(1)
 	}

-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(command.Context())
```
The changes in this file are not needed for performance, but they made it slightly easier to test things, so I left them in.
```diff
 // if heartbeat interval with no data still update the cadence height
 if events.Empty() {
-	if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil {
+	if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil {
```
This was a bug
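For readers skimming the diff: the point of passing `batch` instead of `nil` is that the cadence-height update then commits atomically with the other writes for that height. A rough, self-contained sketch of the pattern (the key name and function are hypothetical; only the idea of writing through the caller's batch comes from the diff):

```go
package ingestion

import (
	"encoding/binary"

	"github.com/cockroachdb/pebble"
)

// latestCadenceHeightKey is a hypothetical key used only for this sketch.
var latestCadenceHeightKey = []byte("latestCadenceHeight")

// setLatestCadenceHeight writes the height into the caller-provided batch.
// Nothing hits disk until the caller commits that batch, so the stored
// height cannot get ahead of (or behind) the block data committed with it.
func setLatestCadenceHeight(batch *pebble.Batch, height uint64) error {
	val := make([]byte, 8)
	binary.BigEndian.PutUint64(val, height)
	return batch.Set(latestCadenceHeightKey, val, nil)
}
```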
Nice 👌
LGTM! 👏
@janezpodhostnik I believe the
@janezpodhostnik Yeah, that makes sense, but I think it also makes sense to publish checkpoints, tbh. It makes no sense for people to hammer ANs with getEvents, and there will also be a need for ledger data. I think separating the pebbleDB per spork and sharing it via sporks.json (with clients downloading from GCP for backfill) would be a more ideal solution.
@bluesign Definitely! Checkpoints would be great; they would speed up bootstrapping and reduce pressure on ANs.
```diff
@@ -195,7 +197,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
 // and check for each event it receives whether we reached the end, if we reach the end it will increase
 // the height by one (next height), and check if we are still in previous sporks, if so repeat everything,
 // otherwise return.
-func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
+func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) <-chan models.BlockEvents {
```
Suggested change:
```diff
-func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) <-chan models.BlockEvents {
+func (r *RPCEventSubscriber) backfill(ctx context.Context, currentCadenceHeight uint64) <-chan models.BlockEvents {
```
Uint64("start-height", height). | ||
Uint64("last-spork-height", latestHeight). | ||
Msg("backfilling spork") | ||
Uint64("next-height", currentHeight). |
Uint64("next-height", currentHeight). | |
Uint64("next-cadence-height", currentHeight). |
```go
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight))

func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
```
Suggested change:
```diff
-func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
+func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
```
```go
	// sort both, just in case
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].Height < blocks[j].Height
	})
	sort.Slice(transactions, func(i, j int) bool {
		return transactions[i].Height < transactions[j].Height
	})

	if len(transactions) != len(blocks) {
		return 0, fmt.Errorf("transactions and blocks have different length")
	}
```
We can perform the check first:
Suggested change:
```diff
-	// sort both, just in case
-	sort.Slice(blocks, func(i, j int) bool {
-		return blocks[i].Height < blocks[j].Height
-	})
-	sort.Slice(transactions, func(i, j int) bool {
-		return transactions[i].Height < transactions[j].Height
-	})
-	if len(transactions) != len(blocks) {
-		return 0, fmt.Errorf("transactions and blocks have different length")
-	}
+	if len(transactions) != len(blocks) {
+		return 0, fmt.Errorf("transactions and blocks have different length")
+	}
+	// sort both, just in case
+	sort.Slice(blocks, func(i, j int) bool {
+		return blocks[i].Height < blocks[j].Height
+	})
+	sort.Slice(transactions, func(i, j int) bool {
+		return transactions[i].Height < transactions[j].Height
+	})
```
```go
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight))

func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
```
Can you add a comment to this function explaining the `Spork` in the name, and also the return value? Is the returned height the first height of the next spork after `fromHeight`?
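One possible shape for such a comment, inferred from the surrounding diff and hedged accordingly (the described return semantics are an assumption to be confirmed against the implementation):

```go
// backfillSpork backfills block events for the single past spork that
// contains fromCadenceHeight, publishing them onto eventsChan in order.
// It returns the next cadence height to continue from, i.e. one past the
// latest height served by that spork, so the caller can keep looping
// until it reaches the live spork.
```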
Msg("completed backfilling") | ||
|
||
return | ||
} | ||
|
||
latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height) | ||
currentHeight, err := r.backfillSpork(ctx, currentHeight, eventsChan) |
Why does backfill take only a single height instead of a [from, to] height range? Would it be clearer to query the height range that needs to be backfilled and have the backfillSpork function take that range instead?
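A sketch of the range-based alternative being proposed (illustrative only; `backfillSporkRange` and its body are not part of this PR):

```go
// backfillSporkRange would make the bounds explicit instead of deriving the
// upper bound inside the function from a single starting height.
func (r *RPCEventSubscriber) backfillSporkRange(
	ctx context.Context,
	fromCadenceHeight uint64,
	toCadenceHeight uint64,
	eventsChan chan<- models.BlockEvents,
) error {
	// fetch events for [fromCadenceHeight, toCadenceHeight] in batches
	// and push them onto eventsChan ...
	return nil
}
```

The caller would then resolve each spork's boundary first (e.g. with GetLatestHeightForSpork) and invoke the function once per spork.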
Force-pushed from dd8aa43 to 35b1749.
Force-pushed from 35b1749 to 2e65322.
Awesome 👏
Merged commit b3212e0 into feature/local-tx-reexecution.