Skip to content

Commit

Permalink
bump amqplib version
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterOdin committed Jul 16, 2021
1 parent acbf7b2 commit fc24f67
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,22 @@
"@types/node": "^12.0.3",
"@types/request": "^2.48.1",
"@types/uuid": "^3.4.4",
"amqplib": "^0.5.2",
"amqplib": "^0.8.0",
"ioredis": "^4.14.1",
"mongoose": "^5.11.0",
"node-fetch": "^2.6.1",
"uuid": "^3.3.2"
},
"devDependencies": {
"@masterodin/eslint-config-typescript": "^1.0.0",
"@types/jest": "^25.2.2",
"@types/jest": "^26.0.4",
"@types/node-fetch": "^2.5.7",
"@typescript-eslint/eslint-plugin": "^3.0.2",
"@typescript-eslint/parser": "^3.0.2",
"bluebird": "^3.7.2",
"eslint": "^7.1.0",
"jest": "^25.5.4",
"ts-jest": "^25.5.1",
"jest": "^27.0.6",
"ts-jest": "^27.0.3",
"tsc-publish": "^0.5.0",
"typescript": "^4.3.5"
}
Expand Down
44 changes: 37 additions & 7 deletions src/rabbit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { TLSSocketOptions } from 'tls';

import { RabbitMessage, RabbitOptions, RabbitOnTopicOptions, RabbitOnRpcOptions, RabbitOnQueueOptions, RabbitContentType } from './types';

import type Bluebird from 'bluebird';

interface Subscription extends amqplib.Replies.Consume {
unsubscribe: () => void;
}
Expand All @@ -29,7 +31,7 @@ export class Rabbit {
public options: RabbitOptions;

private conn: amqplib.Connection | null;
private pch: Promise<amqplib.Channel>;
private pch: Bluebird<amqplib.Channel>;
private mgmturl: string;
private vhost: string;
private prefix?: string;
Expand Down Expand Up @@ -110,7 +112,7 @@ export class Rabbit {
});

// Make a shared channel for publishing and subscribe
this.pch = pconn.then((conn: amqplib.Connection): Promise<amqplib.Channel> => conn.createChannel());
this.pch = pconn.then((conn: amqplib.Connection) => conn.createChannel());
this.mgmturl = `http://${this.options.username}:${this.options.password}@${this.options.hostname}:15672/api`;
this.vhost = this.options.vhost === '/' ? '%2f' : (this.options.vhost || '');
this.prefix = this.options.prefix;
Expand All @@ -119,10 +121,14 @@ export class Rabbit {
}

public close(): Promise<void> {
if (this.conn) {
return this.conn.close();
}
return new Promise((resolve) => resolve());
return new Promise((resolve) => {
if (!this.conn) {
return resolve();
}
this.conn.close().then(() => {
resolve();
});
});
}

private resolveTopicName(topic_name: string): string {
Expand Down Expand Up @@ -309,7 +315,31 @@ export class Rabbit {

channel.sendToQueue(queueName, this.encodeContent(content), options);
if (replyTo) {
resolve();
resolve({
content: null,
fields: {
deliveryTag: 0,
redelivered: false,
exchange: this.exchange,
routingKey: queue.queue,
},
properties: {
contentType: null,
contentEncoding: null,
headers: {},
deliveryMode: null,
priority: 0,
correlationId: options.correlationId,
replyTo: options.replyTo,
expiration: 0,
timestamp: 0,
messageId: null,
type: null,
userId: null,
appId: null,
clusterId: null,
},
});
}
});
}
Expand Down

0 comments on commit fc24f67

Please sign in to comment.