Skip to content

Commit

Permalink
feat: support topics sequence page (#1463)
Browse files Browse the repository at this point in the history
* feat: support topics sequence page

* fix: forgot to export TopicDiscontinuity in web sdk

* add sequence page to topic discontinuity toString

* add resumeAtTopicSequencePage accessor

* use resumeAtTopicSequencePage and fix logger statements
  • Loading branch information
anitarua authored Nov 11, 2024
1 parent 44b5334 commit f32881a
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 38 deletions.
10 changes: 6 additions & 4 deletions packages/client-sdk-nodejs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client-sdk-nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"uuid": "8.3.2"
},
"dependencies": {
"@gomomento/generated-types": "0.113.0",
"@gomomento/generated-types": "0.119.2",
"@gomomento/sdk-core": "file:../core",
"@grpc/grpc-js": "1.10.9",
"@types/google-protobuf": "3.15.10",
Expand Down
30 changes: 18 additions & 12 deletions packages/client-sdk-nodejs/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,13 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
topic: options.topicName,
resume_at_topic_sequence_number:
options.subscriptionState.resumeAtTopicSequenceNumber,
sequence_page: options.subscriptionState.lastTopicSequencePage,
});

this.getLogger().trace(
'Subscribing to topic with resume_at_topic_sequence_number: %s',
options.subscriptionState.resumeAtTopicSequenceNumber
'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s',
options.subscriptionState.resumeAtTopicSequenceNumber,
options.subscriptionState.resumeAtTopicSequencePage
);
const call = this.client.Subscribe(request, {
interceptors: this.streamingInterceptors,
Expand Down Expand Up @@ -210,11 +212,14 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {

if (resp.item) {
const sequenceNumber = resp.item.topic_sequence_number;
const sequencePage = resp.item.sequence_page;
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
options.subscriptionState.lastTopicSequencePage = sequencePage;
this.getLogger().trace(
'Received an item on subscription stream; topic: %s; sequence number: %s',
'Received an item on subscription stream; topic: %s; sequence number: %s; sequence page: %s',
truncateString(options.topicName),
sequenceNumber
sequenceNumber,
sequencePage
);
if (resp.item.value.text) {
options.onItem(
Expand Down Expand Up @@ -247,16 +252,17 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
);
options.onHeartbeat(new TopicHeartbeat());
} else if (resp.discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
const topicDiscontinuity = new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence,
resp.discontinuity.new_sequence_page
);
options.onDiscontinuity(
new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence
)
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s; %s',
truncateString(options.topicName),
topicDiscontinuity.toString()
);
options.onDiscontinuity(topicDiscontinuity);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
Expand Down
10 changes: 6 additions & 4 deletions packages/client-sdk-web/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client-sdk-web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"xhr2": "0.2.1"
},
"dependencies": {
"@gomomento/generated-types-webtext": "0.113.0",
"@gomomento/generated-types-webtext": "0.119.2",
"@gomomento/sdk-core": "file:../core",
"@types/google-protobuf": "3.15.6",
"google-protobuf": "3.21.2",
Expand Down
2 changes: 2 additions & 0 deletions packages/client-sdk-web/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import * as CacheSetBatch from '@gomomento/sdk-core/dist/src/messages/responses/
import * as TopicPublish from '@gomomento/sdk-core/dist/src/messages/responses/topic-publish';
import * as TopicSubscribe from '@gomomento/sdk-core/dist/src/messages/responses/topic-subscribe';
import {TopicItem} from '@gomomento/sdk-core/dist/src/messages/responses/topic-item';
import {TopicDiscontinuity} from '@gomomento/sdk-core/dist/src/messages/responses/topic-discontinuity';

// AuthClient Response Types
import * as GenerateApiKey from '@gomomento/sdk-core/dist/src/messages/responses/generate-api-key';
Expand Down Expand Up @@ -284,6 +285,7 @@ export {
TopicClientConfiguration,
TopicClient,
TopicItem,
TopicDiscontinuity,
TopicPublish,
TopicSubscribe,
SubscribeCallOptions,
Expand Down
46 changes: 32 additions & 14 deletions packages/client-sdk-web/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ export class PubsubClient<
request.setResumeAtTopicSequenceNumber(
options.subscriptionState.resumeAtTopicSequenceNumber
);
request.setSequencePage(
options.subscriptionState.resumeAtTopicSequencePage
);

this.getLogger().trace(
`Subscribing to topic with resume_at_topic_sequence_number ${options.subscriptionState.resumeAtTopicSequenceNumber} and sequence_page ${options.subscriptionState.resumeAtTopicSequencePage}`
);

const call = this.client.subscribe(request, {
...this.clientMetadataProvider.createClientMetadata(),
Expand Down Expand Up @@ -179,10 +186,17 @@ export class PubsubClient<

if (item) {
const sequenceNumber = item.getTopicSequenceNumber();
const sequencePage = item.getSequencePage();
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
options.subscriptionState.lastTopicSequencePage = sequencePage;
const publisherId = item.getPublisherId();
const itemText = item.getValue()?.getText();
const itemBinary = item.getValue()?.getBinary();
this.getLogger().trace(
`Received an item on subscription stream; topic: ${truncateString(
options.topicName
)}; sequence number: ${sequenceNumber}; sequence page: ${sequencePage}`
);
if (itemText) {
options.onItem(
new TopicItem(itemText, sequenceNumber, {tokenId: publisherId})
Expand All @@ -193,8 +207,9 @@ export class PubsubClient<
);
} else {
this.getLogger().error(
'Received subscription item with unknown type; topic: %s',
truncateString(options.topicName)
`Received subscription item with unknown type; topic: ${truncateString(
options.topicName
)}`
);
options.onError(
new TopicSubscribe.Error(
Expand All @@ -205,25 +220,28 @@ export class PubsubClient<
}
} else if (resp.getHeartbeat()) {
this.getLogger().trace(
'Received heartbeat from subscription stream; topic: %s',
truncateString(options.topicName)
`Received heartbeat from subscription stream; topic: ${truncateString(
options.topicName
)}`
);
options.onHeartbeat(new TopicHeartbeat());
} else if (discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
const topicDiscontinuity = new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence(),
discontinuity.getNewSequencePage()
);
options.onDiscontinuity(
new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence()
)
this.getLogger().trace(
`Received a discontinuity; topic: ${truncateString(
options.topicName
)}; ${topicDiscontinuity.toString()}`
);
options.onDiscontinuity(topicDiscontinuity);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
truncateString(options.topicName)
`Received unknown subscription item; topic: ${truncateString(
options.topicName
)}`
);
options.onError(
new TopicSubscribe.Error(new UnknownError('Unknown item type')),
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/internal/subscription-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export class SubscriptionState {
private _unsubscribeFn: () => void;
public lastTopicSequenceNumber?: number;
public lastTopicSequencePage?: number;
private _isSubscribed: boolean;
constructor() {
this._unsubscribeFn = () => {
Expand All @@ -16,6 +17,10 @@ export class SubscriptionState {
return (this.lastTopicSequenceNumber ?? -1) + 1;
}

public get resumeAtTopicSequencePage(): number {
return this.lastTopicSequencePage ?? 0;
}

public setSubscribed(): void {
this._isSubscribed = true;
}
Expand Down Expand Up @@ -43,6 +48,7 @@ export class SubscriptionState {
return JSON.stringify(
{
lastTopicSequenceNumber: this.lastTopicSequenceNumber,
lastTopicSequencePage: this.lastTopicSequencePage,
isSubscribed: this._isSubscribed,
},
null,
Expand Down
18 changes: 16 additions & 2 deletions packages/core/src/messages/responses/topic-discontinuity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
export class TopicDiscontinuity {
private readonly _lastSequenceNumber: number;
private readonly _newSequenceNumber: number;
private readonly _newSequencePage: number;

constructor(_lastSequenceNumber: number, _newSequenceNumber: number) {
constructor(
_lastSequenceNumber: number,
_newSequenceNumber: number,
_newSequencePage: number
) {
this._lastSequenceNumber = _lastSequenceNumber;
this._newSequenceNumber = _newSequenceNumber;
this._newSequencePage = _newSequencePage;
}

/**
Expand All @@ -28,8 +34,16 @@ export class TopicDiscontinuity {
return this._newSequenceNumber;
}

/**
* Returns the new sequence page after the discontinuity.
* @returns number
*/
public newSequencePage(): number {
return this._newSequencePage;
}

public toString(): string {
const displayValue = `Last Sequence Number: ${this._lastSequenceNumber}; New Sequence Number: ${this._newSequenceNumber}`;
const displayValue = `Last Sequence Number: ${this._lastSequenceNumber}; New Sequence Number: ${this._newSequenceNumber}; New Sequence Page: ${this._newSequencePage}`;
return `${this.constructor.name}: ${displayValue}`;
}
}

0 comments on commit f32881a

Please sign in to comment.