Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0.76.0: SNS -> publish to subscribed SQS queue, "Records" assumption leads to lost message, empty array. mj1618#134

Bump serverless version to v3
  • Loading branch information
huangjimmy committed Feb 24, 2023
1 parent 5eb5031 commit 24b9e5d
Show file tree
Hide file tree
Showing 9 changed files with 16,779 additions and 9,473 deletions.
14,683 changes: 11,429 additions & 3,254 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"express": "^4.17.1",
"lodash": "^4.17.21",
"node-fetch": "^2.6.1",
"serverless": "^2.51.2",
"serverless": "^3.27.0",
"shelljs": "^0.8.4",
"uuid": "^8.3.2",
"xml": "^1.0.1"
Expand Down
12 changes: 9 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ class ServerlessOfflineSns {
} else {
this.region = "us-east-1";
}
this.autoSubscribe = this.config.autoSubscribe === undefined ? true : this.config.autoSubscribe;
this.autoSubscribe =
this.config.autoSubscribe === undefined
? true
: this.config.autoSubscribe;
// Congure SNS client to be able to find us.
AWS.config.sns = {
endpoint: "http://127.0.0.1:" + this.localPort,
Expand Down Expand Up @@ -204,13 +207,16 @@ class ServerlessOfflineSns {
await this.unsubscribeAll();
this.debug("subscribing functions");
const subscribePromises: Array<Promise<any>> = [];
if(this.autoSubscribe) {
if (this.autoSubscribe) {
if (this.servicesDirectory) {
shell.cd(this.servicesDirectory);
for (const directory of shell.ls("-d", "*/")) {
shell.cd(directory);
const service = directory.split("/")[0];
const serverless = await loadServerlessConfig(shell.pwd().toString(), this.debug);
const serverless = await loadServerlessConfig(
shell.pwd().toString(),
this.debug
);
this.debug("Processing subscriptions for ", service);
this.debug("shell.pwd()", shell.pwd());
this.debug("serverless functions", serverless.service.functions);
Expand Down
6 changes: 3 additions & 3 deletions src/sls-config-parser.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as fs from "fs";
import * as path from "path";
import * as Serverless from "serverless";
import * as findConfigPath from 'serverless/lib/cli/resolve-configuration-path';
import * as findConfigPath from "serverless/lib/cli/resolve-configuration-path";

export async function loadServerlessConfig(cwd = process.cwd(), debug) {
console.log("debug loadServerlessConfig", cwd);
Expand All @@ -10,8 +10,8 @@ export async function loadServerlessConfig(cwd = process.cwd(), debug) {
cwd = path.dirname(cwd);
}

const configurationPath = await findConfigPath({cwd});
const serverless = new Serverless({configurationPath});
const configurationPath = await findConfigPath({ cwd });
const serverless = new Serverless({ configurationPath });
await serverless.init();
return serverless;
}
4 changes: 3 additions & 1 deletion src/sns-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as AWS from "aws-sdk";
import {
ListSubscriptionsResponse, ListTopicsResponse, MessageAttributeMap
ListSubscriptionsResponse,
ListTopicsResponse,
MessageAttributeMap,
} from "aws-sdk/clients/sns.d";
import * as _ from "lodash";
import fetch from "node-fetch";
Expand Down
47 changes: 25 additions & 22 deletions src/sns-server.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { SQS } from "aws-sdk";
import { TopicsList, Subscription } from "aws-sdk/clients/sns";
import fetch from "node-fetch";
import { URL } from "url";
import { IDebug, ISNSServer } from "./types";
import * as bodyParser from "body-parser";
import * as _ from "lodash";
import * as bodyParser from "body-parser";
import * as xml from "xml";

import { IDebug, ISNSServer } from "./types";
import { Subscription, TopicsList } from "aws-sdk/clients/sns";
import {
arrayify,
createAttr,
createMessageId,
createMetadata,
createSnsTopicEvent,
parseMessageAttributes,
formatMessageAttributes,
parseAttributes,
createMessageId,
validatePhoneNumber,
parseMessageAttributes,
topicArnFromName,
formatMessageAttributes,
validatePhoneNumber,
} from "./helpers";

import { SQS } from "aws-sdk";
import { URL } from "url";
import fetch from "node-fetch";

export class SNSServer implements ISNSServer {
private topics: TopicsList;
private subscriptions: Subscription[];
Expand Down Expand Up @@ -309,18 +311,19 @@ export class SNSServer implements ISNSServer {
})
.promise();
} else {
const records = JSON.parse(event).Records ?? [];
const messagePromises = records.map((record) => {
return sqs
.sendMessage({
QueueUrl: sub.Endpoint,
MessageBody: JSON.stringify(record.Sns),
MessageAttributes: formatMessageAttributes(messageAttributes),
...(messageGroupId && { MessageGroupId: messageGroupId }),
})
.promise();
});
return Promise.all(messagePromises);
const record = {
awsRegion: this.region,
eventSource: "aws:sqs",
body: event,
};
return sqs
.sendMessage({
QueueUrl: sub.Endpoint,
MessageBody: JSON.stringify({ Records: [record] }),
MessageAttributes: formatMessageAttributes(messageAttributes),
...(messageGroupId && { MessageGroupId: messageGroupId }),
})
.promise();
}
}

Expand Down
1 change: 0 additions & 1 deletion src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,3 @@ export interface IMessageAttribute {
Type: string;
Value: string;
}

4 changes: 2 additions & 2 deletions test/spec/sns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ describe("test", () => {
const createServerless = (
accountId: number,
handlerName: string = "pongHandler",
host: string|null = null,
subscribeEndpoint: string|null = null
host: string | null = null,
subscribeEndpoint: string | null = null
) => {
return {
config: {
Expand Down
Loading

0 comments on commit 24b9e5d

Please sign in to comment.