Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support topics sequence page #1463

Merged
merged 5 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions packages/client-sdk-nodejs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client-sdk-nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"uuid": "8.3.2"
},
"dependencies": {
"@gomomento/generated-types": "0.113.0",
"@gomomento/generated-types": "0.119.2",
"@gomomento/sdk-core": "file:../core",
"@grpc/grpc-js": "1.10.9",
"@types/google-protobuf": "3.15.10",
Expand Down
30 changes: 18 additions & 12 deletions packages/client-sdk-nodejs/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,13 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
topic: options.topicName,
resume_at_topic_sequence_number:
options.subscriptionState.resumeAtTopicSequenceNumber,
sequence_page: options.subscriptionState.lastTopicSequencePage,
});

this.getLogger().trace(
'Subscribing to topic with resume_at_topic_sequence_number: %s',
options.subscriptionState.resumeAtTopicSequenceNumber
'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s',
options.subscriptionState.resumeAtTopicSequenceNumber,
options.subscriptionState.resumeAtTopicSequencePage
);
const call = this.client.Subscribe(request, {
interceptors: this.streamingInterceptors,
Expand Down Expand Up @@ -210,11 +212,14 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {

if (resp.item) {
const sequenceNumber = resp.item.topic_sequence_number;
const sequencePage = resp.item.sequence_page;
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
options.subscriptionState.lastTopicSequencePage = sequencePage;
this.getLogger().trace(
'Received an item on subscription stream; topic: %s; sequence number: %s',
'Received an item on subscription stream; topic: %s; sequence number: %s; sequence page: %s',
truncateString(options.topicName),
sequenceNumber
sequenceNumber,
sequencePage
);
if (resp.item.value.text) {
options.onItem(
Expand Down Expand Up @@ -247,16 +252,17 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
);
options.onHeartbeat(new TopicHeartbeat());
} else if (resp.discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
const topicDiscontinuity = new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence,
resp.discontinuity.new_sequence_page
);
options.onDiscontinuity(
new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence
)
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s; %s',
truncateString(options.topicName),
topicDiscontinuity.toString()
);
options.onDiscontinuity(topicDiscontinuity);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
Expand Down
10 changes: 6 additions & 4 deletions packages/client-sdk-web/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client-sdk-web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"xhr2": "0.2.1"
},
"dependencies": {
"@gomomento/generated-types-webtext": "0.113.0",
"@gomomento/generated-types-webtext": "0.119.2",
"@gomomento/sdk-core": "file:../core",
"@types/google-protobuf": "3.15.6",
"google-protobuf": "3.21.2",
Expand Down
2 changes: 2 additions & 0 deletions packages/client-sdk-web/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import * as CacheSetBatch from '@gomomento/sdk-core/dist/src/messages/responses/
import * as TopicPublish from '@gomomento/sdk-core/dist/src/messages/responses/topic-publish';
import * as TopicSubscribe from '@gomomento/sdk-core/dist/src/messages/responses/topic-subscribe';
import {TopicItem} from '@gomomento/sdk-core/dist/src/messages/responses/topic-item';
import {TopicDiscontinuity} from '@gomomento/sdk-core/dist/src/messages/responses/topic-discontinuity';

// AuthClient Response Types
import * as GenerateApiKey from '@gomomento/sdk-core/dist/src/messages/responses/generate-api-key';
Expand Down Expand Up @@ -284,6 +285,7 @@ export {
TopicClientConfiguration,
TopicClient,
TopicItem,
TopicDiscontinuity,
anitarua marked this conversation as resolved.
Show resolved Hide resolved
TopicPublish,
TopicSubscribe,
SubscribeCallOptions,
Expand Down
46 changes: 32 additions & 14 deletions packages/client-sdk-web/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ export class PubsubClient<
request.setResumeAtTopicSequenceNumber(
options.subscriptionState.resumeAtTopicSequenceNumber
);
request.setSequencePage(
options.subscriptionState.resumeAtTopicSequencePage
);

this.getLogger().trace(
`Subscribing to topic with resume_at_topic_sequence_number ${options.subscriptionState.resumeAtTopicSequenceNumber} and sequence_page ${options.subscriptionState.resumeAtTopicSequencePage}`
);
Comment on lines +141 to +143
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while testing locally, I noticed the logs coming from the web sdk looked poorly formatted, reverted to using the string interpolation instead

[2024-11-08T18:08:38.251Z] TRACE (Momento: PubsubClient): Issuing publish request; topic: %s, message length: %stopic-web,15
[2024-11-08T18:08:38.436Z] TRACE (Momento: PubsubClient): Received an item on subscription stream; topic: %s; sequence number: %s; sequence page: %stopic-web,3,1731089315


const call = this.client.subscribe(request, {
...this.clientMetadataProvider.createClientMetadata(),
Expand Down Expand Up @@ -179,10 +186,17 @@ export class PubsubClient<

if (item) {
const sequenceNumber = item.getTopicSequenceNumber();
const sequencePage = item.getSequencePage();
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
options.subscriptionState.lastTopicSequencePage = sequencePage;
const publisherId = item.getPublisherId();
const itemText = item.getValue()?.getText();
const itemBinary = item.getValue()?.getBinary();
this.getLogger().trace(
`Received an item on subscription stream; topic: ${truncateString(
options.topicName
)}; sequence number: ${sequenceNumber}; sequence page: ${sequencePage}`
);
if (itemText) {
options.onItem(
new TopicItem(itemText, sequenceNumber, {tokenId: publisherId})
Expand All @@ -193,8 +207,9 @@ export class PubsubClient<
);
} else {
this.getLogger().error(
'Received subscription item with unknown type; topic: %s',
truncateString(options.topicName)
`Received subscription item with unknown type; topic: ${truncateString(
options.topicName
)}`
);
options.onError(
new TopicSubscribe.Error(
Expand All @@ -205,25 +220,28 @@ export class PubsubClient<
}
} else if (resp.getHeartbeat()) {
this.getLogger().trace(
'Received heartbeat from subscription stream; topic: %s',
truncateString(options.topicName)
`Received heartbeat from subscription stream; topic: ${truncateString(
options.topicName
)}`
);
options.onHeartbeat(new TopicHeartbeat());
} else if (discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
const topicDiscontinuity = new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence(),
discontinuity.getNewSequencePage()
);
options.onDiscontinuity(
new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence()
)
this.getLogger().trace(
`Received a discontinuity; topic: ${truncateString(
options.topicName
)}; ${topicDiscontinuity.toString()}`
);
options.onDiscontinuity(topicDiscontinuity);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
truncateString(options.topicName)
`Received unknown subscription item; topic: ${truncateString(
options.topicName
)}`
);
options.onError(
new TopicSubscribe.Error(new UnknownError('Unknown item type')),
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/internal/subscription-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export class SubscriptionState {
private _unsubscribeFn: () => void;
public lastTopicSequenceNumber?: number;
public lastTopicSequencePage?: number;
private _isSubscribed: boolean;
constructor() {
this._unsubscribeFn = () => {
Expand All @@ -16,6 +17,10 @@ export class SubscriptionState {
return (this.lastTopicSequenceNumber ?? -1) + 1;
}

public get resumeAtTopicSequencePage(): number {
return this.lastTopicSequencePage ?? 0;
}

public setSubscribed(): void {
this._isSubscribed = true;
}
Expand Down Expand Up @@ -43,6 +48,7 @@ export class SubscriptionState {
return JSON.stringify(
{
lastTopicSequenceNumber: this.lastTopicSequenceNumber,
lastTopicSequencePage: this.lastTopicSequencePage,
isSubscribed: this._isSubscribed,
},
null,
Expand Down
18 changes: 16 additions & 2 deletions packages/core/src/messages/responses/topic-discontinuity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
export class TopicDiscontinuity {
private readonly _lastSequenceNumber: number;
private readonly _newSequenceNumber: number;
private readonly _newSequencePage: number;

constructor(_lastSequenceNumber: number, _newSequenceNumber: number) {
constructor(
_lastSequenceNumber: number,
_newSequenceNumber: number,
_newSequencePage: number
) {
this._lastSequenceNumber = _lastSequenceNumber;
this._newSequenceNumber = _newSequenceNumber;
this._newSequencePage = _newSequencePage;
}

/**
Expand All @@ -28,8 +34,16 @@ export class TopicDiscontinuity {
return this._newSequenceNumber;
}

/**
* Returns the new sequence page after the discontinuity.
* @returns number
*/
public newSequencePage(): number {
return this._newSequencePage;
}

public toString(): string {
const displayValue = `Last Sequence Number: ${this._lastSequenceNumber}; New Sequence Number: ${this._newSequenceNumber}`;
const displayValue = `Last Sequence Number: ${this._lastSequenceNumber}; New Sequence Number: ${this._newSequenceNumber}; New Sequence Page: ${this._newSequencePage}`;
return `${this.constructor.name}: ${displayValue}`;
}
}
Loading