Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[changelog] Add message compaction as an opt in feature to changelog consumer #1230

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

ZacAttack
Copy link
Contributor

[changelog] Add message compaction as an opt in feature to changelog consumer

This introduces a configuration which will compact down data returned from poll to only contain the latest records for a given key. This also maintains the order of results returned.

Resolves #XXX

How was this PR tested?

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

This introduces a configuration which will compact down data returned from poll to only contain the latest records for a given key.  This also maintains the order of results returned.
// a single key (just as a map would), but we want to keep the position of the last insertion as well. So in order
// to do that, we remove the entry before inserting it.
for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> message: pubSubMessages) {
if (tempMap.containsKey(message.getKey())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In your test case, K is String, which is comparable as expected.
But in this file, key is byte[]. byte[] arrays are compared based on reference equality rather than content equality.

Copy link
Contributor Author

@ZacAttack ZacAttack Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an interesting point. The type of message.getKey will be parameterized by the type given in the client... So it would imply that that type needs to be something that implements equals(), which probably won't be universally true. I know in java we have Arrays.equals() for this sort of thing, but then we'd have to peek under the covers a bit..... I'm not totally sure how to overcome this.

On the flip side, K will be some deserialized value that will either be an avro primitive or an avro serializable class. The primitive that gets us in trouble is as you mentioned the byte[] type. So maybe the answer is to introspect for that edge case and maybe collection types? Though collection types as a key to Venice should be very rare.

// it's put in at the position of the first insertion. This isn't quite what we want, we want to keep only
// a single key (just as a map would), but we want to keep the position of the last insertion as well. So in order
// to do that, we remove the entry before inserting it.
for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> message: pubSubMessages) {
Copy link
Contributor

@xunyin8 xunyin8 Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC pubSubMessages could contain control messages if includeControlMessage == true. Could this break anything for CC client? I know heartbeat keys are equal but compacting heartbeat is fine. What about other control messages?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants