diff --git a/package.json b/package.json index e995f0e..2eaf304 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,7 @@ "@types/node": "^12.0.3", "@types/request": "^2.48.1", "@types/uuid": "^3.4.4", - "amqplib": "^0.5.2", + "amqplib": "^0.8.0", "ioredis": "^4.14.1", "mongoose": "^5.11.0", "node-fetch": "^2.6.1", @@ -57,13 +57,14 @@ }, "devDependencies": { "@masterodin/eslint-config-typescript": "^1.0.0", - "@types/jest": "^25.2.2", + "@types/jest": "^26.0.4", "@types/node-fetch": "^2.5.7", "@typescript-eslint/eslint-plugin": "^3.0.2", "@typescript-eslint/parser": "^3.0.2", + "bluebird": "^3.7.2", "eslint": "^7.1.0", - "jest": "^25.5.4", - "ts-jest": "^25.5.1", + "jest": "^27.0.6", + "ts-jest": "^27.0.3", "tsc-publish": "^0.5.0", "typescript": "^4.3.5" } diff --git a/src/rabbit.ts b/src/rabbit.ts index 66405f6..1815a51 100644 --- a/src/rabbit.ts +++ b/src/rabbit.ts @@ -8,6 +8,8 @@ import { TLSSocketOptions } from 'tls'; import { RabbitMessage, RabbitOptions, RabbitOnTopicOptions, RabbitOnRpcOptions, RabbitOnQueueOptions, RabbitContentType } from './types'; +import type Bluebird from 'bluebird'; + interface Subscription extends amqplib.Replies.Consume { unsubscribe: () => void; } @@ -29,7 +31,7 @@ export class Rabbit { public options: RabbitOptions; private conn: amqplib.Connection | null; - private pch: Promise; + private pch: Bluebird; private mgmturl: string; private vhost: string; private prefix?: string; @@ -110,7 +112,7 @@ export class Rabbit { }); // Make a shared channel for publishing and subscribe - this.pch = pconn.then((conn: amqplib.Connection): Promise => conn.createChannel()); + this.pch = pconn.then((conn: amqplib.Connection) => conn.createChannel()); this.mgmturl = `http://${this.options.username}:${this.options.password}@${this.options.hostname}:15672/api`; this.vhost = this.options.vhost === '/' ? '%2f' : (this.options.vhost || ''); this.prefix = this.options.prefix; @@ -119,10 +121,14 @@ export class Rabbit { } public close(): Promise { - if (this.conn) { - return this.conn.close(); - } - return new Promise((resolve) => resolve()); + return new Promise((resolve) => { + if (!this.conn) { + return resolve(); + } + this.conn.close().then(() => { + resolve(); + }); + }); } private resolveTopicName(topic_name: string): string { @@ -309,7 +315,31 @@ export class Rabbit { channel.sendToQueue(queueName, this.encodeContent(content), options); if (replyTo) { - resolve(); + resolve({ + content: null, + fields: { + deliveryTag: 0, + redelivered: false, + exchange: this.exchange, + routingKey: queue.queue, + }, + properties: { + contentType: null, + contentEncoding: null, + headers: {}, + deliveryMode: null, + priority: 0, + correlationId: options.correlationId, + replyTo: options.replyTo, + expiration: 0, + timestamp: 0, + messageId: null, + type: null, + userId: null, + appId: null, + clusterId: null, + }, + }); } }); }