-
Notifications
You must be signed in to change notification settings - Fork 83
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #7 from vtrifonov/master
Added support for sending messages to AMQP
- Loading branch information
Showing
16 changed files
with
291 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
package amqp | ||
|
||
import ( | ||
"log" | ||
"time" | ||
|
||
"fmt" | ||
|
||
"github.com/jmartin82/mmock/definition" | ||
"github.com/jmartin82/mmock/parse" | ||
"github.com/streadway/amqp" | ||
) | ||
|
||
//MessageSender sends message to RabbitMQ | ||
type MessageSender struct { | ||
Parser parse.ResponseParser | ||
} | ||
|
||
//Send message to rabbitMQ if needed | ||
func (msender MessageSender) Send(per *definition.Persist, req *definition.Request, res *definition.Response) bool { | ||
if per.AMQP.URL == "" { | ||
return true | ||
} | ||
|
||
per.AMQP.Body = msender.Parser.ParseBody(req, res, per.AMQP.Body, per.AMQP.BodyAppend) | ||
|
||
if per.AMQP.Delay > 0 { | ||
log.Printf("Adding a delay before sending message") | ||
time.Sleep(time.Duration(per.AMQP.Delay) * time.Second) | ||
} | ||
|
||
return sendMessage(per.AMQP, res) | ||
} | ||
|
||
//NewMessageSender creates a new MessageSender | ||
func NewMessageSender(parser parse.ResponseParser) *MessageSender { | ||
result := MessageSender{Parser: parser} | ||
return &result | ||
} | ||
|
||
func sendMessage(publishInfo definition.AMQPPublishing, res *definition.Response) bool { | ||
conn, err := amqp.Dial(publishInfo.URL) | ||
|
||
if hasError(err, "Failed to connect to RabbitMQ", res) { | ||
return false | ||
} | ||
defer conn.Close() | ||
|
||
ch, err := conn.Channel() | ||
if hasError(err, "Failed to open a channel", res) { | ||
return false | ||
} | ||
defer ch.Close() | ||
|
||
err = ch.Publish( | ||
publishInfo.Exchange, // exchange | ||
publishInfo.RoutingKey, // routing key | ||
false, // mandatory | ||
false, // immediate | ||
amqp.Publishing{ | ||
Body: []byte(publishInfo.Body), | ||
ContentType: publishInfo.ContentType, | ||
ContentEncoding: publishInfo.ContentEncoding, | ||
Priority: publishInfo.Priority, | ||
CorrelationId: publishInfo.CorrelationID, | ||
ReplyTo: publishInfo.ReplyTo, | ||
Expiration: publishInfo.Expiration, | ||
MessageId: publishInfo.Expiration, | ||
Timestamp: publishInfo.Timestamp, | ||
Type: publishInfo.Type, | ||
UserId: publishInfo.UserID, | ||
AppId: publishInfo.AppID, | ||
DeliveryMode: 2, | ||
}) | ||
|
||
if hasError(err, "Failed to publish a message", res) { | ||
return false | ||
} | ||
log.Printf(" [x] Sent %s", publishInfo.Body) | ||
return true | ||
} | ||
|
||
func failOnError(err error, msg string) { | ||
if err != nil { | ||
log.Fatalf("%s: %s", msg, err) | ||
} | ||
} | ||
|
||
func hasError(err error, msg string, res *definition.Response) bool { | ||
if err != nil { | ||
log.Print(err) | ||
res.Body = fmt.Errorf("%s: %s", msg, err).Error() | ||
res.StatusCode = 500 | ||
} | ||
return err != nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package amqp | ||
|
||
import "github.com/jmartin82/mmock/definition" | ||
|
||
//Sender sends messages to AMQP server | ||
type Sender interface { | ||
//Send sends to amqp | ||
Send(per *definition.Persist, req *definition.Request, res *definition.Response) bool | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
{ | ||
"request": { | ||
"method": "POST", | ||
"path": "/amqp/*", | ||
"body": "*" | ||
}, | ||
"response": { | ||
"statusCode": 202, | ||
"headers": { | ||
"Content-Type":["application/json"] | ||
}, | ||
"body": "{{ request.body }}", | ||
"bodyAppend": "{ \"id\": {{request.url./amqp/(?P<value>\\d+)}}, \"city\": \"{{ fake.City }}\" }" | ||
}, | ||
"persist" : { | ||
"name" : "/user-{{request.url./amqp/(?P<value>\\d+)}}.json", | ||
"amqp": { | ||
"url": "amqp://guest:guest@localhost:5672/myVHost", | ||
"body": "{{ response.body }}", | ||
"delay": 2, | ||
"exchange": "myExchange", | ||
"type": "MockType", | ||
"correlationId": "9782b88f-0c6e-4879-8c23-4699785e6a95" | ||
} | ||
} | ||
} |
Oops, something went wrong.