-
Notifications
You must be signed in to change notification settings - Fork 11.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[indexer] use in memory buffer to store obj changes and update snapsh… #18007
base: main
Are you sure you want to change the base?
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
// The number of object changes in the buffer before the snapshot processor starts processing them. | ||
// TODO: placeholder value, need to tune this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, the object updates in this buffer will likely be larger than the typical live objects update but we don't want it to be too large either
@@ -138,14 +161,22 @@ where | |||
} | |||
} | |||
|
|||
info!("Objects snapshot processor starts updating objects_snapshot periodically..."); | |||
loop { | |||
// We are not in backfill mode but it's possible that the snapshot checkpoint is behind the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't the snapshot checkpoint always be behind though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, this is to handle the scenario where we've started populating the buffer, but the lhs of the buffer is larger than the max checkpoint of snapshot. In that case, we must still upsert from objects history
I think we can speed up this process in a similar vein by first fetching the object updates, and then separately doing the upsert? Instead of relying on the current upsert logic which takes around 20 mins to index 600 checkpoints
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it't possible to be faster, but reading those object updates and upsert has the networking overhead of passing the data back-n-forth.
// Now the cp in objects snapshot is greater than what's in the buffer, so we can start flushing | ||
// the buffer to objects snapshot. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if addressed but one potential issue might arise if snapshot exceeds buffer such that some object in snapshot may be a later version than what we'll end up upserting from buffer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it's indeed possible, although the buffer will contain later versions too and will overwrite with the correct version. To solve this, I can just add cp to each TransactionObjectChanges
in the buffer and discard the ones that's before start_cp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in the latest commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks great, left several comments, thanks for picking this up and working this out quickly!
if buffer_size > OBJECTS_SNAPSHOT_BUFFER_THRESHOLD { | ||
// flush everything in the buffer | ||
// TODO: what if there's too many things in the buffer? Maybe it's better to flush in batches | ||
let object_changes = self.buffer.lock().unwrap().buffer.drain(..).collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iiuc if we drain everything from the buffer and DB commit finishes very quickly, then objects_snapshot
will be very close to the latest cp, in other words, the available range can be < 10, which seems off?
an alternative is, we track the buffer_size by checkpoint, it has a max size for example 1800, a flushing starting size for example 900, and a flushing end size for example 300, then
- the sender keeps pushing until hitting 1800, if it hits 1800, it will wait to avoid bloating memory
- the receiver will flush as long as the total size is > 900, and always flush until 300 checkpoints are left, so that we always have at least 300 checkpoints in the available range
- we have a different max size is to reduce the odds / smooth some checkpoint commits on the receiver end, instead of blocking immediately on the sender side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed the comment in the latest commit by flushing when the buffer cp is behind by 900 and setting the batch size to be 600.
let mut buffer = objects_snapshot_buffer.lock().unwrap(); | ||
// The buffer has never been used so we need to set the startup_checkpoint | ||
if buffer.startup_checkpoint.is_none() { | ||
buffer.startup_checkpoint = Some(first_checkpoint_seq); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the init logic of startup_checkpoint
, but seems that I did not find the updating logic when we remove checkpoints?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not updated. It's only here to tell snapshot processor when it can start using the buffer.
@@ -162,8 +193,36 @@ where | |||
.latest_object_snapshot_sequence_number | |||
.set(start_cp as i64); | |||
} | |||
|
|||
buffer_cp = self.buffer.lock().unwrap().startup_checkpoint; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -184,6 +191,7 @@ where | |||
.unwrap_or_default(); | |||
|
|||
if latest_cp > start_cp + self.config.snapshot_max_lag as u64 { | |||
let end_cp = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops this line should be deleted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Damn, thanks for tackling this Emma! These changes make sense to me.
On the committer
side, we introduce a sender channel, and some new logic so that when we are not in objects_snapshot
backfill mode, we'll load object changes into the sender channel one checkpoint at a time.
The story gets a bit more complex on the objects_snapshot_processor
side.
Say we start from backfill, and we get to a point where start_cp > fullnode_cp - MAX_LAG
. At this instant, the state of indexer db is such that snapshot's max cp = checkpoint table's max cp. Since we have caught up, we flip the switch to transition out of backfill state. There's one more check we need to do before resuming things as normal, and that's making sure that there isn't a gap between objects_snapshot
and the buffer. If snapshot's checkpoint is more than 1 checkpoint behind the buffer, we cannot start flushing the buffer into objects_snapshot
until we've bridged the gap, otherwise we'll have some missing object updates data. So, we need to update objects_snapshot
the old fashioned way until objects_snapshot_cp > buffer_cp - 1
. The -1 is there because it's fine for objects_snapshot to be at some checkpoint 99 while buffer is 100.
Once this gap has been plugged, we are good to go with flushing buffer into objects_snapshot
.
Semgrep found 1
Risk: Affected versions of vite are vulnerable to Improper Handling Of Case Sensitivity / Exposure Of Sensitive Information To An Unauthorized Actor / Improper Access Control. The vulnerability arises when the Vite development server's option, Manual Review Advice: A vulnerability from this advisory is reachable if you host vite's development server on Windows, and you rely on Fix: Upgrade this library to at least version 4.5.2 at sui/examples/trading/frontend/pnpm-lock.yaml:4700. Reference(s): GHSA-c24v-8rfc-w8vw, CVE-2023-34092, CVE-2024-23331 Ignore this finding from ssc-efa14576-9601-4ae6-939c-3da58aa25013. |
This PR is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days. |
…ot table
Description
Introduce an object change buffer that gets populated by the checkpoint commiter and consumed by the snapshot processor to update the objects snapshot table asynchronously.
Test plan
Tested locally against devnet data.
Release notes
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.
For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.