Skip to content

Commit

Permalink
feat: support topics sequence page
Browse files Browse the repository at this point in the history
  • Loading branch information
anitarua committed Nov 6, 2024
1 parent 44b5334 commit dbcfe9e
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 19 deletions.
10 changes: 6 additions & 4 deletions packages/client-sdk-nodejs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client-sdk-nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"uuid": "8.3.2"
},
"dependencies": {
"@gomomento/generated-types": "0.113.0",
"@gomomento/generated-types": "0.119.2",
"@gomomento/sdk-core": "file:../core",
"@grpc/grpc-js": "1.10.9",
"@types/google-protobuf": "3.15.10",
Expand Down
16 changes: 11 additions & 5 deletions packages/client-sdk-nodejs/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,13 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
topic: options.topicName,
resume_at_topic_sequence_number:
options.subscriptionState.resumeAtTopicSequenceNumber,
sequence_page: options.subscriptionState.lastTopicSequencePage,
});

this.getLogger().trace(
'Subscribing to topic with resume_at_topic_sequence_number: %s',
options.subscriptionState.resumeAtTopicSequenceNumber
'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s',
options.subscriptionState.resumeAtTopicSequenceNumber,
options.subscriptionState.lastTopicSequencePage ?? 'undefined'
);
const call = this.client.Subscribe(request, {
interceptors: this.streamingInterceptors,
Expand Down Expand Up @@ -210,11 +212,14 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {

if (resp.item) {
const sequenceNumber = resp.item.topic_sequence_number;
const sequencePage = resp.item.sequence_page;
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
options.subscriptionState.lastTopicSequencePage = sequencePage;
this.getLogger().trace(
'Received an item on subscription stream; topic: %s; sequence number: %s',
'Received an item on subscription stream; topic: %s; sequence number: %s; sequence page: %s',
truncateString(options.topicName),
sequenceNumber
sequenceNumber,
sequencePage
);
if (resp.item.value.text) {
options.onItem(
Expand Down Expand Up @@ -254,7 +259,8 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
options.onDiscontinuity(
new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence
resp.discontinuity.new_topic_sequence,
resp.discontinuity.new_sequence_page
)
);
} else {
Expand Down
10 changes: 6 additions & 4 deletions packages/client-sdk-web/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client-sdk-web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"xhr2": "0.2.1"
},
"dependencies": {
"@gomomento/generated-types-webtext": "0.113.0",
"@gomomento/generated-types-webtext": "0.119.2",
"@gomomento/sdk-core": "file:../core",
"@types/google-protobuf": "3.15.6",
"google-protobuf": "3.21.2",
Expand Down
26 changes: 23 additions & 3 deletions packages/client-sdk-web/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ export class PubsubClient<
request.setResumeAtTopicSequenceNumber(
options.subscriptionState.resumeAtTopicSequenceNumber
);
if (options.subscriptionState.lastTopicSequencePage !== undefined) {
request.setSequencePage(options.subscriptionState.lastTopicSequencePage);
}

this.getLogger().trace(
'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s',
options.subscriptionState.resumeAtTopicSequenceNumber,
options.subscriptionState.lastTopicSequencePage ?? 'undefined'
);

const call = this.client.subscribe(request, {
...this.clientMetadataProvider.createClientMetadata(),
Expand Down Expand Up @@ -179,10 +188,18 @@ export class PubsubClient<

if (item) {
const sequenceNumber = item.getTopicSequenceNumber();
const sequencePage = item.getSequencePage();
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
options.subscriptionState.lastTopicSequencePage = sequencePage;
const publisherId = item.getPublisherId();
const itemText = item.getValue()?.getText();
const itemBinary = item.getValue()?.getBinary();
this.getLogger().trace(
'Received an item on subscription stream; topic: %s; sequence number: %s; sequence page: %s',
truncateString(options.topicName),
sequenceNumber,
sequencePage
);
if (itemText) {
options.onItem(
new TopicItem(itemText, sequenceNumber, {tokenId: publisherId})
Expand Down Expand Up @@ -211,13 +228,16 @@ export class PubsubClient<
options.onHeartbeat(new TopicHeartbeat());
} else if (discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
'Received a discontinuity; topic: %s; new sequence number: %s; new sequence page: %s',
truncateString(options.topicName),
discontinuity.getNewTopicSequence(),
discontinuity.getNewSequencePage()
);
options.onDiscontinuity(
new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence()
discontinuity.getNewTopicSequence(),
discontinuity.getNewSequencePage()
)
);
} else {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/internal/subscription-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export class SubscriptionState {
private _unsubscribeFn: () => void;
public lastTopicSequenceNumber?: number;
public lastTopicSequencePage?: number;
private _isSubscribed: boolean;
constructor() {
this._unsubscribeFn = () => {
Expand Down Expand Up @@ -43,6 +44,7 @@ export class SubscriptionState {
return JSON.stringify(
{
lastTopicSequenceNumber: this.lastTopicSequenceNumber,
lastTopicSequencePage: this.lastTopicSequencePage,
isSubscribed: this._isSubscribed,
},
null,
Expand Down
16 changes: 15 additions & 1 deletion packages/core/src/messages/responses/topic-discontinuity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
export class TopicDiscontinuity {
private readonly _lastSequenceNumber: number;
private readonly _newSequenceNumber: number;
private readonly _newSequencePage: number;

constructor(_lastSequenceNumber: number, _newSequenceNumber: number) {
constructor(
_lastSequenceNumber: number,
_newSequenceNumber: number,
_newSequencePage: number
) {
this._lastSequenceNumber = _lastSequenceNumber;
this._newSequenceNumber = _newSequenceNumber;
this._newSequencePage = _newSequencePage;
}

/**
Expand All @@ -28,6 +34,14 @@ export class TopicDiscontinuity {
return this._newSequenceNumber;
}

/**
* Returns the new sequence page after the discontinuity.
* @returns number
*/
public newSequencePage(): number {
return this._newSequencePage;
}

public toString(): string {
const displayValue = `Last Sequence Number: ${this._lastSequenceNumber}; New Sequence Number: ${this._newSequenceNumber}`;
return `${this.constructor.name}: ${displayValue}`;
Expand Down

0 comments on commit dbcfe9e

Please sign in to comment.