-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
287e925
commit dce8755
Showing
7 changed files
with
288 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,20 @@ | ||
# Candi Plugin | ||
|
||
Extend candi server or worker | ||
|
||
## [GCP PubSub](https://github.com/agungdwiprasetyo/candi-plugin/tree/master/gcppubsub) | ||
|
||
<p align="center"> | ||
<img src="https://storage.googleapis.com/agungdp/static/logo/google-cloud-pub-sub.svg" width="80" alt="gcp pubsub logo" /> | ||
</p> | ||
|
||
Google Cloud PubSub | ||
|
||
## [STOMP](https://github.com/agungdwiprasetyo/candi-plugin/tree/master/stomp_worker) | ||
|
||
<p align="center"> | ||
<img src="https://storage.googleapis.com/agungdp/static/logo/stomp.png" width="120" alt="stomp logo" /> | ||
<img src="https://storage.googleapis.com/agungdp/static/logo/amq.png" width="120" alt="amq logo" /> | ||
</p> | ||
|
||
Can be used for AMQ Consumer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package stompworker | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/go-stomp/stomp/v3" | ||
) | ||
|
||
type stompBroker struct { | ||
conn *stomp.Conn | ||
} | ||
|
||
// InitDefaultConnection stomp | ||
func InitDefaultConnection(broker, username, password string) *stomp.Conn { | ||
conn, err := stomp.Dial("tcp", broker, | ||
stomp.ConnOpt.Login(username, password), | ||
stomp.ConnOpt.Host("/"), | ||
stomp.ConnOpt.HeartBeatError(360*time.Second), | ||
stomp.ConnOpt.HeartBeatGracePeriodMultiplier(3), | ||
) | ||
if err != nil { | ||
panic("STOMP: cannot connect to server broker: " + err.Error()) | ||
} | ||
return conn | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package stompworker | ||
|
||
import "pkg.agungdp.dev/candi/codebase/factory/types" | ||
|
||
const ( | ||
// STOMPConsumer types | ||
STOMPConsumer types.Worker = "stomp_consumer" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package stompworker | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/go-stomp/stomp/v3" | ||
"pkg.agungdp.dev/candi/candihelper" | ||
"pkg.agungdp.dev/candi/candishared" | ||
"pkg.agungdp.dev/candi/codebase/interfaces" | ||
"pkg.agungdp.dev/candi/tracer" | ||
) | ||
|
||
const ( | ||
// StompContentTypeKey for context key | ||
StompContentTypeKey = candishared.ContextKey("stompContentType") | ||
) | ||
|
||
// publisher instance | ||
type publisher struct { | ||
conn *stomp.Conn | ||
} | ||
|
||
// NewStompPublisher constructor | ||
func NewStompPublisher(conn *stomp.Conn) interfaces.Publisher { | ||
return &publisher{ | ||
conn: conn, | ||
} | ||
} | ||
|
||
// PublishMessage method | ||
func (s *publisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) { | ||
trace, ctx := tracer.StartTraceWithContext(ctx, "StompPublisher:PublishMessage") | ||
defer trace.Finish() | ||
|
||
contentType, ok := candishared.GetValueFromContext(ctx, StompContentTypeKey).(string) | ||
if !ok { | ||
contentType = "text/plain" | ||
} | ||
trace.SetTag("content-type", contentType) | ||
trace.SetTag("topic", args.Topic) | ||
trace.SetTag("key", args.Key) | ||
trace.Log("data", args.Data) | ||
|
||
return s.conn.Send(args.Topic, contentType, candihelper.ToBytes(args.Data)) | ||
} |
Oops, something went wrong.