-
Notifications
You must be signed in to change notification settings - Fork 198
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(sdk): cloud.Topic publish() support variadic parameter (#6281)
Now we can send multiple messages at once to a topic. Closes #6072 ## Checklist - [x] Title matches [Winglang's style guide](https://www.winglang.io/contributing/start-here/pull_requests#how-are-pull-request-titles-formatted) - [x] Description explains motivation and solution - [x] Tests added (always) - [ ] Docs updated (only required for features) - [ ] Added `pr/e2e-full` label if this feature requires end-to-end testing *By submitting this pull request, I confirm that my contribution is made under the terms of the [Wing Cloud Contribution License](https://github.com/winglang/wing/blob/main/CONTRIBUTION_LICENSE.md)*.
- Loading branch information
1 parent
bb52687
commit 8329f47
Showing
17 changed files
with
435 additions
and
49 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
bring cloud; | ||
bring util; | ||
|
||
let c = new cloud.Counter(); | ||
let t = new cloud.Topic(); | ||
t.onMessage(inflight (msg: str) => { | ||
c.inc(); | ||
}); | ||
|
||
test "publish message array to topic" { | ||
t.publish("msg1", "msg2", "msg3", | ||
"msg4", "msg5", "msg6", "msg7", | ||
"msg8", "msg9", "msg10", "msg11", | ||
"msg12", "msg13", "msg14", "msg15"); | ||
|
||
assert(util.waitUntil(inflight () => { return c.peek() == 15; })); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,68 @@ | ||
import { SNSClient, PublishCommand } from "@aws-sdk/client-sns"; | ||
import { | ||
SNSClient, | ||
PublishBatchCommand, | ||
PublishBatchRequestEntry, | ||
InvalidBatchEntryIdException, | ||
} from "@aws-sdk/client-sns"; | ||
import { ITopicClient } from "../cloud"; | ||
import { Util } from "../util/util"; | ||
|
||
/** | ||
* Topics in AWS can receive up to 10 messages at a time | ||
* using the PublishBatchCommand, this constant is used | ||
* to generate batches respecting the limits. | ||
*/ | ||
const CHUNK_SIZE = 10; | ||
|
||
export class TopicClient implements ITopicClient { | ||
constructor( | ||
private readonly topicArn: string, | ||
private readonly client: SNSClient = new SNSClient({}) | ||
) {} | ||
|
||
public async publish(message: string): Promise<void> { | ||
const command = new PublishCommand({ | ||
Message: message, | ||
TopicArn: this.topicArn, | ||
}); | ||
await this.client.send(command); | ||
public async publish(...messages: string[]): Promise<void> { | ||
if (messages.includes("")) { | ||
throw new Error("Empty messages are not allowed"); | ||
} | ||
|
||
let batchMessages: Array<PublishBatchRequestEntry[]> = []; | ||
for (let i = 0; i < messages.length; i += CHUNK_SIZE) { | ||
const chunk = messages.slice(i, i + CHUNK_SIZE); | ||
batchMessages.push(this.processBatchMessages(chunk, i)); | ||
} | ||
|
||
for (const batch of batchMessages) { | ||
try { | ||
const command = new PublishBatchCommand({ | ||
TopicArn: this.topicArn, | ||
PublishBatchRequestEntries: batch, | ||
}); | ||
await this.client.send(command); | ||
} catch (e) { | ||
if (e instanceof InvalidBatchEntryIdException) { | ||
throw new Error( | ||
`The Id of a batch entry in a batch request doesn't abide by the specification. (message=${messages}): ${ | ||
(e as Error).stack | ||
})}` | ||
); | ||
} | ||
throw new Error((e as Error).stack); | ||
} | ||
} | ||
} | ||
|
||
private processBatchMessages( | ||
messages: string[], | ||
idx: number | ||
): PublishBatchRequestEntry[] { | ||
let batchMessages: Array<PublishBatchRequestEntry> = []; | ||
let index = idx; | ||
for (const message of messages) { | ||
batchMessages.push({ | ||
Id: Util.sha256(`${message}-${++index}`), | ||
Message: message, | ||
}); | ||
} | ||
return batchMessages; | ||
} | ||
} |
Oops, something went wrong.