From 858b565077494072522b0147ae54c113e5c2610b Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Fri, 18 Oct 2024 15:07:44 -0400 Subject: [PATCH] samples: various sample and doc improvements we had queued up (#1987) * samples: update publishing samples to clarify that topic objects should be cached; also fix a usage of publish() * samples: convert publishWithRetrySettings to use veneer * samples: update publishWithRetrySettings with new defaults; add comment about including all items * docs: clarify what subscriber batching means * samples: update EOD sample with endpoint * samples: add comments about ordered publishing as well --- ...istenForMessagesWithExactlyOnceDelivery.js | 9 ++- samples/publishAvroRecords.js | 6 +- samples/publishBatchedMessages.js | 3 +- samples/publishMessage.js | 9 +-- samples/publishMessageWithCustomAttributes.js | 12 ++-- samples/publishOrderedMessage.js | 12 ++-- samples/publishProtobufMessages.js | 8 ++- samples/publishWithFlowControl.js | 4 +- samples/publishWithOpenTelemetryTracing.js | 3 + samples/publishWithRetrySettings.js | 56 ++++++++--------- samples/resumePublish.js | 10 ++- ...istenForMessagesWithExactlyOnceDelivery.ts | 9 ++- samples/typescript/publishAvroRecords.ts | 6 +- samples/typescript/publishBatchedMessages.ts | 3 +- samples/typescript/publishMessage.ts | 9 +-- .../publishMessageWithCustomAttributes.ts | 12 ++-- samples/typescript/publishOrderedMessage.ts | 12 ++-- samples/typescript/publishProtobufMessages.ts | 8 ++- samples/typescript/publishWithFlowControl.ts | 4 +- .../publishWithOpenTelemetryTracing.ts | 3 + .../typescript/publishWithRetrySettings.ts | 61 ++++++++----------- samples/typescript/resumePublish.ts | 10 ++- src/message-queues.ts | 3 + src/subscriber.ts | 3 +- 24 files changed, 155 insertions(+), 120 deletions(-) diff --git a/samples/listenForMessagesWithExactlyOnceDelivery.js b/samples/listenForMessagesWithExactlyOnceDelivery.js index 1027761a0..a5d549f84 100644 --- a/samples/listenForMessagesWithExactlyOnceDelivery.js +++ b/samples/listenForMessagesWithExactlyOnceDelivery.js @@ -1,4 +1,4 @@ -// Copyright 2022 Google LLC +// Copyright 2022-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -38,8 +38,11 @@ // Imports the Google Cloud client library const {PubSub} = require('@google-cloud/pubsub'); -// Creates a client; cache this for further use -const pubSubClient = new PubSub(); +// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region. +// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints +const pubSubClient = new PubSub({ + apiEndpoint: 'us-west1-pubsub.googleapis.com:443', +}); async function listenForMessagesWithExactlyOnceDelivery( subscriptionNameOrId, diff --git a/samples/publishAvroRecords.js b/samples/publishAvroRecords.js index f40f3e418..053b9a06b 100644 --- a/samples/publishAvroRecords.js +++ b/samples/publishAvroRecords.js @@ -1,4 +1,4 @@ -// Copyright 2019-2021 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -46,8 +46,10 @@ const fs = require('fs'); const pubSubClient = new PubSub(); async function publishAvroRecords(topicNameOrId) { - // Get the topic metadata to learn about its schema encoding. + // Cache topic objects (publishers) and reuse them. const topic = pubSubClient.topic(topicNameOrId); + + // Get the topic metadata to learn about its schema encoding. const [topicMetadata] = await topic.getMetadata(); const topicSchemaMetadata = topicMetadata.schemaSettings; diff --git a/samples/publishBatchedMessages.js b/samples/publishBatchedMessages.js index 0a45a0880..a5f99b242 100644 --- a/samples/publishBatchedMessages.js +++ b/samples/publishBatchedMessages.js @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ async function publishBatchedMessages( // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) const dataBuffer = Buffer.from(data); + // Cache topic objects (publishers) and reuse them. const publishOptions = { batching: { maxMessages: maxMessages, diff --git a/samples/publishMessage.js b/samples/publishMessage.js index 2436b07fb..6b0581d08 100644 --- a/samples/publishMessage.js +++ b/samples/publishMessage.js @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -47,10 +47,11 @@ async function publishMessage(topicNameOrId, data) { // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) const dataBuffer = Buffer.from(data); + // Cache topic objects (publishers) and reuse them. + const topic = pubSubClient.topic(topicNameOrId); + try { - const messageId = await pubSubClient - .topic(topicNameOrId) - .publishMessage({data: dataBuffer}); + const messageId = topic.publishMessage({data: dataBuffer}); console.log(`Message ${messageId} published.`); } catch (error) { console.error(`Received error while publishing: ${error.message}`); diff --git a/samples/publishMessageWithCustomAttributes.js b/samples/publishMessageWithCustomAttributes.js index 3c2e6b43a..224a9ce0d 100644 --- a/samples/publishMessageWithCustomAttributes.js +++ b/samples/publishMessageWithCustomAttributes.js @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,9 +52,13 @@ async function publishMessageWithCustomAttributes(topicNameOrId, data) { username: 'gcp', }; - const messageId = await pubSubClient - .topic(topicNameOrId) - .publishMessage({data: dataBuffer, attributes: customAttributes}); + // Cache topic objects (publishers) and reuse them. + const topic = pubSubClient.topic(topicNameOrId); + + const messageId = topic.publishMessage({ + data: dataBuffer, + attributes: customAttributes, + }); console.log(`Message ${messageId} published.`); } // [END pubsub_publish_custom_attributes] diff --git a/samples/publishOrderedMessage.js b/samples/publishOrderedMessage.js index bb4e0cc4d..19761b6ad 100644 --- a/samples/publishOrderedMessage.js +++ b/samples/publishOrderedMessage.js @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -61,14 +61,18 @@ async function publishOrderedMessage(topicNameOrId, data, orderingKey) { orderingKey: orderingKey, }; + // Cache topic objects (publishers) and reuse them. + // + // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering + // key are in the same region. For list of locational endpoints for Pub/Sub, see: + // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints const publishOptions = { messageOrdering: true, }; + const topic = pubSubClient.topic(topicNameOrId, publishOptions); // Publishes the message - const messageId = await pubSubClient - .topic(topicNameOrId, publishOptions) - .publishMessage(message); + const messageId = topic.publishMessage(message); console.log(`Message ${messageId} published.`); diff --git a/samples/publishProtobufMessages.js b/samples/publishProtobufMessages.js index bab100563..f486f586f 100644 --- a/samples/publishProtobufMessages.js +++ b/samples/publishProtobufMessages.js @@ -1,4 +1,4 @@ -// Copyright 2019-2021 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -45,8 +45,10 @@ const protobuf = require('protobufjs'); const pubSubClient = new PubSub(); async function publishProtobufMessages(topicNameOrId) { - // Get the topic metadata to learn about its schema. + // Cache topic objects (publishers) and reuse them. const topic = pubSubClient.topic(topicNameOrId); + + // Get the topic metadata to learn about its schema. const [topicMetadata] = await topic.getMetadata(); const topicSchemaMetadata = topicMetadata.schemaSettings; @@ -87,7 +89,7 @@ async function publishProtobufMessages(topicNameOrId) { return; } - const messageId = await topic.publish(dataBuffer); + const messageId = await topic.publishMessage({data: dataBuffer}); console.log(`Protobuf message ${messageId} published.`); } // [END pubsub_publish_proto_messages] diff --git a/samples/publishWithFlowControl.js b/samples/publishWithFlowControl.js index 7e1193f79..9c914e2d2 100644 --- a/samples/publishWithFlowControl.js +++ b/samples/publishWithFlowControl.js @@ -1,4 +1,4 @@ -// Copyright 2021 Google LLC +// Copyright 2021-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ async function publishWithFlowControl(topicNameOrId) { }, }; - // Get a publisher. + // Get a publisher. Cache topic objects (publishers) and reuse them. const topic = pubSubClient.topic(topicNameOrId, options); // For flow controlled publishing, we'll use a publisher flow controller diff --git a/samples/publishWithOpenTelemetryTracing.js b/samples/publishWithOpenTelemetryTracing.js index 42b529739..71d030c96 100644 --- a/samples/publishWithOpenTelemetryTracing.js +++ b/samples/publishWithOpenTelemetryTracing.js @@ -86,7 +86,10 @@ async function publishMessage(topicNameOrId, data) { // Publishes the message as a string, e.g. "Hello, world!" // or JSON.stringify(someObject) const dataBuffer = Buffer.from(data); + + // Cache topic objects (publishers) and reuse them. const publisher = pubSubClient.topic(topicNameOrId); + const messageId = await publisher.publishMessage({data: dataBuffer}); console.log(`Message ${messageId} published.`); diff --git a/samples/publishWithRetrySettings.js b/samples/publishWithRetrySettings.js index 3c50b5a99..24738b7e0 100644 --- a/samples/publishWithRetrySettings.js +++ b/samples/publishWithRetrySettings.js @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -39,34 +39,20 @@ // Imports the Google Cloud client library. v1 is for the lower level // proto access. -const {v1} = require('@google-cloud/pubsub'); +const {PubSub} = require('@google-cloud/pubsub'); -// Creates a publisher client. -const publisherClient = new v1.PublisherClient({ - // optional auth parameters -}); -async function publishWithRetrySettings(projectId, topicNameOrId, data) { - const formattedTopic = publisherClient.projectTopicPath( - projectId, - topicNameOrId - ); - - // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) - const dataBuffer = Buffer.from(data); - const messagesElement = { - data: dataBuffer, - }; - const messages = [messagesElement]; - - // Build the request - const request = { - topic: formattedTopic, - messages: messages, - }; +async function publishWithRetrySettings(topicNameOrId, data) { + const pubsubClient = new PubSub(); // Retry settings control how the publisher handles retryable failures. Default values are shown. // The `retryCodes` array determines which grpc errors will trigger an automatic retry. // The `backoffSettings` object lets you specify the behaviour of retries over time. + // + // Reference this document to see the current defaults for publishing: + // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59 + // + // Please note that _all_ items must be included when passing these settings to topic(). + // Otherwise, unpredictable (incorrect) defaults may be assumed. const retrySettings = { retryCodes: [ 10, // 'ABORTED' @@ -83,36 +69,42 @@ async function publishWithRetrySettings(projectId, topicNameOrId, data) { initialRetryDelayMillis: 100, // The multiplier by which to increase the delay time between the completion // of failed requests, and the initiation of the subsequent retrying request. - retryDelayMultiplier: 1.3, + retryDelayMultiplier: 4, // The maximum delay time, in milliseconds, between requests. // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time. maxRetryDelayMillis: 60000, // The initial timeout parameter to the request. - initialRpcTimeoutMillis: 5000, + initialRpcTimeoutMillis: 60000, // The multiplier by which to increase the timeout parameter between failed requests. rpcTimeoutMultiplier: 1.0, // The maximum timeout parameter, in milliseconds, for a request. When this value is reached, // rpcTimeoutMultiplier will no longer be used to increase the timeout. - maxRpcTimeoutMillis: 600000, + maxRpcTimeoutMillis: 60000, // The total time, in milliseconds, starting from when the initial request is sent, // after which an error will be returned, regardless of the retrying attempts made meanwhile. totalTimeoutMillis: 600000, }, }; - const [response] = await publisherClient.publish(request, { - retry: retrySettings, + // Cache topic objects (publishers) and reuse them. + const topic = pubsubClient.topic(topicNameOrId, { + gaxOpts: { + retry: retrySettings, + }, }); - console.log(`Message ${response.messageIds} published.`); + + // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + const messageId = await topic.publishMessage({data: dataBuffer}); + console.log(`Message ${messageId} published.`); } // [END pubsub_publisher_retry_settings] function main( - projectId = 'YOUR_PROJECT_ID', topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', data = JSON.stringify({foo: 'bar'}) ) { - publishWithRetrySettings(projectId, topicNameOrId, data).catch(err => { + publishWithRetrySettings(topicNameOrId, data).catch(err => { console.error(err.message); process.exitCode = 1; }); diff --git a/samples/resumePublish.js b/samples/resumePublish.js index 3dbe4b242..5d66e712f 100644 --- a/samples/resumePublish.js +++ b/samples/resumePublish.js @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,8 +52,14 @@ async function resumePublish(topicNameOrId, data, orderingKey) { messageOrdering: true, }; - // Publishes the message + // Cache topic objects (publishers) and reuse them. + // + // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering + // key are in the same region. For list of locational endpoints for Pub/Sub, see: + // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints const publisher = pubSubClient.topic(topicNameOrId, publishOptions); + + // Publishes the message try { const message = { data: dataBuffer, diff --git a/samples/typescript/listenForMessagesWithExactlyOnceDelivery.ts b/samples/typescript/listenForMessagesWithExactlyOnceDelivery.ts index 618ba9a30..9194c0b6f 100644 --- a/samples/typescript/listenForMessagesWithExactlyOnceDelivery.ts +++ b/samples/typescript/listenForMessagesWithExactlyOnceDelivery.ts @@ -1,4 +1,4 @@ -// Copyright 2022 Google LLC +// Copyright 2022-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -34,8 +34,11 @@ // Imports the Google Cloud client library import {Message, PubSub, AckError} from '@google-cloud/pubsub'; -// Creates a client; cache this for further use -const pubSubClient = new PubSub(); +// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region. +// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints +const pubSubClient = new PubSub({ + apiEndpoint: 'us-west1-pubsub.googleapis.com:443', +}); async function listenForMessagesWithExactlyOnceDelivery( subscriptionNameOrId: string, diff --git a/samples/typescript/publishAvroRecords.ts b/samples/typescript/publishAvroRecords.ts index d269f5479..9725f9af3 100644 --- a/samples/typescript/publishAvroRecords.ts +++ b/samples/typescript/publishAvroRecords.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2021 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -47,8 +47,10 @@ interface ProvinceObject { } async function publishAvroRecords(topicNameOrId: string) { - // Get the topic metadata to learn about its schema encoding. + // Cache topic objects (publishers) and reuse them. const topic = pubSubClient.topic(topicNameOrId); + + // Get the topic metadata to learn about its schema encoding. const [topicMetadata] = await topic.getMetadata(); const topicSchemaMetadata = topicMetadata.schemaSettings; diff --git a/samples/typescript/publishBatchedMessages.ts b/samples/typescript/publishBatchedMessages.ts index 5f7f76cbc..4d5157beb 100644 --- a/samples/typescript/publishBatchedMessages.ts +++ b/samples/typescript/publishBatchedMessages.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ async function publishBatchedMessages( // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) const dataBuffer = Buffer.from(data); + // Cache topic objects (publishers) and reuse them. const publishOptions: PublishOptions = { batching: { maxMessages: maxMessages, diff --git a/samples/typescript/publishMessage.ts b/samples/typescript/publishMessage.ts index 86b9e7562..eb7526143 100644 --- a/samples/typescript/publishMessage.ts +++ b/samples/typescript/publishMessage.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -43,10 +43,11 @@ async function publishMessage(topicNameOrId: string, data: string) { // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) const dataBuffer = Buffer.from(data); + // Cache topic objects (publishers) and reuse them. + const topic = pubSubClient.topic(topicNameOrId); + try { - const messageId = await pubSubClient - .topic(topicNameOrId) - .publishMessage({data: dataBuffer}); + const messageId = topic.publishMessage({data: dataBuffer}); console.log(`Message ${messageId} published.`); } catch (error) { console.error( diff --git a/samples/typescript/publishMessageWithCustomAttributes.ts b/samples/typescript/publishMessageWithCustomAttributes.ts index 9b6619377..df285cb29 100644 --- a/samples/typescript/publishMessageWithCustomAttributes.ts +++ b/samples/typescript/publishMessageWithCustomAttributes.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -55,9 +55,13 @@ async function publishMessageWithCustomAttributes( username: 'gcp', }; - const messageId = await pubSubClient - .topic(topicNameOrId) - .publishMessage({data: dataBuffer, attributes: customAttributes}); + // Cache topic objects (publishers) and reuse them. + const topic = pubSubClient.topic(topicNameOrId); + + const messageId = topic.publishMessage({ + data: dataBuffer, + attributes: customAttributes, + }); console.log(`Message ${messageId} published.`); } // [END pubsub_publish_custom_attributes] diff --git a/samples/typescript/publishOrderedMessage.ts b/samples/typescript/publishOrderedMessage.ts index 30187a0c2..0fb083208 100644 --- a/samples/typescript/publishOrderedMessage.ts +++ b/samples/typescript/publishOrderedMessage.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -61,14 +61,18 @@ async function publishOrderedMessage( orderingKey: orderingKey, }; + // Cache topic objects (publishers) and reuse them. + // + // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering + // key are in the same region. For list of locational endpoints for Pub/Sub, see: + // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints const publishOptions: PublishOptions = { messageOrdering: true, }; + const topic = pubSubClient.topic(topicNameOrId, publishOptions); // Publishes the message - const messageId = await pubSubClient - .topic(topicNameOrId, publishOptions) - .publishMessage(message); + const messageId = topic.publishMessage(message); console.log(`Message ${messageId} published.`); diff --git a/samples/typescript/publishProtobufMessages.ts b/samples/typescript/publishProtobufMessages.ts index 0ff9ba64b..4947352bd 100644 --- a/samples/typescript/publishProtobufMessages.ts +++ b/samples/typescript/publishProtobufMessages.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2021 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -46,8 +46,10 @@ interface ProvinceObject { } async function publishProtobufMessages(topicNameOrId: string) { - // Get the topic metadata to learn about its schema. + // Cache topic objects (publishers) and reuse them. const topic = pubSubClient.topic(topicNameOrId); + + // Get the topic metadata to learn about its schema. const [topicMetadata] = await topic.getMetadata(); const topicSchemaMetadata = topicMetadata.schemaSettings; @@ -88,7 +90,7 @@ async function publishProtobufMessages(topicNameOrId: string) { return; } - const messageId = await topic.publish(dataBuffer); + const messageId = await topic.publishMessage({data: dataBuffer}); console.log(`Protobuf message ${messageId} published.`); } // [END pubsub_publish_proto_messages] diff --git a/samples/typescript/publishWithFlowControl.ts b/samples/typescript/publishWithFlowControl.ts index 2d08a0831..c8c991a60 100644 --- a/samples/typescript/publishWithFlowControl.ts +++ b/samples/typescript/publishWithFlowControl.ts @@ -1,4 +1,4 @@ -// Copyright 2021 Google LLC +// Copyright 2021-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -46,7 +46,7 @@ async function publishWithFlowControl(topicNameOrId: string) { }, }; - // Get a publisher. + // Get a publisher. Cache topic objects (publishers) and reuse them. const topic = pubSubClient.topic(topicNameOrId, options); // For flow controlled publishing, we'll use a publisher flow controller diff --git a/samples/typescript/publishWithOpenTelemetryTracing.ts b/samples/typescript/publishWithOpenTelemetryTracing.ts index cd7a82b3a..a92982976 100644 --- a/samples/typescript/publishWithOpenTelemetryTracing.ts +++ b/samples/typescript/publishWithOpenTelemetryTracing.ts @@ -78,7 +78,10 @@ async function publishMessage(topicNameOrId: string, data: string) { // Publishes the message as a string, e.g. "Hello, world!" // or JSON.stringify(someObject) const dataBuffer = Buffer.from(data); + + // Cache topic objects (publishers) and reuse them. const publisher = pubSubClient.topic(topicNameOrId); + const messageId = await publisher.publishMessage({data: dataBuffer}); console.log(`Message ${messageId} published.`); diff --git a/samples/typescript/publishWithRetrySettings.ts b/samples/typescript/publishWithRetrySettings.ts index 1add95d3d..b47498275 100644 --- a/samples/typescript/publishWithRetrySettings.ts +++ b/samples/typescript/publishWithRetrySettings.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -35,39 +35,20 @@ // Imports the Google Cloud client library. v1 is for the lower level // proto access. -import {v1} from '@google-cloud/pubsub'; +import {PubSub} from '@google-cloud/pubsub'; -// Creates a publisher client. -const publisherClient = new v1.PublisherClient({ - // optional auth parameters -}); - -async function publishWithRetrySettings( - projectId: string, - topicNameOrId: string, - data: string -) { - const formattedTopic = publisherClient.projectTopicPath( - projectId, - topicNameOrId - ); - - // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) - const dataBuffer = Buffer.from(data); - const messagesElement = { - data: dataBuffer, - }; - const messages = [messagesElement]; - - // Build the request - const request = { - topic: formattedTopic, - messages: messages, - }; +async function publishWithRetrySettings(topicNameOrId: string, data: string) { + const pubsubClient = new PubSub(); // Retry settings control how the publisher handles retryable failures. Default values are shown. // The `retryCodes` array determines which grpc errors will trigger an automatic retry. // The `backoffSettings` object lets you specify the behaviour of retries over time. + // + // Reference this document to see the current defaults for publishing: + // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59 + // + // Please note that _all_ items must be included when passing these settings to topic(). + // Otherwise, unpredictable (incorrect) defaults may be assumed. const retrySettings = { retryCodes: [ 10, // 'ABORTED' @@ -84,36 +65,42 @@ async function publishWithRetrySettings( initialRetryDelayMillis: 100, // The multiplier by which to increase the delay time between the completion // of failed requests, and the initiation of the subsequent retrying request. - retryDelayMultiplier: 1.3, + retryDelayMultiplier: 4, // The maximum delay time, in milliseconds, between requests. // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time. maxRetryDelayMillis: 60000, // The initial timeout parameter to the request. - initialRpcTimeoutMillis: 5000, + initialRpcTimeoutMillis: 60000, // The multiplier by which to increase the timeout parameter between failed requests. rpcTimeoutMultiplier: 1.0, // The maximum timeout parameter, in milliseconds, for a request. When this value is reached, // rpcTimeoutMultiplier will no longer be used to increase the timeout. - maxRpcTimeoutMillis: 600000, + maxRpcTimeoutMillis: 60000, // The total time, in milliseconds, starting from when the initial request is sent, // after which an error will be returned, regardless of the retrying attempts made meanwhile. totalTimeoutMillis: 600000, }, }; - const [response] = await publisherClient.publish(request, { - retry: retrySettings, + // Cache topic objects (publishers) and reuse them. + const topic = pubsubClient.topic(topicNameOrId, { + gaxOpts: { + retry: retrySettings, + }, }); - console.log(`Message ${response.messageIds} published.`); + + // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + const messageId = await topic.publishMessage({data: dataBuffer}); + console.log(`Message ${messageId} published.`); } // [END pubsub_publisher_retry_settings] function main( - projectId = 'YOUR_PROJECT_ID', topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', data = JSON.stringify({foo: 'bar'}) ) { - publishWithRetrySettings(projectId, topicNameOrId, data).catch(err => { + publishWithRetrySettings(topicNameOrId, data).catch(err => { console.error(err.message); process.exitCode = 1; }); diff --git a/samples/typescript/resumePublish.ts b/samples/typescript/resumePublish.ts index ca1e929b3..2a7f9680f 100644 --- a/samples/typescript/resumePublish.ts +++ b/samples/typescript/resumePublish.ts @@ -1,4 +1,4 @@ -// Copyright 2019-2023 Google LLC +// Copyright 2019-2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,8 +52,14 @@ async function resumePublish( messageOrdering: true, }; - // Publishes the message + // Cache topic objects (publishers) and reuse them. + // + // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering + // key are in the same region. For list of locational endpoints for Pub/Sub, see: + // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints const publisher = pubSubClient.topic(topicNameOrId, publishOptions); + + // Publishes the message try { const message = { data: dataBuffer, diff --git a/src/message-queues.ts b/src/message-queues.ts index a08330a36..b5d1275d2 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -55,6 +55,9 @@ export interface QueuedMessage { */ export type QueuedMessages = Array; +/** + * Batching options for sending acks and modacks back to the server. + */ export interface BatchOptions { callOptions?: CallOptions; maxMessages?: number; diff --git a/src/subscriber.ts b/src/subscriber.ts index c20b63be6..834165c7d 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -560,7 +560,8 @@ export class Message implements tracing.MessageWithAttributes { * ever have, while it's under library control. * @property {Duration} [maxAckDeadline] The maximum time that ackDeadline should * ever have, while it's under library control. - * @property {BatchOptions} [batching] Request batching options. + * @property {BatchOptions} [batching] Request batching options; this is for + * batching acks and modacks being sent back to the server. * @property {FlowControlOptions} [flowControl] Flow control options. * @property {boolean} [useLegacyFlowControl] Disables enforcing flow control * settings at the Cloud PubSub server and uses the less accurate method