Skip to content

Commit

Permalink
samples: various sample and doc improvements we had queued up (#1987)
Browse files Browse the repository at this point in the history
* samples: update publishing samples to clarify that topic objects should be cached; also fix a usage of publish()

* samples: convert publishWithRetrySettings to use veneer

* samples: update publishWithRetrySettings with new defaults; add comment about including all items

* docs: clarify what subscriber batching means

* samples: update EOD sample with endpoint

* samples: add comments about ordered publishing as well
  • Loading branch information
feywind authored Oct 18, 2024
1 parent 6e2c28a commit 858b565
Show file tree
Hide file tree
Showing 24 changed files with 155 additions and 120 deletions.
9 changes: 6 additions & 3 deletions samples/listenForMessagesWithExactlyOnceDelivery.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Google LLC
// Copyright 2022-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,8 +38,11 @@
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();
// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId,
Expand Down
6 changes: 4 additions & 2 deletions samples/publishAvroRecords.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,8 +46,10 @@ const fs = require('fs');
const pubSubClient = new PubSub();

async function publishAvroRecords(topicNameOrId) {
// Get the topic metadata to learn about its schema encoding.
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);

// Get the topic metadata to learn about its schema encoding.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;

Expand Down
3 changes: 2 additions & 1 deletion samples/publishBatchedMessages.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@ async function publishBatchedMessages(
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

// Cache topic objects (publishers) and reuse them.
const publishOptions = {
batching: {
maxMessages: maxMessages,
Expand Down
9 changes: 5 additions & 4 deletions samples/publishMessage.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,10 +47,11 @@ async function publishMessage(topicNameOrId, data) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);

try {
const messageId = await pubSubClient
.topic(topicNameOrId)
.publishMessage({data: dataBuffer});
const messageId = topic.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
} catch (error) {
console.error(`Received error while publishing: ${error.message}`);
Expand Down
12 changes: 8 additions & 4 deletions samples/publishMessageWithCustomAttributes.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,9 +52,13 @@ async function publishMessageWithCustomAttributes(topicNameOrId, data) {
username: 'gcp',
};

const messageId = await pubSubClient
.topic(topicNameOrId)
.publishMessage({data: dataBuffer, attributes: customAttributes});
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);

const messageId = topic.publishMessage({
data: dataBuffer,
attributes: customAttributes,
});
console.log(`Message ${messageId} published.`);
}
// [END pubsub_publish_custom_attributes]
Expand Down
12 changes: 8 additions & 4 deletions samples/publishOrderedMessage.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,14 +61,18 @@ async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
orderingKey: orderingKey,
};

// Cache topic objects (publishers) and reuse them.
//
// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
// key are in the same region. For list of locational endpoints for Pub/Sub, see:
// https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const publishOptions = {
messageOrdering: true,
};
const topic = pubSubClient.topic(topicNameOrId, publishOptions);

// Publishes the message
const messageId = await pubSubClient
.topic(topicNameOrId, publishOptions)
.publishMessage(message);
const messageId = topic.publishMessage(message);

console.log(`Message ${messageId} published.`);

Expand Down
8 changes: 5 additions & 3 deletions samples/publishProtobufMessages.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,8 +45,10 @@ const protobuf = require('protobufjs');
const pubSubClient = new PubSub();

async function publishProtobufMessages(topicNameOrId) {
// Get the topic metadata to learn about its schema.
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);

// Get the topic metadata to learn about its schema.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;

Expand Down Expand Up @@ -87,7 +89,7 @@ async function publishProtobufMessages(topicNameOrId) {
return;
}

const messageId = await topic.publish(dataBuffer);
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Protobuf message ${messageId} published.`);
}
// [END pubsub_publish_proto_messages]
Expand Down
4 changes: 2 additions & 2 deletions samples/publishWithFlowControl.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Google LLC
// Copyright 2021-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ async function publishWithFlowControl(topicNameOrId) {
},
};

// Get a publisher.
// Get a publisher. Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId, options);

// For flow controlled publishing, we'll use a publisher flow controller
Expand Down
3 changes: 3 additions & 0 deletions samples/publishWithOpenTelemetryTracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ async function publishMessage(topicNameOrId, data) {
// Publishes the message as a string, e.g. "Hello, world!"
// or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

// Cache topic objects (publishers) and reuse them.
const publisher = pubSubClient.topic(topicNameOrId);

const messageId = await publisher.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);

Expand Down
56 changes: 24 additions & 32 deletions samples/publishWithRetrySettings.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,34 +39,20 @@

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');
const {PubSub} = require('@google-cloud/pubsub');

// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
// optional auth parameters
});
async function publishWithRetrySettings(projectId, topicNameOrId, data) {
const formattedTopic = publisherClient.projectTopicPath(
projectId,
topicNameOrId
);

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
data: dataBuffer,
};
const messages = [messagesElement];

// Build the request
const request = {
topic: formattedTopic,
messages: messages,
};
async function publishWithRetrySettings(topicNameOrId, data) {
const pubsubClient = new PubSub();

// Retry settings control how the publisher handles retryable failures. Default values are shown.
// The `retryCodes` array determines which grpc errors will trigger an automatic retry.
// The `backoffSettings` object lets you specify the behaviour of retries over time.
//
// Reference this document to see the current defaults for publishing:
// https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
//
// Please note that _all_ items must be included when passing these settings to topic().
// Otherwise, unpredictable (incorrect) defaults may be assumed.
const retrySettings = {
retryCodes: [
10, // 'ABORTED'
Expand All @@ -83,36 +69,42 @@ async function publishWithRetrySettings(projectId, topicNameOrId, data) {
initialRetryDelayMillis: 100,
// The multiplier by which to increase the delay time between the completion
// of failed requests, and the initiation of the subsequent retrying request.
retryDelayMultiplier: 1.3,
retryDelayMultiplier: 4,
// The maximum delay time, in milliseconds, between requests.
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
maxRetryDelayMillis: 60000,
// The initial timeout parameter to the request.
initialRpcTimeoutMillis: 5000,
initialRpcTimeoutMillis: 60000,
// The multiplier by which to increase the timeout parameter between failed requests.
rpcTimeoutMultiplier: 1.0,
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
maxRpcTimeoutMillis: 600000,
maxRpcTimeoutMillis: 60000,
// The total time, in milliseconds, starting from when the initial request is sent,
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
totalTimeoutMillis: 600000,
},
};

const [response] = await publisherClient.publish(request, {
retry: retrySettings,
// Cache topic objects (publishers) and reuse them.
const topic = pubsubClient.topic(topicNameOrId, {
gaxOpts: {
retry: retrySettings,
},
});
console.log(`Message ${response.messageIds} published.`);

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
}
// [END pubsub_publisher_retry_settings]

function main(
projectId = 'YOUR_PROJECT_ID',
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
data = JSON.stringify({foo: 'bar'})
) {
publishWithRetrySettings(projectId, topicNameOrId, data).catch(err => {
publishWithRetrySettings(topicNameOrId, data).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
Expand Down
10 changes: 8 additions & 2 deletions samples/resumePublish.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,8 +52,14 @@ async function resumePublish(topicNameOrId, data, orderingKey) {
messageOrdering: true,
};

// Publishes the message
// Cache topic objects (publishers) and reuse them.
//
// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
// key are in the same region. For list of locational endpoints for Pub/Sub, see:
// https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const publisher = pubSubClient.topic(topicNameOrId, publishOptions);

// Publishes the message
try {
const message = {
data: dataBuffer,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Google LLC
// Copyright 2022-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,8 +34,11 @@
// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();
// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId: string,
Expand Down
6 changes: 4 additions & 2 deletions samples/typescript/publishAvroRecords.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,8 +47,10 @@ interface ProvinceObject {
}

async function publishAvroRecords(topicNameOrId: string) {
// Get the topic metadata to learn about its schema encoding.
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);

// Get the topic metadata to learn about its schema encoding.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;

Expand Down
3 changes: 2 additions & 1 deletion samples/typescript/publishBatchedMessages.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,7 @@ async function publishBatchedMessages(
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

// Cache topic objects (publishers) and reuse them.
const publishOptions: PublishOptions = {
batching: {
maxMessages: maxMessages,
Expand Down
9 changes: 5 additions & 4 deletions samples/typescript/publishMessage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,10 +43,11 @@ async function publishMessage(topicNameOrId: string, data: string) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);

try {
const messageId = await pubSubClient
.topic(topicNameOrId)
.publishMessage({data: dataBuffer});
const messageId = topic.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
} catch (error) {
console.error(
Expand Down
12 changes: 8 additions & 4 deletions samples/typescript/publishMessageWithCustomAttributes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 Google LLC
// Copyright 2019-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,9 +55,13 @@ async function publishMessageWithCustomAttributes(
username: 'gcp',
};

const messageId = await pubSubClient
.topic(topicNameOrId)
.publishMessage({data: dataBuffer, attributes: customAttributes});
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);

const messageId = topic.publishMessage({
data: dataBuffer,
attributes: customAttributes,
});
console.log(`Message ${messageId} published.`);
}
// [END pubsub_publish_custom_attributes]
Expand Down
Loading

0 comments on commit 858b565

Please sign in to comment.