Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib: skeleton app structure & Dogstatsd nsqd addon #909

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/contrib"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqd"
Expand Down Expand Up @@ -144,6 +145,9 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")

optModulesOptions := app.StringArray{}
flagSet.Var(&optModulesOptions, "mod-opt", "optional module options, of form: --mod-opt={{moduleName}}={{moduleOpt}}={{moduleOptValue}}")

return flagSet
}

Expand Down Expand Up @@ -232,6 +236,10 @@ func (p *program) Start() error {
}
nsqd.Main()

// hook into addons
addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd)
addons.Start()

p.nsqd = nsqd
return nil
}
Expand Down
28 changes: 28 additions & 0 deletions contrib/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
## Optional/Contrib Modules

Contrib modules are a way to add functionality to nsqd, in a decoupled way.


The modules currently available are:

- Datadog


### Architecture

Contrib modules are initialized by passing in `--mod-opt=` to nsqd. This may
be provided multiple times. An array of `mod-opt`s are then passed to the
contrib module initializer (during nsqd initialization). Each module is then
passed its options to see if valid options were provided, after which it is
initialized and added to the nsqd waitGroup.


### Datadog

Datadog contrib module, reports nsqd statistics to a datadog daemon. The options
it exposes are:

- `--mod-opt=-dogstatsd-address=<IP:PORT>`
- `--mod-opt=-dogstatsd-interval=<INT_SECONDS>`
- `--mod-opt=-dogstatsd-prefix=<STRING>`

83 changes: 83 additions & 0 deletions contrib/datadog_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package contrib

import (
"errors"
"fmt"
"net"
"strings"
"time"
)

type DataDogClient struct {
conn net.Conn
addr string
prefix string
}

type DataDogTag struct {
k string
v string
}

type DataDogTags struct {
tags []*DataDogTag
}

// returns dogstatd compatible string
// "#tag1:value1,tag2:value2
func (ddt *DataDogTags) String() string {
ts := []string{}
for _, tag := range ddt.tags {
ts = append(ts, fmt.Sprintf("%s:%s", tag.k, tag.v))
}
return "#" + strings.Join(ts, ",")
}

func NewDataDogClient(addr string, prefix string) *DataDogClient {
return &DataDogClient{
addr: addr,
prefix: prefix,
}
}

func (c *DataDogClient) String() string {
return c.addr
}

func (c *DataDogClient) CreateSocket() error {
conn, err := net.DialTimeout("udp", c.addr, time.Second)
if err != nil {
return err
}
c.conn = conn
return nil
}

func (c *DataDogClient) Close() error {
return c.conn.Close()
}

func (c *DataDogClient) Incr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", count, tags)
}

func (c *DataDogClient) Decr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", -count, tags)
}

func (c *DataDogClient) Timing(stat string, delta int64, tags *DataDogTags) error {
return c.send(stat, "%d|ms", delta, tags)
}

func (c *DataDogClient) Gauge(stat string, value int64, tags *DataDogTags) error {
return c.send(stat, "%d|g", value, tags)
}

func (c *DataDogClient) send(stat string, format string, value int64, tags *DataDogTags) error {
if c.conn == nil {
return errors.New("not connected")
}
format = fmt.Sprintf("%s%s:%s|%s", c.prefix, stat, format, tags.String())
_, err := fmt.Fprintf(c.conn, format, value)
return err
}
58 changes: 58 additions & 0 deletions contrib/datadog_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package contrib

import (
"github.com/nsqio/nsq/internal/test"
"net"
"testing"
)

func TestDDTagsStringNoTags(t *testing.T) {
test.Equal(
t,
(&DataDogTags{}).String(),
"#",
)
}

func TestDDTagsStringSingleString(t *testing.T) {
test.Equal(
t,
(&DataDogTags{
tags: []*DataDogTag{
{k: "topic_name", v: "test_topic"},
},
}).String(),
"#topic_name:test_topic",
)
}

func TestDDTagsStringMultipleStrings(t *testing.T) {
test.Equal(
t,
(&DataDogTags{
tags: []*DataDogTag{
{k: "topic_name", v: "test_topic"},
{k: "channel_name", v: "test_channel"},
},
}).String(),
"#topic_name:test_topic,channel_name:test_channel",
)
}

func TestDDCSend(t *testing.T) {
r, w := net.Pipe()
b := make([]byte, len("nsq.topic.depth:100|t|#"))

go func() {
ddc := &DataDogClient{
conn: w,
addr: "test",
prefix: "nsq.",
}
testValue := int64(100)
ddc.send("topic.depth", "%d|t", testValue, &DataDogTags{})
}()

r.Read(b)
test.Equal(t, string(b), "nsq.topic.depth:100|t|#")
}
Loading