From 8329f47c62687ac2ac181f034be7ed0330e9e397 Mon Sep 17 00:00:00 2001 From: Marcio Cruz de Almeida <67694075+marciocadev@users.noreply.github.com> Date: Tue, 30 Apr 2024 07:21:08 -0300 Subject: [PATCH] feat(sdk): cloud.Topic publish() support variadic parameter (#6281) Now we can send multiple messages at once to a topic. Closes #6072 ## Checklist - [x] Title matches [Winglang's style guide](https://www.winglang.io/contributing/start-here/pull_requests#how-are-pull-request-titles-formatted) - [x] Description explains motivation and solution - [x] Tests added (always) - [ ] Docs updated (only required for features) - [ ] Added `pr/e2e-full` label if this feature requires end-to-end testing *By submitting this pull request, I confirm that my contribution is made under the terms of the [Wing Cloud Contribution License](https://github.com/winglang/wing/blob/main/CONTRIBUTION_LICENSE.md)*. --- docs/docs/04-standard-library/cloud/topic.md | 15 +- .../sdk_tests/topic/variadic-parameter.test.w | 17 ++ .../incomplete_inflight_namespace.snap | 4 +- .../completions/namespace_middle_dot.snap | 4 +- .../completions/new_expression_nested.snap | 2 +- .../partial_type_reference_annotation.snap | 4 +- .../variable_type_annotation_namespace.snap | 4 +- libs/wingsdk/src/cloud/topic.md | 7 +- libs/wingsdk/src/cloud/topic.ts | 7 +- libs/wingsdk/src/shared-aws/topic.inflight.ts | 65 +++++- libs/wingsdk/src/target-sim/topic.inflight.ts | 23 ++- .../test/shared-aws/topic.inflight.test.ts | 50 ++++- .../__snapshots__/bucket.test.ts.snap | 10 +- .../__snapshots__/topic-producer.test.ts.snap | 25 ++- .../test/target-sim/topic-producer.test.ts | 50 +++++ ...ariadic-parameter.test.w_compile_tf-aws.md | 185 ++++++++++++++++++ .../variadic-parameter.test.w_test_sim.md | 12 ++ 17 files changed, 435 insertions(+), 49 deletions(-) create mode 100644 examples/tests/sdk_tests/topic/variadic-parameter.test.w create mode 100644 tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_compile_tf-aws.md create mode 100644 tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_test_sim.md diff --git a/docs/docs/04-standard-library/cloud/topic.md b/docs/docs/04-standard-library/cloud/topic.md index f07374488a7..58624ad5c3f 100644 --- a/docs/docs/04-standard-library/cloud/topic.md +++ b/docs/docs/04-standard-library/cloud/topic.md @@ -56,7 +56,7 @@ topic.subscribeQueue(queue); ### Publishing to a topic -The inflight method `publish` sends a message to all of the topic's subscribers. +The inflight method `publish` sends messages to all of the topic's subscribers. ```js bring cloud; @@ -64,7 +64,10 @@ bring cloud; let topic = new cloud.Topic(); inflight () => { - topic.publish("Hello World!"); + topic.publish( + "Topics can now publish", + "multiple messages at once" + ); }; ``` @@ -155,7 +158,7 @@ new cloud.Topic(props?: TopicProps); | **Name** | **Description** | | --- | --- | -| publish | Publish message to topic. | +| publish | Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform. | --- @@ -202,12 +205,12 @@ Subscribing queue to the topic. ##### `publish` ```wing -inflight publish(message: str): void +inflight publish(...messages: Array): void ``` -Publish message to topic. +Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform. -###### `message`Required +###### `messages`Required - *Type:* str diff --git a/examples/tests/sdk_tests/topic/variadic-parameter.test.w b/examples/tests/sdk_tests/topic/variadic-parameter.test.w new file mode 100644 index 00000000000..479c0552f94 --- /dev/null +++ b/examples/tests/sdk_tests/topic/variadic-parameter.test.w @@ -0,0 +1,17 @@ +bring cloud; +bring util; + +let c = new cloud.Counter(); +let t = new cloud.Topic(); +t.onMessage(inflight (msg: str) => { + c.inc(); +}); + +test "publish message array to topic" { + t.publish("msg1", "msg2", "msg3", + "msg4", "msg5", "msg6", "msg7", + "msg8", "msg9", "msg10", "msg11", + "msg12", "msg13", "msg14", "msg15"); + + assert(util.waitUntil(inflight () => { return c.peek() == 15; })); +} diff --git a/libs/wingc/src/lsp/snapshots/completions/incomplete_inflight_namespace.snap b/libs/wingc/src/lsp/snapshots/completions/incomplete_inflight_namespace.snap index bdc1a2b270b..7b6e8936156 100644 --- a/libs/wingc/src/lsp/snapshots/completions/incomplete_inflight_namespace.snap +++ b/libs/wingc/src/lsp/snapshots/completions/incomplete_inflight_namespace.snap @@ -71,7 +71,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 7 documentation: kind: markdown - value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (message: str): void` — Publish message to topic.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." + value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." sortText: gg|Topic - label: Website kind: 7 @@ -515,7 +515,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 8 documentation: kind: markdown - value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (message: str): void` — Publish message to topic." + value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform." sortText: ii|ITopicClient - label: ITopicOnMessageHandler kind: 8 diff --git a/libs/wingc/src/lsp/snapshots/completions/namespace_middle_dot.snap b/libs/wingc/src/lsp/snapshots/completions/namespace_middle_dot.snap index bdc1a2b270b..7b6e8936156 100644 --- a/libs/wingc/src/lsp/snapshots/completions/namespace_middle_dot.snap +++ b/libs/wingc/src/lsp/snapshots/completions/namespace_middle_dot.snap @@ -71,7 +71,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 7 documentation: kind: markdown - value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (message: str): void` — Publish message to topic.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." + value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." sortText: gg|Topic - label: Website kind: 7 @@ -515,7 +515,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 8 documentation: kind: markdown - value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (message: str): void` — Publish message to topic." + value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform." sortText: ii|ITopicClient - label: ITopicOnMessageHandler kind: 8 diff --git a/libs/wingc/src/lsp/snapshots/completions/new_expression_nested.snap b/libs/wingc/src/lsp/snapshots/completions/new_expression_nested.snap index df591b8673e..683e41dd6e3 100644 --- a/libs/wingc/src/lsp/snapshots/completions/new_expression_nested.snap +++ b/libs/wingc/src/lsp/snapshots/completions/new_expression_nested.snap @@ -126,7 +126,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 7 documentation: kind: markdown - value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (message: str): void` — Publish message to topic.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." + value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." sortText: gg|Topic insertText: Topic($1) insertTextFormat: 2 diff --git a/libs/wingc/src/lsp/snapshots/completions/partial_type_reference_annotation.snap b/libs/wingc/src/lsp/snapshots/completions/partial_type_reference_annotation.snap index bdc1a2b270b..7b6e8936156 100644 --- a/libs/wingc/src/lsp/snapshots/completions/partial_type_reference_annotation.snap +++ b/libs/wingc/src/lsp/snapshots/completions/partial_type_reference_annotation.snap @@ -71,7 +71,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 7 documentation: kind: markdown - value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (message: str): void` — Publish message to topic.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." + value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." sortText: gg|Topic - label: Website kind: 7 @@ -515,7 +515,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 8 documentation: kind: markdown - value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (message: str): void` — Publish message to topic." + value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform." sortText: ii|ITopicClient - label: ITopicOnMessageHandler kind: 8 diff --git a/libs/wingc/src/lsp/snapshots/completions/variable_type_annotation_namespace.snap b/libs/wingc/src/lsp/snapshots/completions/variable_type_annotation_namespace.snap index bdc1a2b270b..7b6e8936156 100644 --- a/libs/wingc/src/lsp/snapshots/completions/variable_type_annotation_namespace.snap +++ b/libs/wingc/src/lsp/snapshots/completions/variable_type_annotation_namespace.snap @@ -71,7 +71,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 7 documentation: kind: markdown - value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (message: str): void` — Publish message to topic.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." + value: "```wing\nclass Topic\n```\n---\nA topic.\n\n### Initializer\n- `...props` — `TopicProps?`\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `onMessage` — `preflight (inflight: inflight (event: str): void, props: TopicOnMessageOptions?): Function` — Run an inflight whenever an message is published to the topic.\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform.\n- `subscribeQueue` — `preflight (queue: Queue, props: TopicSubscribeQueueOptions?): void` — Subscribing queue to the topic.\n- `toString` — `preflight (): str` — Returns a string representation of this construct." sortText: gg|Topic - label: Website kind: 7 @@ -515,7 +515,7 @@ source: libs/wingc/src/lsp/completions.rs kind: 8 documentation: kind: markdown - value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (message: str): void` — Publish message to topic." + value: "```wing\ninterface ITopicClient\n```\n---\nInflight interface for `Topic`.\n### Methods\n- `publish` — `inflight (...messages: Array?): void` — Publish messages to topic, if multiple messages are passed then they will be published as a batch if supported by the target platform." sortText: ii|ITopicClient - label: ITopicOnMessageHandler kind: 8 diff --git a/libs/wingsdk/src/cloud/topic.md b/libs/wingsdk/src/cloud/topic.md index 95a104215af..fae2cba4e32 100644 --- a/libs/wingsdk/src/cloud/topic.md +++ b/libs/wingsdk/src/cloud/topic.md @@ -56,7 +56,7 @@ topic.subscribeQueue(queue); ### Publishing to a topic -The inflight method `publish` sends a message to all of the topic's subscribers. +The inflight method `publish` sends messages to all of the topic's subscribers. ```js bring cloud; @@ -64,7 +64,10 @@ bring cloud; let topic = new cloud.Topic(); inflight () => { - topic.publish("Hello World!"); + topic.publish( + "Topics can now publish", + "multiple messages at once" + ); }; ``` diff --git a/libs/wingsdk/src/cloud/topic.ts b/libs/wingsdk/src/cloud/topic.ts index 47ac73f97d7..aec98e83025 100644 --- a/libs/wingsdk/src/cloud/topic.ts +++ b/libs/wingsdk/src/cloud/topic.ts @@ -77,11 +77,12 @@ export interface TopicSubscribeQueueOptions extends QueueProps {} */ export interface ITopicClient { /** - * Publish message to topic - * @param message Payload to publish to Topic + * Publish messages to topic, if multiple messages are passed then they + * will be published as a batch if supported by the target platform + * @param messages Payload to publish to Topic * @inflight */ - publish(message: string): Promise; + publish(...messages: string[]): void; } /** diff --git a/libs/wingsdk/src/shared-aws/topic.inflight.ts b/libs/wingsdk/src/shared-aws/topic.inflight.ts index a4e741abd05..bba4cdc1dd5 100644 --- a/libs/wingsdk/src/shared-aws/topic.inflight.ts +++ b/libs/wingsdk/src/shared-aws/topic.inflight.ts @@ -1,5 +1,18 @@ -import { SNSClient, PublishCommand } from "@aws-sdk/client-sns"; +import { + SNSClient, + PublishBatchCommand, + PublishBatchRequestEntry, + InvalidBatchEntryIdException, +} from "@aws-sdk/client-sns"; import { ITopicClient } from "../cloud"; +import { Util } from "../util/util"; + +/** + * Topics in AWS can receive up to 10 messages at a time + * using the PublishBatchCommand, this constant is used + * to generate batches respecting the limits. + */ +const CHUNK_SIZE = 10; export class TopicClient implements ITopicClient { constructor( @@ -7,11 +20,49 @@ export class TopicClient implements ITopicClient { private readonly client: SNSClient = new SNSClient({}) ) {} - public async publish(message: string): Promise { - const command = new PublishCommand({ - Message: message, - TopicArn: this.topicArn, - }); - await this.client.send(command); + public async publish(...messages: string[]): Promise { + if (messages.includes("")) { + throw new Error("Empty messages are not allowed"); + } + + let batchMessages: Array = []; + for (let i = 0; i < messages.length; i += CHUNK_SIZE) { + const chunk = messages.slice(i, i + CHUNK_SIZE); + batchMessages.push(this.processBatchMessages(chunk, i)); + } + + for (const batch of batchMessages) { + try { + const command = new PublishBatchCommand({ + TopicArn: this.topicArn, + PublishBatchRequestEntries: batch, + }); + await this.client.send(command); + } catch (e) { + if (e instanceof InvalidBatchEntryIdException) { + throw new Error( + `The Id of a batch entry in a batch request doesn't abide by the specification. (message=${messages}): ${ + (e as Error).stack + })}` + ); + } + throw new Error((e as Error).stack); + } + } + } + + private processBatchMessages( + messages: string[], + idx: number + ): PublishBatchRequestEntry[] { + let batchMessages: Array = []; + let index = idx; + for (const message of messages) { + batchMessages.push({ + Id: Util.sha256(`${message}-${++index}`), + Message: message, + }); + } + return batchMessages; } } diff --git a/libs/wingsdk/src/target-sim/topic.inflight.ts b/libs/wingsdk/src/target-sim/topic.inflight.ts index b81c0d91f5e..b91c35495d4 100644 --- a/libs/wingsdk/src/target-sim/topic.inflight.ts +++ b/libs/wingsdk/src/target-sim/topic.inflight.ts @@ -81,17 +81,20 @@ export class Topic } } - public async publish(message: string): Promise { - this.context.addTrace({ - data: { - message: `Publish (message=${message}).`, + public publish(...messages: string[]): Promise { + return this.context.withTrace({ + message: `Publish (messages=${messages}).`, + activity: async () => { + if (messages.includes("")) { + throw new Error("Empty messages are not allowed"); + } + let publishAll: Array> = []; + for (const message of messages) { + publishAll.push(this.publishMessage(message)); + } + + return Promise.all(publishAll); }, - sourcePath: this.context.resourcePath, - sourceType: TOPIC_FQN, - type: TraceType.RESOURCE, - timestamp: new Date().toISOString(), }); - - return this.publishMessage(message); } } diff --git a/libs/wingsdk/test/shared-aws/topic.inflight.test.ts b/libs/wingsdk/test/shared-aws/topic.inflight.test.ts index f4a911dd92d..29872d19821 100644 --- a/libs/wingsdk/test/shared-aws/topic.inflight.test.ts +++ b/libs/wingsdk/test/shared-aws/topic.inflight.test.ts @@ -1,8 +1,9 @@ -import { SNSClient, PublishCommand } from "@aws-sdk/client-sns"; +import { SNSClient, PublishBatchCommand } from "@aws-sdk/client-sns"; import { mockClient } from "aws-sdk-client-mock"; import "aws-sdk-client-mock-jest"; -import { test, expect, beforeEach, vi } from "vitest"; +import { test, expect, beforeEach } from "vitest"; import { TopicClient } from "../../src/shared-aws/topic.inflight"; +import { Util } from "../../src/util"; const snsMock = mockClient(SNSClient); @@ -14,16 +15,53 @@ test("publish - happy path", async () => { // GIVEN const TOPIC_ARN = "SOME:TOPIC_ARN:that-is/fake"; const MESSAGE = "SOME MESSAGE"; - snsMock.on(PublishCommand).resolves({ $metadata: { httpStatusCode: 200 } }); + snsMock + .on(PublishBatchCommand) + .resolves({ $metadata: { httpStatusCode: 200 } }); // WHEN const client = new TopicClient(TOPIC_ARN); await client.publish(MESSAGE); // THEN - expect(snsMock).toHaveReceivedCommandTimes(PublishCommand, 1); - expect(snsMock).toHaveReceivedCommandWith(PublishCommand, { - Message: MESSAGE, + expect(snsMock).toHaveReceivedCommandTimes(PublishBatchCommand, 1); + expect(snsMock).toHaveReceivedCommandWith(PublishBatchCommand, { TopicArn: TOPIC_ARN, + PublishBatchRequestEntries: [ + { + Id: Util.sha256(`${MESSAGE}-1`), + Message: MESSAGE, + }, + ], + }); +}); + +test("publish multiple messages", async () => { + // GIVEN + const TOPIC_ARN = "SOME:TOPIC_ARN:that-is/fake"; + const FIRST_MESSAGE = "FIRST MESSAGE"; + const SECOND_MESSAGE = "SECOND MESSAGE"; + snsMock + .on(PublishBatchCommand) + .resolves({ $metadata: { httpStatusCode: 200 } }); + + // WHEN + const client = new TopicClient(TOPIC_ARN); + await client.publish(FIRST_MESSAGE, SECOND_MESSAGE); + + // THEN + expect(snsMock).toHaveReceivedCommandTimes(PublishBatchCommand, 1); + expect(snsMock).toHaveReceivedCommandWith(PublishBatchCommand, { + TopicArn: TOPIC_ARN, + PublishBatchRequestEntries: [ + { + Id: Util.sha256(`${FIRST_MESSAGE}-1`), + Message: FIRST_MESSAGE, + }, + { + Id: Util.sha256(`${SECOND_MESSAGE}-2`), + Message: SECOND_MESSAGE, + }, + ], }); }); diff --git a/libs/wingsdk/test/target-sim/__snapshots__/bucket.test.ts.snap b/libs/wingsdk/test/target-sim/__snapshots__/bucket.test.ts.snap index 3635f30aff2..e3c65b39c6b 100644 --- a/libs/wingsdk/test/target-sim/__snapshots__/bucket.test.ts.snap +++ b/libs/wingsdk/test/target-sim/__snapshots__/bucket.test.ts.snap @@ -18,23 +18,23 @@ exports[`bucket on event creates 3 topics, and sends the right event and key in "root/my_bucket/ondelete/Policy started", "root/my_bucket/ondelete/TopicEventMapping0 started", "root/log_bucket/Policy started", - "Publish (message=a).", "Sending message (message=a, subscriber=sim-6).", "InvokeAsync (payload="a").", + "Publish (messages=a).", "Put (key=a).", "Put (key=a).", "I am done", "Get (key=a).", - "Publish (message=a).", "Sending message (message=a, subscriber=sim-9).", "InvokeAsync (payload="a").", + "Publish (messages=a).", "Put (key=a).", "Put (key=a).", "I am done", "Get (key=a).", - "Publish (message=a).", "Sending message (message=a, subscriber=sim-12).", "InvokeAsync (payload="a").", + "Publish (messages=a).", "Delete (key=a).", "Put (key=a).", "I am done", @@ -836,9 +836,9 @@ exports[`removing a key will call onDelete method 1`] = ` "root/my_bucket/ondelete/Policy started", "root/my_bucket/ondelete/TopicEventMapping0 started", "Put (key=unknown.txt).", - "Publish (message=unknown.txt).", "Sending message (message=unknown.txt, subscriber=sim-3).", "InvokeAsync (payload="unknown.txt").", + "Publish (messages=unknown.txt).", "Delete (key=unknown.txt).", "Received unknown.txt", "root/my_bucket/Policy stopped", @@ -858,9 +858,9 @@ exports[`update an object in bucket 1`] = ` "root/my_bucket/oncreate/OnMessage0 started", "root/my_bucket/oncreate/Policy started", "root/my_bucket/oncreate/TopicEventMapping0 started", - "Publish (message=1.txt).", "Sending message (message=1.txt, subscriber=sim-3).", "InvokeAsync (payload="1.txt").", + "Publish (messages=1.txt).", "Put (key=1.txt).", "Put (key=1.txt).", "I am done", diff --git a/libs/wingsdk/test/target-sim/__snapshots__/topic-producer.test.ts.snap b/libs/wingsdk/test/target-sim/__snapshots__/topic-producer.test.ts.snap index c70c689e3d1..169c45a9fed 100644 --- a/libs/wingsdk/test/target-sim/__snapshots__/topic-producer.test.ts.snap +++ b/libs/wingsdk/test/target-sim/__snapshots__/topic-producer.test.ts.snap @@ -7,9 +7,9 @@ exports[`publishing messages to topic 1`] = ` "root/TopicTester/MyTopic/Policy started", "root/TopicTester/MyTopic/TopicEventMapping0 started", "root/TopicTester/Function started", - "Publish (message=ABC).", "Sending message (message=ABC, subscriber=sim-1).", "InvokeAsync (payload="ABC").", + "Publish (messages=ABC).", "Invoke (payload="ABC").", "Message received", "root/TopicTester/MyTopic/Policy stopped", @@ -19,3 +19,26 @@ exports[`publishing messages to topic 1`] = ` "root/TopicTester/MyTopic stopped", ] `; + +exports[`publishing multiple messages to topic 1`] = ` +[ + "root/TopicTester/MyTopic started", + "root/TopicTester/MyTopic/OnMessage0 started", + "root/TopicTester/MyTopic/Policy started", + "root/TopicTester/MyTopic/TopicEventMapping0 started", + "root/TopicTester/Function started", + "Sending message (message=A, subscriber=sim-1).", + "Sending message (message=B, subscriber=sim-1).", + "Sending message (message=C, subscriber=sim-1).", + "InvokeAsync (payload="A").", + "InvokeAsync (payload="B").", + "InvokeAsync (payload="C").", + "Publish (messages=A,B,C).", + "Invoke (payload="ABC").", + "root/TopicTester/MyTopic/Policy stopped", + "root/TopicTester/MyTopic/TopicEventMapping0 stopped", + "root/TopicTester/MyTopic/OnMessage0 stopped", + "root/TopicTester/Function stopped", + "root/TopicTester/MyTopic stopped", +] +`; diff --git a/libs/wingsdk/test/target-sim/topic-producer.test.ts b/libs/wingsdk/test/target-sim/topic-producer.test.ts index afe43c76cb4..c5d041e867f 100644 --- a/libs/wingsdk/test/target-sim/topic-producer.test.ts +++ b/libs/wingsdk/test/target-sim/topic-producer.test.ts @@ -54,3 +54,53 @@ test("publishing messages to topic", async () => { expect(listMessages(s)).toMatchSnapshot(); }); + +test("publishing multiple messages to topic", async () => { + // GIVEN + class TopicTest extends Construct { + constructor(scope: Construct, id: string) { + super(scope, id); + + const topic = new cloud.Topic(this, "MyTopic"); + const publisher = Testing.makeHandler( + `async handle(event) { + await this.topic.publish(...event.split("")); + }`, + { + topic: { + obj: topic, + ops: [cloud.TopicInflightMethods.PUBLISH], + }, + } + ); + new cloud.Function(this, "Function", publisher); + + const processor = Testing.makeHandler(`async handle(event) { + if (event.message === "") throw new Error("No message recieved"); + console.log("event"); + }`); + topic.onMessage(processor); + } + } + + const app = new SimApp(); + new TopicTest(app, "TopicTester"); + + const s = await app.startSimulator(); + + const publisher = s.getResource( + "/TopicTester/Function" + ) as cloud.IFunctionClient; + + // WHEN + await publisher.invoke("ABC"); + + await waitUntilTraceCount(s, 1, (trace) => + trace.data.message.includes("A", "B", "C") + ); + + // THEN + await s.stop(); + + expect(listMessages(s)).toMatchSnapshot(); +}); diff --git a/tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_compile_tf-aws.md b/tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_compile_tf-aws.md new file mode 100644 index 00000000000..2bc3d594eaf --- /dev/null +++ b/tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_compile_tf-aws.md @@ -0,0 +1,185 @@ +# [variadic-parameter.test.w](../../../../../../examples/tests/sdk_tests/topic/variadic-parameter.test.w) | compile | tf-aws + +## main.tf.json +```json +{ + "//": { + "metadata": { + "backend": "local", + "stackName": "root", + "version": "0.20.3" + }, + "outputs": {} + }, + "provider": { + "aws": [ + {} + ] + }, + "resource": { + "aws_cloudwatch_log_group": { + "Topic-OnMessage0_CloudwatchLogGroup_DE4DF0A1": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic-OnMessage0/CloudwatchLogGroup", + "uniqueId": "Topic-OnMessage0_CloudwatchLogGroup_DE4DF0A1" + } + }, + "name": "/aws/lambda/Topic-OnMessage0-c85d7820", + "retention_in_days": 30 + } + }, + "aws_dynamodb_table": { + "Counter": { + "//": { + "metadata": { + "path": "root/Default/Default/Counter/Default", + "uniqueId": "Counter" + } + }, + "attribute": [ + { + "name": "id", + "type": "S" + } + ], + "billing_mode": "PAY_PER_REQUEST", + "hash_key": "id", + "name": "wing-counter-Counter-c824ef62" + } + }, + "aws_iam_role": { + "Topic-OnMessage0_IamRole_64DD36FA": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic-OnMessage0/IamRole", + "uniqueId": "Topic-OnMessage0_IamRole_64DD36FA" + } + }, + "assume_role_policy": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Action\":\"sts:AssumeRole\",\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},\"Effect\":\"Allow\"}]}" + } + }, + "aws_iam_role_policy": { + "Topic-OnMessage0_IamRolePolicy_F5EE09D8": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic-OnMessage0/IamRolePolicy", + "uniqueId": "Topic-OnMessage0_IamRolePolicy_F5EE09D8" + } + }, + "policy": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Action\":[\"dynamodb:UpdateItem\"],\"Resource\":[\"${aws_dynamodb_table.Counter.arn}\"],\"Effect\":\"Allow\"}]}", + "role": "${aws_iam_role.Topic-OnMessage0_IamRole_64DD36FA.name}" + } + }, + "aws_iam_role_policy_attachment": { + "Topic-OnMessage0_IamRolePolicyAttachment_091E665D": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic-OnMessage0/IamRolePolicyAttachment", + "uniqueId": "Topic-OnMessage0_IamRolePolicyAttachment_091E665D" + } + }, + "policy_arn": "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "role": "${aws_iam_role.Topic-OnMessage0_IamRole_64DD36FA.name}" + } + }, + "aws_lambda_function": { + "Topic-OnMessage0": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic-OnMessage0/Default", + "uniqueId": "Topic-OnMessage0" + } + }, + "architectures": [ + "arm64" + ], + "environment": { + "variables": { + "DYNAMODB_TABLE_NAME_6cb5a3a4": "${aws_dynamodb_table.Counter.name}", + "NODE_OPTIONS": "--enable-source-maps", + "WING_FUNCTION_NAME": "Topic-OnMessage0-c85d7820", + "WING_TARGET": "tf-aws" + } + }, + "function_name": "Topic-OnMessage0-c85d7820", + "handler": "index.handler", + "memory_size": 1024, + "publish": true, + "role": "${aws_iam_role.Topic-OnMessage0_IamRole_64DD36FA.arn}", + "runtime": "nodejs20.x", + "s3_bucket": "${aws_s3_bucket.Code.bucket}", + "s3_key": "${aws_s3_object.Topic-OnMessage0_S3Object_D41E9C10.key}", + "timeout": 60, + "vpc_config": { + "security_group_ids": [], + "subnet_ids": [] + } + } + }, + "aws_lambda_permission": { + "Topic-OnMessage0_InvokePermission-c8228fb70d825c2a5610c610e5246d5313ea6bd1a2_2E2D0106": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic-OnMessage0/InvokePermission-c8228fb70d825c2a5610c610e5246d5313ea6bd1a2", + "uniqueId": "Topic-OnMessage0_InvokePermission-c8228fb70d825c2a5610c610e5246d5313ea6bd1a2_2E2D0106" + } + }, + "action": "lambda:InvokeFunction", + "function_name": "${aws_lambda_function.Topic-OnMessage0.function_name}", + "principal": "sns.amazonaws.com", + "source_arn": "${aws_sns_topic.Topic.arn}" + } + }, + "aws_s3_bucket": { + "Code": { + "//": { + "metadata": { + "path": "root/Default/Code", + "uniqueId": "Code" + } + }, + "bucket_prefix": "code-c84a50b1-" + } + }, + "aws_s3_object": { + "Topic-OnMessage0_S3Object_D41E9C10": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic-OnMessage0/S3Object", + "uniqueId": "Topic-OnMessage0_S3Object_D41E9C10" + } + }, + "bucket": "${aws_s3_bucket.Code.bucket}", + "key": "", + "source": "" + } + }, + "aws_sns_topic": { + "Topic": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic/Default", + "uniqueId": "Topic" + } + }, + "name": "Topic-c8228fb7" + } + }, + "aws_sns_topic_subscription": { + "Topic_TopicSubscription0_0EA5CC90": { + "//": { + "metadata": { + "path": "root/Default/Default/Topic/TopicSubscription0", + "uniqueId": "Topic_TopicSubscription0_0EA5CC90" + } + }, + "endpoint": "${aws_lambda_function.Topic-OnMessage0.arn}", + "protocol": "lambda", + "topic_arn": "${aws_sns_topic.Topic.arn}" + } + } + } +} +``` + diff --git a/tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_test_sim.md b/tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_test_sim.md new file mode 100644 index 00000000000..3d25b2d907f --- /dev/null +++ b/tools/hangar/__snapshots__/test_corpus/sdk_tests/topic/variadic-parameter.test.w_test_sim.md @@ -0,0 +1,12 @@ +# [variadic-parameter.test.w](../../../../../../examples/tests/sdk_tests/topic/variadic-parameter.test.w) | test | sim + +## stdout.log +```log +pass ─ variadic-parameter.test.wsim » root/env0/test:publish message array to topic + +Tests 1 passed (1) +Snapshots 1 skipped +Test Files 1 passed (1) +Duration +``` +