From 581daf6dbc3e9822c6e514c82dbbe99e5adf3e67 Mon Sep 17 00:00:00 2001 From: Jason Hancock Date: Wed, 23 Aug 2023 17:02:08 -0700 Subject: [PATCH] tune up, modernization --- .github/workflows/pullrequests.yaml | 17 +++ .gitignore | 1 + .travis.yml | 9 -- README.md | 128 ++++++++----------- cmd/amproxy/main.go | 64 ---------- cmd/client/cmd.go | 20 +++ cmd/client/signature.go | 47 +++++++ cmd/client/testclient.go | 119 +++++++++++++++++ cmd/root.go | 36 ++++++ cmd/server/cmd.go | 96 ++++++++++++++ docker-compose.yml | 10 ++ go.mod | 28 ++++ go.sum | 46 +++++++ main.go | 43 +++++++ message.go | 66 ---------- message_test.go | 39 ------ packaging/redhat/auth_file.yaml | 4 +- pkg/amproxy/message.go | 83 ++++++++++++ pkg/amproxy/message_test.go | 48 +++++++ client.go => pkg/client/client.go | 14 +- pkg/client/client_test.go | 68 ++++++++++ pkg/lineserver/server.go | 102 +++++++++++++++ pkg/lineserver/server_test.go | 44 +++++++ server/auth.go | 17 +-- server/auth_provider_static.go | 80 ++++++++---- server/auth_provider_static_test.go | 55 ++++---- server/auth_test.go | 16 +-- server/line_protocol_server.go | 114 ----------------- server/line_protocol_server_test.go | 41 ------ server/metric_writer.go | 4 +- server/metric_writer_carbon.go | 25 ++-- server/metric_writer_carbon_test.go | 40 +++--- server/server.go | 74 +++-------- server/server_test.go | 190 +++++++++++++++++++++------- 34 files changed, 1151 insertions(+), 637 deletions(-) create mode 100644 .github/workflows/pullrequests.yaml delete mode 100644 .travis.yml delete mode 100644 cmd/amproxy/main.go create mode 100644 cmd/client/cmd.go create mode 100644 cmd/client/signature.go create mode 100644 cmd/client/testclient.go create mode 100644 cmd/root.go create mode 100644 cmd/server/cmd.go create mode 100644 docker-compose.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go delete mode 100644 message.go delete mode 100644 message_test.go create mode 100644 pkg/amproxy/message.go create mode 100644 pkg/amproxy/message_test.go rename client.go => pkg/client/client.go (80%) create mode 100644 pkg/client/client_test.go create mode 100644 pkg/lineserver/server.go create mode 100644 pkg/lineserver/server_test.go delete mode 100644 server/line_protocol_server.go delete mode 100644 server/line_protocol_server_test.go diff --git a/.github/workflows/pullrequests.yaml b/.github/workflows/pullrequests.yaml new file mode 100644 index 0000000..c90109a --- /dev/null +++ b/.github/workflows/pullrequests.yaml @@ -0,0 +1,17 @@ +name: Tests +on: [pull_request] + +jobs: + test: + name: Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: stable + + - name: Go Tests + run: go test -v ./... diff --git a/.gitignore b/.gitignore index 97360c4..fbe0546 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ +amproxy packaging/*.rpm diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 5d7b1bf..0000000 --- a/.travis.yml +++ /dev/null @@ -1,9 +0,0 @@ -language: go - -go: - - 1.9 - - tip - -install: make - -script: make test diff --git a/README.md b/README.md index 586e420..6c0ce6c 100644 --- a/README.md +++ b/README.md @@ -1,81 +1,67 @@ # amproxy - Authenticated Metrics Proxy -[![GoDoc](https://godoc.org/github.com/jasonhancock/amproxy?status.svg)](https://godoc.org/github.com/jasonhancock/amproxy) -[![Build Status](https://travis-ci.org/jasonhancock/amproxy.svg?branch=master)](https://travis-ci.org/jasonhancock/amproxy) +[![Go Reference](https://pkg.go.dev/badge/github.com/jasonhancock/amproxy.svg)](https://pkg.go.dev/github.com/jasonhancock/amproxy) [![Go Report Card](https://goreportcard.com/badge/jasonhancock/amproxy)](https://goreportcard.com/report/jasonhancock/amproxy) -A proxy for Graphite's Carbon service that authenticates messages before passing -them on to Carbon or dropping them on the floor. - -This is a prototype and is just an example of what could be possible. It is -quite literally the first code beyond a "Hello World" that I've written in Go. +A proxy for Graphite's Carbon service that authenticates messages before passing them on to Carbon or dropping them on the floor. ## What does this do? -Carbon listens on port 2003 and doesn't offer any sort of authentication. -Usually this is manageable by firewalling off the service to only allow -connections from hosts you trust. The problem is that I want to build a device -that my friends/family run on their networks at home (without static IPs) and -report metrics to my Carbon server. I could run some sort of dynamic dns client -on each device and dynamically manage my firewall, but I don't really want to -deal with that. - -Instead, I run Carbon bound to 127.0.0.1:2003 and run amproxy on port 2005 -exposed to the internet. The client devices are each given a public/private key -pair that can be used to generate signed messages. These signed messages are -sent to amproxy which authenticates the message by validating the signature -and whether or not the specified metric is authorized for the given key pair, -and if so, forwards the metric on to Carbon. +Carbon listens on port 2003 and doesn't offer any sort of authentication. Usually this is manageable by firewalling off the service to only allow connections from hosts you trust. The problem is that I want to build a device that my friends/family run on their networks at home (without static IPs) and report metrics to my Carbon server. I could run some sort of dynamic dns client on each device and dynamically manage my firewall, but I don't really want to deal with that or with doing something like mTLS, or using something like MQTT. + +Instead, I run Carbon bound to 127.0.0.1:2003 and run amproxy on port 2005 exposed to the internet. The client devices are each given a public/private key pair that can be used to generate signed messages. These signed messages are sent to amproxy which authenticates the message by validating the signature and whether or not the specified metric is authorized for the given key pair, and if so, forwards the metric on to Carbon. ## Configuration -All configuration is done via flags (will be updated for env vars soon): +```none +$ amproxy server --help +Starts the server + +Usage: + amproxy server [flags] -$ ./amproxy --help -Usage of ./amproxy: - -addr string - interface/port to bind to (default ":2005") - -auth-file string - Location of auth file (default "/etc/amproxy/auth_file.yaml") - -carbon-addr string - Carbon address:port (default "127.0.0.1:2003") - -skew float - amount of clock skew tolerated in seconds (default 300) +Flags: + --addr string The interface and port to bind the server to. Can be set with the ADDR env variable (default "127.0.0.1:2005") + --auth-file string The path to the auth file. Can be set with the AUTH_FILE env variable (default "/etc/amproxy/auth_file.yaml") + --carbon-addr string The address of the carbon server. Can be set with the CARBON_ADDR env variable (default "127.0.0.1:2003") + -h, --help help for server + --log-format string The format of log messages. (logfmt|json) (default "logfmt") + --log-level string Log level (all|err|warn|info|debug (default "info") + --skew duration The amount of clock skew tolerated. Can be set with the MAX_SKEW env variable (default 5m0s) +``` ## Auth File Format -``` +```yaml --- apikeys: my_public_key: secret_key: my_secret_key metrics: - metric1: 1 - metric2: 1 + - metric1 + - metric2 my_public_key2: secret_key: my_secret_key2 metrics: - metric3: 1 - metric4: 1 + - metric3 + - metric4 ``` -In the example above, my_public_key is authorized for metric1 and metric2 and -uses the `my_secret_key` private key. +In the example above, `my_public_key` is authorized for `metric1` and `metric2` and uses the `my_secret_key` private key. -If the AUTH_FILE is updated on disk, it will automatically get reloaded within -60 seconds. +If the `AUTH_FILE` is updated on disk, it will automatically get reloaded within 60 seconds. ## Protocol Messages going over the wire are in the form: -``` +```none metric value timestamp public_key base64_signature ``` -### Example: +### Example -``` +```none metric = foo value = 1234 timestamp = 1425059762 @@ -85,60 +71,48 @@ secret_key = my_secret_key The message for which we will generate the signature becomes -``` +```none foo 1234 1425059762 my_public_key ``` -We can generate a signature with some perl code: +We can generate a signature: -``` -#!/usr/bin/perl - -use strict; -use warnings; -use Digest::SHA qw(hmac_sha256_base64); - -my $digest = hmac_sha256_base64('foo 1234 1425059762 my_public_key', 'my_secret_key'); - -# Fix padding of Base64 digests -while (length($digest) % 4) { - $digest .= '='; -} - -print $digest; +```shell +KEY_PRIVATE=my_secret_key amproxy client signature "foo 1234 1425059762 my_public_key" ``` Which outputs the following: -``` + +```none lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc= ``` The message going over the wire becomes: -``` +```none foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc= ``` -## Building/Testing +## Testing -To build in the vagrant environment, do the following: +To start a graphite/carbon stack, run: -``` -cd /vagrant/src/amproxy/amproxy -go install +```shell +docker-compose up ``` -This will generate the `/vagrant/bin/amproxy` binary. You can then run the binary: +Then access the graphite webui at [localhost:8080](http://localhost:8080). Username and password are both `root`. -``` -AUTH=public_key1:private_key1 /vagrant/bin/amproxy +Start the server in another terminal: + +```shell +AUTH_FILE=packaging/redhat/auth_file.yaml go run main.go server ``` -And ship your signed metrics to localhost:2005 +Start the test client in another terminal: -## Ideas +``` +KEY_PUBLIC=my_public_key KEY_PRIVATE=my_secret_key go run main.go client test-client +``` -This was just a proof of concept. Ideas for the future would be some sort of -pluggable backend to fetch the public/private keypairs from. As I'm still -prototyping, I didn't want to build out a complicated system that tied into -MySQL, Redis, Memcached, or some other backend API. +The test-client will send a metric named `metric1` every 60 seconds with a random value between 30 and 100. diff --git a/cmd/amproxy/main.go b/cmd/amproxy/main.go deleted file mode 100644 index 37c7217..0000000 --- a/cmd/amproxy/main.go +++ /dev/null @@ -1,64 +0,0 @@ -package main - -import ( - "flag" - "os" - "os/signal" - "syscall" - "time" - - "github.com/go-kit/kit/log" - "github.com/jasonhancock/amproxy/server" -) - -func main() { - var ( - logger log.Logger - - addr = flag.String("addr", ":2005", "interface/port to bind to") - carbonAddr = flag.String("carbon-addr", "127.0.0.1:2003", "Carbon address:port") - authFile = flag.String("auth-file", "/etc/amproxy/auth_file.yaml", "Location of auth file") - skew = flag.Float64("skew", 300, "amount of clock skew tolerated in seconds") - ) - flag.Parse() - - logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - logger = log.With(logger, "ts", log.DefaultTimestampUTC) - - if *authFile == "" { - logger.Log("msg", "missing_auth_file") - os.Exit(1) - } - - ap, err := server.NewAuthProviderStaticFile(log.With(logger, "component", "auth_provider"), *authFile, 1*time.Minute) - if err != nil { - logger.Log("msg", "constructing_auth_provider_error", "error", err) - os.Exit(1) - } - - mw, err := server.NewMetricWriterCarbon(log.With(logger, "component", "metric_writer"), *carbonAddr, 0, 30) - if err != nil { - logger.Log("msg", "constructing_metric_writer_error", "error", err) - os.Exit(1) - } - - s, err := server.NewServer(log.With(logger, "component", "server"), *addr, *skew, ap, mw) - if err != nil { - logger.Log("msg", "constructing_server_error", "error", err) - os.Exit(1) - } - - // subscribe to signals to shut down the server - stopChan := make(chan os.Signal) - signal.Notify(stopChan, os.Interrupt) - signal.Notify(stopChan, os.Kill) - signal.Notify(stopChan, syscall.SIGTERM) - - err = s.Run() - if err != nil { - logger.Log("msg", "running_server_error", "error", err) - os.Exit(1) - } - <-stopChan // wait for signals - s.Stop() -} diff --git a/cmd/client/cmd.go b/cmd/client/cmd.go new file mode 100644 index 0000000..05d5485 --- /dev/null +++ b/cmd/client/cmd.go @@ -0,0 +1,20 @@ +package client + +import ( + "github.com/spf13/cobra" +) + +// NewCmd initializes a new command and sub-commands. +func NewCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "client", + Short: "Client operations.", + } + + cmd.AddCommand( + cmdSignature(), + cmdTestClient(), + ) + + return cmd +} diff --git a/cmd/client/signature.go b/cmd/client/signature.go new file mode 100644 index 0000000..0c64b8f --- /dev/null +++ b/cmd/client/signature.go @@ -0,0 +1,47 @@ +package client + +import ( + "errors" + "fmt" + "os" + + "github.com/jasonhancock/amproxy/pkg/amproxy" + "github.com/jasonhancock/go-helpers" + "github.com/spf13/cobra" +) + +const envKeyPrivate = "KEY_PRIVATE" + +func cmdSignature() *cobra.Command { + var privateKey string + + cmd := &cobra.Command{ + Use: "signature ", + Short: "Generates a signature for the provided input message", + SilenceUsage: true, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + if privateKey == "" { + return errors.New("private key not specified") + } + + msg, err := amproxy.Parse(args[0]) + if err != nil { + return err + } + + fmt.Println(msg.ComputeSignature(privateKey)) + + return nil + }, + } + + cmd.Flags().StringVar( + &privateKey, + "key-private", + os.Getenv(envKeyPrivate), + helpers.EnvDesc("The API private key.", envKeyPrivate), + ) + + return cmd +} diff --git a/cmd/client/testclient.go b/cmd/client/testclient.go new file mode 100644 index 0000000..0ca4cc3 --- /dev/null +++ b/cmd/client/testclient.go @@ -0,0 +1,119 @@ +package client + +import ( + "errors" + "fmt" + "log" + "math/rand" + "os" + "time" + + "github.com/jasonhancock/amproxy/pkg/amproxy" + "github.com/jasonhancock/amproxy/pkg/client" + "github.com/jasonhancock/go-env" + "github.com/jasonhancock/go-helpers" + "github.com/spf13/cobra" +) + +const envKeyPublic = "KEY_PUBLIC" + +func cmdTestClient() *cobra.Command { + var ( + privateKey string + publicKey string + metricName string + interval time.Duration + serverAddr string + ) + + cmd := &cobra.Command{ + Use: "test-client ", + Short: "sends messages for a metric for testing", + SilenceUsage: true, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + if privateKey == "" { + return errors.New("private key not specified") + } + + if publicKey == "" { + return errors.New("public key not specified") + } + + ticker := time.NewTimer(0) + min := 30 + max := 100 + + cl := client.NewClient(publicKey, privateKey, serverAddr) + + for { + select { + case <-cmd.Context().Done(): + ticker.Stop() + return nil + case <-ticker.C: + log.Println("sending") + m := amproxy.Message{ + Name: metricName, + Value: fmt.Sprintf("%d", rand.Intn(max-min)+min), + Timestamp: time.Now(), + } + + if err := cl.Connect(); err != nil { + return err + } + + if err := cl.Write(m); err != nil { + return err + } + + if err := cl.Disconnect(); err != nil { + return err + } + + ticker.Reset(interval) + } + } + }, + } + + cmd.Flags().StringVar( + &privateKey, + "key-private", + os.Getenv(envKeyPrivate), + helpers.EnvDesc("The API private key.", envKeyPrivate), + ) + + cmd.Flags().StringVar( + &publicKey, + "key-public", + os.Getenv(envKeyPublic), + helpers.EnvDesc("The API public key.", envKeyPublic), + ) + + const envMetricName = "METRIC_NAME" + cmd.Flags().StringVar( + &metricName, + "metric", + env.String(envMetricName, "metric1"), + helpers.EnvDesc("The name of the metric to send.", envMetricName), + ) + + const envServerAddr = "SERVER_ADDR" + cmd.Flags().StringVar( + &serverAddr, + "server-addr", + env.String(envServerAddr, "127.0.0.1:2005"), + helpers.EnvDesc("The server address to send to.", envServerAddr), + ) + + const envInterval = "INTERVAL" + cmd.Flags().DurationVar( + &interval, + "interval", + env.Duration(envInterval, 1*time.Minute), + helpers.EnvDesc("How often to send a message", envInterval), + ) + + return cmd +} diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 0000000..47b962e --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,36 @@ +package cmd + +import ( + "context" + "os" + "sync" + + "github.com/jasonhancock/amproxy/cmd/client" + "github.com/jasonhancock/amproxy/cmd/server" + version "github.com/jasonhancock/cobra-version" + "github.com/spf13/cobra" +) + +// Execute adds all child commands to the root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. +func Execute(ctx context.Context, wg *sync.WaitGroup, info version.Info) { + rootCmd := newRootCmd(wg, info) + if err := rootCmd.ExecuteContext(ctx); err != nil { + os.Exit(1) + } +} + +func newRootCmd(wg *sync.WaitGroup, info version.Info) *cobra.Command { + cmd := &cobra.Command{ + Use: "amproxy", + Short: "Authenticating Metrics Proxy", + } + + cmd.AddCommand( + client.NewCmd(), + server.NewCmd(wg), + version.NewCmd(info), + ) + + return cmd +} diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go new file mode 100644 index 0000000..48314ab --- /dev/null +++ b/cmd/server/cmd.go @@ -0,0 +1,96 @@ +package server + +import ( + "os" + "sync" + "time" + + "github.com/jasonhancock/amproxy/pkg/lineserver" + "github.com/jasonhancock/amproxy/server" + clog "github.com/jasonhancock/cobra-logger" + "github.com/jasonhancock/go-env" + "github.com/jasonhancock/go-helpers" + "github.com/spf13/cobra" +) + +const defaultAuthFile = "/etc/amproxy/auth_file.yaml" + +func NewCmd(wg *sync.WaitGroup) *cobra.Command { + var ( + addr string + carbonAddr string + authFile string + skew time.Duration + logConf *clog.Config + ) + + cmd := &cobra.Command{ + Use: "server", + Short: "Starts the server", + SilenceUsage: true, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + l := logConf.Logger(os.Stdout) + + if authFile == "" { + l.Fatal("missing auth file") + } + + ap, err := server.NewAuthProviderStaticFile(cmd.Context(), l.New("auth_provider"), authFile, 1*time.Minute) + if err != nil { + l.Fatal("constructing_auth_provider_error", "error", err) + } + + mw, err := server.NewMetricWriterCarbon(l.New("metric_writer"), carbonAddr, 0, 30) + if err != nil { + l.Fatal("constructing_metric_writer_error", "error", err) + } + + s := server.NewServer(l.New("server"), skew, ap, mw) + + _, err = lineserver.New(cmd.Context(), l.New("line_protocol_server"), wg, addr, s.ProcessLine) + if err != nil { + l.Fatal("starting line protocol server error", "error", err) + } + + wg.Wait() + return nil + }, + } + + logConf = clog.NewConfig(cmd) + + const envAddr = "ADDR" + cmd.Flags().StringVar( + &addr, + "addr", + env.String(envAddr, "127.0.0.1:2005"), + helpers.EnvDesc("The interface and port to bind the server to.", envAddr), + ) + + const envCarbonAddr = "CARBON_ADDR" + cmd.Flags().StringVar( + &carbonAddr, + "carbon-addr", + env.String(envCarbonAddr, "127.0.0.1:2003"), + helpers.EnvDesc("The address of the carbon server.", envCarbonAddr), + ) + + const envAuthFile = "AUTH_FILE" + cmd.Flags().StringVar( + &authFile, + "auth-file", + env.String(envAuthFile, defaultAuthFile), + helpers.EnvDesc("The path to the auth file.", envAuthFile), + ) + + const envSkew = "MAX_SKEW" + cmd.Flags().DurationVar( + &skew, + "skew", + env.Duration(envSkew, 300*time.Second), + helpers.EnvDesc("The amount of clock skew tolerated.", envSkew), + ) + + return cmd +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..1e95706 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +version: "3.9" +services: + graphite: + image: graphiteapp/graphite-statsd:1.1.10-5 + ports: + - "8080:80" + - "2003-2004:2003-2004" + - "2023-2024:2023-2024" + - "8125:8125/udp" + - "8126:8126" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..24e005a --- /dev/null +++ b/go.mod @@ -0,0 +1,28 @@ +module github.com/jasonhancock/amproxy + +go 1.21 + +require ( + github.com/jasonhancock/cobra-logger v0.0.3 + github.com/jasonhancock/cobra-version v0.0.4 + github.com/jasonhancock/go-env v0.0.3 + github.com/jasonhancock/go-helpers v0.0.2 + github.com/jasonhancock/go-logger v0.0.3 + github.com/jasonhancock/go-testhelpers/generic v0.0.2 + github.com/spf13/cobra v1.7.0 + github.com/stretchr/testify v1.8.4 + gopkg.in/fatih/pool.v2 v2.0.0 + gopkg.in/yaml.v2 v2.3.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-stack/stack v1.8.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/kr/text v0.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..bf52c11 --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= +github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jasonhancock/cobra-logger v0.0.3 h1:SJrGf4TeAh5EbSoqZa2FzirOCk5uaWMr6S6M4sUM+mI= +github.com/jasonhancock/cobra-logger v0.0.3/go.mod h1:QuhiBvGrRFwUspKP08/0QVEiUgmh9IWxWuk2lSBB4Ms= +github.com/jasonhancock/cobra-version v0.0.4 h1:oxTyL3Gw/8gRCMl2iUmWybAuM2BgECSjpJwzaeu6Wbg= +github.com/jasonhancock/cobra-version v0.0.4/go.mod h1:2u5UC6RXYhob2XmDLn8jkJdk4s+0xkUUwpi0ERUdu2A= +github.com/jasonhancock/go-env v0.0.3 h1:Fimz0wUogkc/FstqevOAY+RdUiRQDeEzoexy/edlqbo= +github.com/jasonhancock/go-env v0.0.3/go.mod h1:5tS38RxYzmesLfb0J7tUbOHP4KYSmXxn0vA0rPsYmYg= +github.com/jasonhancock/go-helpers v0.0.2 h1:P397tFl/Z/3VH3lLs1pc4c/NmaCzSWXUaZK5aH5DR2c= +github.com/jasonhancock/go-helpers v0.0.2/go.mod h1:nVI/lbNsZwlTV4zfFMXKxdE4pfw7fJ9/E5LxugkX7tc= +github.com/jasonhancock/go-logger v0.0.3 h1:QNlbcP8Q6ltKbaqYnHdPn40I9YZRHY6LOOEWV1nttDY= +github.com/jasonhancock/go-logger v0.0.3/go.mod h1:0/7gOHEGqqVGABLmJk8HkGGYJqU4Lx1s33+sVG794RA= +github.com/jasonhancock/go-testhelpers/generic v0.0.2 h1:AsIFoiRK4MfDv7/Ysj5AKiCc3q/rDK29iTwu5rZgwOo= +github.com/jasonhancock/go-testhelpers/generic v0.0.2/go.mod h1:90kbMynYIF0ow32UPi12eNaqFPF2UP7CamexRxB9YA4= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fatih/pool.v2 v2.0.0 h1:xIFeWtxifuQJGk/IEPKsTduEKcKvPmhoiVDGpC40nKg= +gopkg.in/fatih/pool.v2 v2.0.0/go.mod h1:8xVGeu1/2jr2wm5V9SPuMht2H5AEmf5aFMGSQixtjTY= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..bfd502d --- /dev/null +++ b/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "os" + "os/signal" + "runtime" + "sync" + "syscall" + + "github.com/jasonhancock/amproxy/cmd" + ver "github.com/jasonhancock/cobra-version" +) + +// These variables are populated by goreleaser when the binary is built. +var ( + version = "dev" + commit = "none" + date = "unknown" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + var wg sync.WaitGroup + + go func() { + defer close(done) + cmd.Execute(ctx, &wg, ver.Info{ + Version: version, + Commit: commit, + Date: date, + Go: runtime.Version(), + }) + }() + + <-done + cancel() + wg.Wait() +} diff --git a/message.go b/message.go deleted file mode 100644 index 5205eb9..0000000 --- a/message.go +++ /dev/null @@ -1,66 +0,0 @@ -package amproxy - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "errors" - "fmt" - "strconv" - "strings" -) - -// ErrInvalidNumMessageComponents is the error returned when parsing a message that has an invaid number of items -var ErrInvalidNumMessageComponents = errors.New("Invalid number of components in message") - -// Message represents an amproxy metric -type Message struct { - Name string - Value string - Timestamp int - PublicKey string - Signature string -} - -// String returns the wire format of the message. The signature must have already been computed -func (m Message) String() string { - return fmt.Sprintf("%s %s %d %s %s", m.Name, m.Value, m.Timestamp, m.PublicKey, m.Signature) -} - -// Parse attempts to parse the given string into a Message -func Parse(str string) (*Message, error) { - m := &Message{} - pieces := strings.Split(strings.TrimSpace(str), " ") - - if len(pieces) != 5 { - return nil, ErrInvalidNumMessageComponents - } - - m.Name = pieces[0] - m.Value = pieces[1] - - timestamp, err := strconv.Atoi(pieces[2]) - if err != nil { - return nil, err - } - m.Timestamp = timestamp - - m.PublicKey = pieces[3] - m.Signature = pieces[4] - - return m, nil -} - -// ComputeSignature uses the private key to compute the HMAC SHA-256 signature for the message -func (m Message) ComputeSignature(secret string) string { - message := fmt.Sprintf("%s %s %d %s", m.Name, m.Value, m.Timestamp, m.PublicKey) - key := []byte(secret) - h := hmac.New(sha256.New, key) - h.Write([]byte(message)) - return base64.StdEncoding.EncodeToString(h.Sum(nil)) -} - -// MetricStr returns the carbon wire format of the message -func (m Message) MetricStr() string { - return fmt.Sprintf("%s %s %d", m.Name, m.Value, m.Timestamp) -} diff --git a/message_test.go b/message_test.go deleted file mode 100644 index e747c5e..0000000 --- a/message_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package amproxy - -import ( - "testing" - - "github.com/cheekybits/is" -) - -func TestParse(t *testing.T) { - is := is.New(t) - - m, err := Parse("foo 1234 1425059762 my_public_key") - is.Err(err) - is.Equal(err, ErrInvalidNumMessageComponents) - - m, err = Parse("foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") - is.NoErr(err) - is.Equal(m.Name, "foo") - is.Equal(m.Value, "1234") - is.Equal(m.Timestamp, 1425059762) - is.Equal(m.PublicKey, "my_public_key") - is.Equal(m.Signature, "lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") -} - -func TestComputeSignature(t *testing.T) { - is := is.New(t) - - m, err := Parse("foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") - is.NoErr(err) - is.Equal(m.ComputeSignature("my_secret_key"), "lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") -} - -func TestMetricStr(t *testing.T) { - is := is.New(t) - - m, err := Parse("foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") - is.NoErr(err) - is.Equal(m.MetricStr(), "foo 1234 1425059762") -} diff --git a/packaging/redhat/auth_file.yaml b/packaging/redhat/auth_file.yaml index a4f92e2..f12dbcf 100644 --- a/packaging/redhat/auth_file.yaml +++ b/packaging/redhat/auth_file.yaml @@ -3,5 +3,5 @@ apikeys: my_public_key: secret_key: my_secret_key metrics: - metric1: 1 - metric2: 1 + - metric1 + - metric2 diff --git a/pkg/amproxy/message.go b/pkg/amproxy/message.go new file mode 100644 index 0000000..e6f4a10 --- /dev/null +++ b/pkg/amproxy/message.go @@ -0,0 +1,83 @@ +package amproxy + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "errors" + "fmt" + "strconv" + "strings" + "time" +) + +// ErrInvalidNumMessageComponents is the error returned when parsing a message +// that has an invaid number of items. +var ErrInvalidNumMessageComponents = errors.New("invalid number of components in message") + +// Message represents an amproxy metric +type Message struct { + Name string + Value string + Timestamp time.Time + PublicKey string + Signature string +} + +// String returns the wire format of the message. The signature must have already been computed. +func (m Message) String() string { + return fmt.Sprintf("%s %s %d %s %s", m.Name, m.Value, m.Timestamp.Unix(), m.PublicKey, m.Signature) +} + +// Parse attempts to parse the given unsigned string into a Message. +func Parse(str string) (*Message, error) { + return parse(str, false) +} + +// ParseSigned attempts to parse the given signed string into a Message. +func ParseSigned(str string) (*Message, error) { + return parse(str, true) +} + +func parse(str string, signed bool) (*Message, error) { + pieces := strings.Split(strings.TrimSpace(str), " ") + expected := 5 + if !signed { + expected = 4 + } + if len(pieces) != expected { + return nil, ErrInvalidNumMessageComponents + } + + m := Message{ + Name: pieces[0], + Value: pieces[1], + PublicKey: pieces[3], + } + + if signed { + m.Signature = pieces[4] + } + + unixTime, err := strconv.ParseInt(pieces[2], 10, 64) + if err != nil { + return nil, err + } + + m.Timestamp = time.Unix(unixTime, 0) + + return &m, nil +} + +// ComputeSignature uses the private key to compute the HMAC SHA-256 signature for the message. +func (m Message) ComputeSignature(secret string) string { + message := fmt.Sprintf("%s %s %d %s", m.Name, m.Value, m.Timestamp.Unix(), m.PublicKey) + h := hmac.New(sha256.New, []byte(secret)) + h.Write([]byte(message)) + return base64.StdEncoding.EncodeToString(h.Sum(nil)) +} + +// MetricStr returns the carbon wire format of the message. +func (m Message) MetricStr() string { + return fmt.Sprintf("%s %s %d", m.Name, m.Value, m.Timestamp.Unix()) +} diff --git a/pkg/amproxy/message_test.go b/pkg/amproxy/message_test.go new file mode 100644 index 0000000..172ed4c --- /dev/null +++ b/pkg/amproxy/message_test.go @@ -0,0 +1,48 @@ +package amproxy + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestParseSigned(t *testing.T) { + + t.Run("invalid number of components", func(t *testing.T) { + _, err := ParseSigned("foo 1234 1425059762 my_public_key") + require.Equal(t, ErrInvalidNumMessageComponents, err) + }) + + t.Run("timestamp not a number", func(t *testing.T) { + _, err := ParseSigned("foo 1234 abc1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") + require.Error(t, err) + _, ok := err.(*strconv.NumError) + require.True(t, ok) + }) + + t.Run("normal", func(t *testing.T) { + m, err := ParseSigned("foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") + require.NoError(t, err) + require.Equal(t, "foo", m.Name) + require.Equal(t, "1234", m.Value) + require.Equal(t, time.Unix(1425059762, 0), m.Timestamp) + require.Equal(t, "my_public_key", m.PublicKey) + require.Equal(t, "lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=", m.Signature) + }) +} + +func TestComputeSignature(t *testing.T) { + m, err := ParseSigned("foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") + require.NoError(t, err) + require.Equal(t, "lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=", m.ComputeSignature("my_secret_key")) +} + +func TestStrings(t *testing.T) { + m, err := ParseSigned("foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=") + require.NoError(t, err) + + require.Equal(t, "foo 1234 1425059762", m.MetricStr()) + require.Equal(t, "foo 1234 1425059762 my_public_key lT9zOeBVNfTdogqKE5J7p3XWprfu/gOI5D7aWRzjJtc=", m.String()) +} diff --git a/client.go b/pkg/client/client.go similarity index 80% rename from client.go rename to pkg/client/client.go index a37231a..7572042 100644 --- a/client.go +++ b/pkg/client/client.go @@ -1,14 +1,16 @@ -package amproxy +package client import ( + "errors" + "fmt" "net" - "github.com/pkg/errors" + "github.com/jasonhancock/amproxy/pkg/amproxy" ) // ErrorNotConnected is the error returned when Write is called but the client // isn't connected to the remote server yet -var ErrorNotConnected = errors.New("You must call Connect() before attempting to write metrics") +var ErrorNotConnected = errors.New("you must call Connect() before attempting to write metrics") // Client is a client that will sign metrics and send to an amproxy server type Client struct { @@ -28,7 +30,7 @@ func NewClient(apiKey, apiSecret, serverAddr string) *Client { } // Write computes the signature for the message and ships it over the wire -func (c *Client) Write(m Message) error { +func (c *Client) Write(m amproxy.Message) error { if c.conn == nil { return ErrorNotConnected } @@ -44,7 +46,7 @@ func (c *Client) Connect() error { if c.conn == nil { conn, err := net.Dial("tcp", c.addr) if err != nil { - return errors.Wrap(err, "dialing "+c.addr) + return fmt.Errorf("dialing %q: %w", c.addr, err) } c.conn = conn } @@ -57,7 +59,7 @@ func (c *Client) Disconnect() error { err := c.conn.Close() c.conn = nil if err != nil { - return errors.Wrap(err, "closing connection") + return fmt.Errorf("closing connection: %w", err) } } return nil diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go new file mode 100644 index 0000000..b979c7d --- /dev/null +++ b/pkg/client/client_test.go @@ -0,0 +1,68 @@ +package client + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/jasonhancock/amproxy/pkg/amproxy" + "github.com/jasonhancock/amproxy/pkg/lineserver" + "github.com/jasonhancock/go-logger" + helpers "github.com/jasonhancock/go-testhelpers/generic" + "github.com/stretchr/testify/require" +) + +func TestClient(t *testing.T) { + l := logger.Silence() + + t.Run("normal", func(t *testing.T) { + port := helpers.NewRandomPort(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var messages []string + var wg sync.WaitGroup + _, err := lineserver.New(ctx, l, &wg, port, func(str string) { + messages = append(messages, str) + }) + require.NoError(t, err) + + c := NewClient("my_pub_key", "my_priv_key", port) + require.NoError(t, c.Connect()) + + require.NoError(t, c.Write(amproxy.Message{Name: "name1", Value: "value1", Timestamp: time.Unix(1234, 0)})) + require.NoError(t, c.Write(amproxy.Message{Name: "name2", Value: "value2", Timestamp: time.Unix(12345, 0)})) + require.NoError(t, c.Write(amproxy.Message{Name: "name3", Value: "value3", Timestamp: time.Unix(123456, 0)})) + require.NoError(t, c.Disconnect()) + time.Sleep(300 * time.Millisecond) + + require.Len(t, messages, 3) + require.Equal(t, "name1 value1 1234 my_pub_key ANX1Szr2bbcU04m/ZgAQKB3/OZ26pIZeIM2D+NfOGUY=", messages[0]) + require.Equal(t, "name2 value2 12345 my_pub_key J7AEAsGwrGr4SZdZzLnN48GSIgOpDj7IJ8rqKbu4vsU=", messages[1]) + require.Equal(t, "name3 value3 123456 my_pub_key wsLfIF/tpOWaJ6/JPjewLr17wG53LKN368QJagTfDlU=", messages[2]) + }) + + // This test seems a bit flaky....figure out the problem. + t.Run("write error", func(t *testing.T) { + t.Skip("flaky test...need to debug") + port := helpers.NewRandomPort(t) + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + _, err := lineserver.New(ctx, l, &wg, port, func(str string) {}) + require.NoError(t, err) + + c := NewClient("my_pub_key", "my_priv_key", port) + require.NoError(t, c.Connect()) + cancel() + wg.Wait() + + require.Error(t, c.Write(amproxy.Message{Name: "name1", Value: "value1", Timestamp: time.Unix(1234, 0)})) + }) + + t.Run("not connected error", func(t *testing.T) { + port := helpers.NewRandomPort(t) + c := NewClient("my_pub_key", "my_priv_key", port) + + require.Equal(t, ErrorNotConnected, c.Write(amproxy.Message{Name: "name1", Value: "value1", Timestamp: time.Unix(1234, 0)})) + }) +} diff --git a/pkg/lineserver/server.go b/pkg/lineserver/server.go new file mode 100644 index 0000000..6643722 --- /dev/null +++ b/pkg/lineserver/server.go @@ -0,0 +1,102 @@ +package lineserver + +import ( + "bufio" + "context" + "fmt" + "io" + "net" + "strings" + "sync" + + "github.com/jasonhancock/go-logger" +) + +// LineProtocolServer is a generic server that takes input one line at a time. +type LineProtocolServer struct { + logger *logger.L + listener net.Listener + processFn func(string) +} + +// New creates a new LineProtocolServer +func New(ctx context.Context, l *logger.L, wg *sync.WaitGroup, addr string, processFn func(string)) (*LineProtocolServer, error) { + s := &LineProtocolServer{ + logger: l, + processFn: processFn, + } + + var err error + s.listener, err = net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("creating listener on %q: %w", addr, err) + } + s.logger.Info("listening", "addr", addr) + + go func() { + <-ctx.Done() + s.logger.Info("closing listener") + s.listener.Close() + }() + + wg.Add(1) + go func() { + defer wg.Done() + s.acceptConns(ctx) + }() + + return s, nil +} + +func (s *LineProtocolServer) acceptConns(ctx context.Context) error { + for { + conn, err := s.listener.Accept() + if err != nil { + select { + case <-ctx.Done(): + return nil + default: + s.logger.LogError("error_accepting_connection", err) + } + + continue + } + // Handle connections in a new goroutine. + go s.handleRequest(ctx, conn) + } +} + +func (s *LineProtocolServer) handleRequest(ctx context.Context, conn net.Conn) { + defer conn.Close() + s.logger.Info( + "new_connection", + "remote_ip", conn.RemoteAddr(), + ) + + r := bufio.NewReader(conn) + for { + select { + case <-ctx.Done(): + return + default: + } + + line, err := r.ReadString('\n') + if err != nil { + if err == io.EOF { + return + } + s.logger.LogError("error_reading_conn", err) + return + } + line = strings.TrimSpace(line) + + if len(line) == 0 { + continue + } + + if s.processFn != nil { + s.processFn(line) + } + } +} diff --git a/pkg/lineserver/server_test.go b/pkg/lineserver/server_test.go new file mode 100644 index 0000000..358d936 --- /dev/null +++ b/pkg/lineserver/server_test.go @@ -0,0 +1,44 @@ +package lineserver + +import ( + "context" + "fmt" + "net" + "sync" + "testing" + "time" + + "github.com/jasonhancock/go-logger" + "github.com/stretchr/testify/require" +) + +func TestLineProtocolServer(t *testing.T) { + var lines []string + fn := func(line string) { + lines = append(lines, line) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + + _, err := New(ctx, logger.Silence(), &wg, "localhost:8095", fn) + require.NoError(t, err) + time.Sleep(1 * time.Second) + + conn, err := net.Dial("tcp", "localhost:8095") + require.NoError(t, err) + fmt.Fprintf(conn, "this is line 1\n") + fmt.Fprintf(conn, "this is line 2\n") + fmt.Fprintf(conn, "\n") + conn.Close() + + time.Sleep(200 * time.Millisecond) + cancel() + wg.Wait() + + require.Len(t, lines, 2) + require.Equal(t, "this is line 1", lines[0]) + require.Equal(t, "this is line 2", lines[1]) +} diff --git a/server/auth.go b/server/auth.go index 36d9730..c2c1d60 100644 --- a/server/auth.go +++ b/server/auth.go @@ -1,8 +1,6 @@ package server -import ( - "github.com/pkg/errors" -) +import "errors" // ErrCredentialsNotFound is the error returned by an AuthProvider if credentials // corresponding to a given AccessKey cannot be located @@ -14,18 +12,13 @@ type AuthProvider interface { // If no corresponding credentials can be found, an ErrCredentialsNotFound will // be returned. CredsForKey(string) (*Creds, error) - - // Run will be called in a goroutine to allow the AuthProvider to perform - // background tasks. When he channel is closed the provider is expected to - // return - Run(done <-chan struct{}) } // Creds represents an api key set and the metrics they are allowed to access type Creds struct { - AccessKey string `yaml:"access_key"` - SecretKey string `yaml:"secret_key"` - Metrics map[string]uint8 `yaml:"metrics"` + AccessKey string `yaml:"access_key"` + SecretKey string `yaml:"secret_key"` + Metrics map[string]struct{} `yaml:"metrics"` } // AllowMetric returns true if a given metric is allowed for this set of credentials @@ -44,5 +37,3 @@ func (m *mockAuthProvider) CredsForKey(key string) (*Creds, error) { } panic("not implemented") } - -func (m *mockAuthProvider) Run(done <-chan struct{}) {} diff --git a/server/auth_provider_static.go b/server/auth_provider_static.go index 0ff2bc5..6d44463 100644 --- a/server/auth_provider_static.go +++ b/server/auth_provider_static.go @@ -1,13 +1,13 @@ package server import ( - "io/ioutil" + "context" + "fmt" "os" "sync" "time" - "github.com/go-kit/kit/log" - "github.com/pkg/errors" + "github.com/jasonhancock/go-logger" "gopkg.in/yaml.v2" ) @@ -20,13 +20,13 @@ type AuthProviderStaticFile struct { credentials map[string]Creds modTime time.Time lock sync.RWMutex - logger log.Logger + logger *logger.L } // NewAuthProviderStaticFile creates a new static file auth provider. filename is // the path to the file on disk. pollInterval is how frequently to check the file // for modification time updates. -func NewAuthProviderStaticFile(l log.Logger, filename string, pollInterval time.Duration) (*AuthProviderStaticFile, error) { +func NewAuthProviderStaticFile(ctx context.Context, l *logger.L, filename string, pollInterval time.Duration) (*AuthProviderStaticFile, error) { a := &AuthProviderStaticFile{ File: filename, credentials: make(map[string]Creds), @@ -34,27 +34,28 @@ func NewAuthProviderStaticFile(l log.Logger, filename string, pollInterval time. logger: l, } - err := a.loadFile() - if err != nil { - return nil, errors.Wrap(err, "loading auth file") + if err := a.loadFile(); err != nil { + return nil, fmt.Errorf("loading auth file: %w", err) } + go a.run(ctx) + return a, nil } -// Run starts the file watcher. To stop it, close the done channel -func (a *AuthProviderStaticFile) Run(done <-chan struct{}) { - a.logger.Log("msg", "running") +// run starts the file watcher. To stop it, cancel the context. +func (a *AuthProviderStaticFile) run(ctx context.Context) { + a.logger.Info("running") ticker := time.NewTicker(a.pollInterval) for { select { - case <-done: + case <-ctx.Done(): return case <-ticker.C: err := a.checkReload() if err != nil { - a.logger.Log("msg", "check_reload_error", "error", err) + a.logger.LogError("check_reload_error", err) } } } @@ -65,15 +66,14 @@ func (a *AuthProviderStaticFile) Run(done <-chan struct{}) { func (a *AuthProviderStaticFile) checkReload() error { info, err := os.Stat(a.File) if err != nil { - return errors.Wrap(err, "stat'ing auth file") + return fmt.Errorf("stat'ing auth file: %w", err) } ts := info.ModTime() if ts != a.modTime { - a.logger.Log("msg", "auth_file_reload") - err = a.loadFile() - if err != nil { - return errors.Wrap(err, "reloading auth file") + a.logger.Info("initiating auth file reload") + if err := a.loadFile(); err != nil { + return fmt.Errorf("reloading auth file: %w", err) } } return nil @@ -95,28 +95,54 @@ func (a *AuthProviderStaticFile) CredsForKey(accessKey string) (*Creds, error) { // loadFile loads the auth file into memory func (a *AuthProviderStaticFile) loadFile() error { + var config struct { - Apikeys map[string]Creds `yaml:"apikeys"` + Apikeys map[string]fileCreds `yaml:"apikeys"` } - info, err := os.Stat(a.File) + fh, err := os.Open(a.File) if err != nil { - return errors.Wrap(err, "stat'ing file") + return fmt.Errorf("opening %q: %w", a.File, err) } + defer fh.Close() - bytes, err := ioutil.ReadFile(a.File) + info, err := fh.Stat() if err != nil { - return errors.Wrap(err, "reading file") + return fmt.Errorf("stat'ing auth file: %w", err) } - err = yaml.Unmarshal(bytes, &config) - if err != nil { - return errors.Wrap(err, "unmarshaling yaml") + + if err = yaml.NewDecoder(fh).Decode(&config); err != nil { + return fmt.Errorf("unmarshaling yaml: %w", err) + } + + creds := make(map[string]Creds, len(config.Apikeys)) + for k, v := range config.Apikeys { + creds[k] = v.To() } a.lock.Lock() - a.credentials = config.Apikeys + a.credentials = creds a.modTime = info.ModTime() a.lock.Unlock() return nil } + +type fileCreds struct { + AccessKey string `yaml:"access_key"` + SecretKey string `yaml:"secret_key"` + Metrics []string `yaml:"metrics"` +} + +func (c fileCreds) To() Creds { + cr := Creds{ + SecretKey: c.SecretKey, + Metrics: make(map[string]struct{}, len(c.Metrics)), + } + + for _, v := range c.Metrics { + cr.Metrics[v] = struct{}{} + } + + return cr +} diff --git a/server/auth_provider_static_test.go b/server/auth_provider_static_test.go index f7b7537..be9611b 100644 --- a/server/auth_provider_static_test.go +++ b/server/auth_provider_static_test.go @@ -1,51 +1,44 @@ package server import ( - "io/ioutil" + "context" "os" "path/filepath" "testing" "time" - "github.com/cheekybits/is" + "github.com/jasonhancock/go-logger" + "github.com/stretchr/testify/require" ) func TestAuthProviderStaticFile(t *testing.T) { - is := is.New(t) - logger := testLogger() - - dir, err := ioutil.TempDir("", "") - is.NoErr(err) - defer os.RemoveAll(dir) - + dir := t.TempDir() file := filepath.Join(dir, "auth.yaml") - is.NoErr(ioutil.WriteFile(file, []byte(authData), 0644)) - - done := make(chan struct{}) + require.NoError(t, os.WriteFile(file, []byte(authData), 0644)) - interval := 1 * time.Second + interval := 100 * time.Millisecond + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - a, err := NewAuthProviderStaticFile(logger, file, interval) - is.NoErr(err) - go a.Run(done) - defer close(done) + a, err := NewAuthProviderStaticFile(ctx, logger.Silence(), file, interval) + require.NoError(t, err) c, err := a.CredsForKey("apikey") - is.NoErr(err) - is.True(c.AllowMetric("metric1")) - is.False(c.AllowMetric("metric3")) + require.NoError(t, err) + require.True(t, c.AllowMetric("metric1")) + require.False(t, c.AllowMetric("metric3")) _, err = a.CredsForKey("apikey2") - is.Equal(err, ErrCredentialsNotFound) + require.Equal(t, err, ErrCredentialsNotFound) - time.Sleep(1 * time.Second) - is.NoErr(ioutil.WriteFile(file, []byte(authData2), 0644)) + time.Sleep(interval) + require.NoError(t, os.WriteFile(file, []byte(authData2), 0644)) time.Sleep(2 * interval) c, err = a.CredsForKey("apikey") - is.NoErr(err) - is.True(c.AllowMetric("metric1")) - is.True(c.AllowMetric("metric3")) + require.NoError(t, err) + require.True(t, c.AllowMetric("metric1")) + require.True(t, c.AllowMetric("metric3")) } const authData = ` @@ -54,8 +47,8 @@ apikeys: apikey: secret_key: blah metrics: - metric1: 1 - metric2: 1 + - metric1 + - metric2 ` const authData2 = ` @@ -64,7 +57,7 @@ apikeys: apikey: secret_key: blah metrics: - metric1: 1 - metric2: 1 - metric3: 1 + - metric1 + - metric2 + - metric3 ` diff --git a/server/auth_test.go b/server/auth_test.go index 8eb56a2..f9fb9ed 100644 --- a/server/auth_test.go +++ b/server/auth_test.go @@ -3,20 +3,18 @@ package server import ( "testing" - "github.com/cheekybits/is" + "github.com/stretchr/testify/require" ) func TestCredsAllowMetric(t *testing.T) { - is := is.New(t) - c := &Creds{ - Metrics: map[string]uint8{ - "metric1": 1, - "metric2": 1, + Metrics: map[string]struct{}{ + "metric1": {}, + "metric2": {}, }, } - is.True(c.AllowMetric("metric1")) - is.True(c.AllowMetric("metric2")) - is.False(c.AllowMetric("metric3")) + require.True(t, c.AllowMetric("metric1")) + require.True(t, c.AllowMetric("metric2")) + require.False(t, c.AllowMetric("metric3")) } diff --git a/server/line_protocol_server.go b/server/line_protocol_server.go deleted file mode 100644 index 0dc3f3a..0000000 --- a/server/line_protocol_server.go +++ /dev/null @@ -1,114 +0,0 @@ -package server - -import ( - "bufio" - "io" - "net" - "strings" - "sync" - - "github.com/go-kit/kit/log" - "github.com/pkg/errors" -) - -// LineProtocolServer is a generic server that takes input one line at a time -type LineProtocolServer struct { - logger log.Logger - addr string - doneChan chan struct{} - wg sync.WaitGroup - listener net.Listener - processFn func(string) -} - -// NewLineProtocolServer creates a new LineProtocolServer -func NewLineProtocolServer(l log.Logger, addr string, processFn func(string)) (*LineProtocolServer, error) { - s := &LineProtocolServer{ - logger: l, - addr: addr, - doneChan: make(chan struct{}), - processFn: processFn, - } - - return s, nil -} - -// Run starts the server -func (s *LineProtocolServer) Run() error { - var err error - s.listener, err = net.Listen("tcp", s.addr) - if err != nil { - return errors.Wrapf(err, "creating listener on %s", s.addr) - } - s.logger.Log("msg", "listening", "interface", s.addr) - - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.acceptConns(s.doneChan) - }() - - return nil -} - -// Stop shuts the server down, including all background go routines -func (s *LineProtocolServer) Stop() { - s.logger.Log("msg", "stopping") - close(s.doneChan) - s.listener.Close() - s.wg.Wait() - s.logger.Log("msg", "stopped") -} - -func (s *LineProtocolServer) acceptConns(done <-chan struct{}) error { - for { - conn, err := s.listener.Accept() - if err != nil { - select { - case <-done: - return nil - default: - s.logger.Log( - "msg", "error_accepting_connection", - "error", err, - ) - } - - continue - } - // Handle connections in a new goroutine. - go s.handleRequest(conn) - } -} - -func (s *LineProtocolServer) handleRequest(conn net.Conn) { - defer conn.Close() - s.logger.Log( - "msg", "new_connection", - "remote_ip", conn.RemoteAddr(), - ) - - r := bufio.NewReader(conn) - for { - line, err := r.ReadString('\n') - if err != nil { - if err == io.EOF { - return - } - s.logger.Log( - "msg", "error_reading_conn", - "error", err, - ) - return - } - line = strings.TrimSpace(line) - - if len(line) == 0 { - continue - } - - if s.processFn != nil { - s.processFn(line) - } - } -} diff --git a/server/line_protocol_server_test.go b/server/line_protocol_server_test.go deleted file mode 100644 index 2cfb130..0000000 --- a/server/line_protocol_server_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package server - -import ( - "fmt" - "net" - "testing" - "time" - - "github.com/cheekybits/is" - "github.com/go-kit/kit/log" -) - -func TestLineProtocolServer(t *testing.T) { - is := is.New(t) - - logger := testLogger() - - var lines []string - - fn := func(line string) { - lines = append(lines, line) - } - - s, err := NewLineProtocolServer(log.With(logger, "component", "line_protocol_server"), ":8095", fn) - is.NoErr(err) - is.NoErr(s.Run()) - defer s.Stop() - - conn, err := net.Dial("tcp", ":8095") - is.NoErr(err) - fmt.Fprintf(conn, "this is line 1\n") - fmt.Fprintf(conn, "this is line 2\n") - fmt.Fprintf(conn, "\n") - conn.Close() - - time.Sleep(300 * time.Millisecond) - - is.Equal(len(lines), 2) - is.Equal("this is line 1", lines[0]) - is.Equal("this is line 2", lines[1]) -} diff --git a/server/metric_writer.go b/server/metric_writer.go index e94b043..c61c147 100644 --- a/server/metric_writer.go +++ b/server/metric_writer.go @@ -1,8 +1,6 @@ package server -import ( - "github.com/jasonhancock/amproxy" -) +import "github.com/jasonhancock/amproxy/pkg/amproxy" // MetricWriter is the interface for a backend metrics store type MetricWriter interface { diff --git a/server/metric_writer_carbon.go b/server/metric_writer_carbon.go index 3c5c978..ebc833c 100644 --- a/server/metric_writer_carbon.go +++ b/server/metric_writer_carbon.go @@ -1,35 +1,33 @@ package server import ( + "fmt" "net" - "github.com/go-kit/kit/log" - "github.com/jasonhancock/amproxy" - "github.com/pkg/errors" + "github.com/jasonhancock/amproxy/pkg/amproxy" + "github.com/jasonhancock/go-logger" "gopkg.in/fatih/pool.v2" ) // MetricWriterCarbon will write metrics to a Carbon server. It pools connections. type MetricWriterCarbon struct { - logger log.Logger - pool pool.Pool + pool pool.Pool } // NewMetricWriterCarbon creates a new MetricWriterCarbon -func NewMetricWriterCarbon(l log.Logger, addr string, poolMin, poolMax int) (*MetricWriterCarbon, error) { +func NewMetricWriterCarbon(l *logger.L, addr string, poolMin, poolMax int) (*MetricWriterCarbon, error) { factory := func() (net.Conn, error) { - l.Log("msg", "establishing_connection", "addr", addr) + l.Info("establishing_connection", "addr", addr) return net.Dial("tcp", addr) } pool, err := pool.NewChannelPool(poolMin, poolMax, factory) if err != nil { - return nil, errors.Wrap(err, "creating pool") + return nil, fmt.Errorf("creating pool: %w", err) } m := &MetricWriterCarbon{ - logger: l, - pool: pool, + pool: pool, } return m, nil @@ -39,7 +37,7 @@ func NewMetricWriterCarbon(l log.Logger, addr string, poolMin, poolMax int) (*Me func (mw *MetricWriterCarbon) WriteMetric(m amproxy.Message) error { conn, err := mw.pool.Get() if err != nil { - return errors.Wrap(err, "getting connection from pool") + return fmt.Errorf("getting connection from pool: %w", err) } _, err = conn.Write([]byte(m.MetricStr() + "\n")) @@ -49,9 +47,8 @@ func (mw *MetricWriterCarbon) WriteMetric(m amproxy.Message) error { pc.MarkUnusable() pc.Close() } - return errors.Wrap(err, "writing to connection") + return fmt.Errorf("writing to connection: %w", err) } - conn.Close() - return nil + return conn.Close() } diff --git a/server/metric_writer_carbon_test.go b/server/metric_writer_carbon_test.go index 674d8aa..2afdbef 100644 --- a/server/metric_writer_carbon_test.go +++ b/server/metric_writer_carbon_test.go @@ -1,45 +1,51 @@ package server import ( + "context" + "sync" "testing" "time" - "github.com/cheekybits/is" - "github.com/go-kit/kit/log" - "github.com/jasonhancock/amproxy" + "github.com/jasonhancock/amproxy/pkg/amproxy" + "github.com/jasonhancock/amproxy/pkg/lineserver" + "github.com/jasonhancock/go-logger" + "github.com/jasonhancock/go-testhelpers/generic" + "github.com/stretchr/testify/require" ) func TestMetricWriterCarbon(t *testing.T) { - is := is.New(t) - logger := testLogger() + l := logger.Silence() var lines []string - fn := func(line string) { lines = append(lines, line) } - s, err := NewLineProtocolServer(log.With(logger, "component", "line_protocol_server"), ":8096", fn) - is.NoErr(err) - is.NoErr(s.Run()) - defer s.Stop() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + addr := generic.NewRandomPort(t) + _, err := lineserver.New(ctx, l, &wg, addr, fn) + require.NoError(t, err) - mw, err := NewMetricWriterCarbon(logger, ":8096", 0, 3) - is.NoErr(err) + mw, err := NewMetricWriterCarbon(l, addr, 0, 3) + require.NoError(t, err) m := amproxy.Message{ Name: "test.metric.foo", Value: "1234", - Timestamp: 3456, + Timestamp: time.Unix(3456, 0), } mw.WriteMetric(m) m.Value = "4321" - m.Timestamp = 3457 + m.Timestamp = time.Unix(3457, 0) mw.WriteMetric(m) + time.Sleep(300 * time.Millisecond) - is.Equal(len(lines), 2) - is.Equal(lines[0], "test.metric.foo 1234 3456") - is.Equal(lines[1], "test.metric.foo 4321 3457") + require.Len(t, lines, 2) + require.Equal(t, "test.metric.foo 1234 3456", lines[0]) + require.Equal(t, "test.metric.foo 4321 3457", lines[1]) } diff --git a/server/server.go b/server/server.go index 78e4a1d..7c73762 100644 --- a/server/server.go +++ b/server/server.go @@ -1,107 +1,69 @@ package server import ( + "fmt" "math" - "net" - "sync" "time" - "github.com/go-kit/kit/log" - "github.com/jasonhancock/amproxy" - "github.com/pkg/errors" + "github.com/jasonhancock/amproxy/pkg/amproxy" + "github.com/jasonhancock/go-logger" ) // Server is an amproxy server. It uses a generic LineProtocolServer to handle // the connections, but implements the business logic itself. type Server struct { - logger log.Logger + logger *logger.L authProvider AuthProvider - skew float64 - doneChan chan struct{} - wg sync.WaitGroup - listener net.Listener + skew time.Duration metricWriter MetricWriter - server *LineProtocolServer } // NewServer creates a new Server -func NewServer(l log.Logger, addr string, skew float64, authProvider AuthProvider, mw MetricWriter) (*Server, error) { +func NewServer(l *logger.L, skew time.Duration, authProvider AuthProvider, mw MetricWriter) *Server { s := &Server{ logger: l, skew: skew, authProvider: authProvider, - doneChan: make(chan struct{}), metricWriter: mw, } - lp, err := NewLineProtocolServer(log.With(l, "component", "line_protocol_server"), addr, func(line string) { - if err := s.processLine(line); err != nil { - l.Log("msg", "process_line_error", "error", err) - } - }) - if err != nil { - return nil, errors.Wrap(err, "creating protocol server") - } - s.server = lp - - return s, nil + return s } -// Run starts the Server -func (s *Server) Run() error { - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.authProvider.Run(s.doneChan) - }() - - err := s.server.Run() - if err != nil { - return errors.Wrapf(err, "running lp server") +func (s *Server) ProcessLine(line string) { + if err := s.processLine(line); err != nil { + s.logger.LogError("process_line_error", err) } - - return nil -} - -// Stop shuts the server down, including all background go routines -func (s *Server) Stop() { - s.logger.Log("msg", "stopping") - close(s.doneChan) - s.server.Stop() - s.wg.Wait() - s.logger.Log("msg", "stopped") } func (s *Server) processLine(line string) error { - - msg, err := amproxy.Parse(line) + msg, err := amproxy.ParseSigned(line) if err != nil { - return errors.Wrapf(err, "decomposing message %q", line) + return fmt.Errorf("decomposing message %q: %w", line, err) } creds, err := s.authProvider.CredsForKey(msg.PublicKey) if err != nil { - return errors.Wrapf(err, "pub key %q", msg.PublicKey) + return fmt.Errorf("pub key %q: %w", msg.PublicKey, err) } sig := msg.ComputeSignature(creds.SecretKey) - if sig != msg.Signature { - return errors.Errorf("computed signature %s doesn't match provided signature %s", sig, msg.Signature) + return fmt.Errorf("computed signature %s doesn't match provided signature %s", sig, msg.Signature) } - delta := math.Abs(float64(time.Now().Unix() - int64(msg.Timestamp))) + delta := time.Duration(math.Abs(float64(time.Since(msg.Timestamp)))) if delta > s.skew { - return errors.Errorf("delta = %.0f, max skew set to %.0f", delta, s.skew) + return fmt.Errorf("delta = %s, max skew set to %s", delta, s.skew) } // validate the metric is on the approved list if !creds.AllowMetric(msg.Name) { - return errors.Errorf("not an approved metric: %s", msg.Name) + return fmt.Errorf("not an approved metric: %s", msg.Name) } if err := s.metricWriter.WriteMetric(*msg); err != nil { - return errors.Wrap(err, "write metric") + return fmt.Errorf("write metric: %w", err) } return nil } diff --git a/server/server_test.go b/server/server_test.go index 72c28a9..6542aa7 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1,69 +1,161 @@ package server import ( - "os" + "errors" "testing" "time" - "github.com/cheekybits/is" - "github.com/go-kit/kit/log" - "github.com/jasonhancock/amproxy" + "github.com/jasonhancock/amproxy/pkg/amproxy" + "github.com/jasonhancock/go-logger" + "github.com/stretchr/testify/require" ) func TestServer(t *testing.T) { - is := is.New(t) + const publicKey = "public_key" + const privateKey = "private_key" + const metricName = "foo.bar" - var observedMetrics []string - - logger := testLogger() - mw := &mockMetricWriter{ - WriteMetricFn: func(m amproxy.Message) error { - observedMetrics = append(observedMetrics, m.Name) - return nil + tests := []struct { + description string + inputMetricName string + inputSkew time.Duration + inputPubKey string + inputPrivateKey string + errMetricWriter error + err error + expected []amproxy.Message + }{ + { + "normal", + metricName, + 0 * time.Second, + publicKey, + privateKey, + nil, + nil, + []amproxy.Message{ + { + Name: metricName, + Value: "1234", + }, + }, + }, + { + "metric writer error", + metricName, + 0 * time.Second, + publicKey, + privateKey, + errors.New("some mw error"), + errors.New("write metric: some mw error"), + nil, + }, + { + "bad line", + "", + 0 * time.Second, + publicKey, + privateKey, + nil, + errors.New("decomposing message"), + nil, + }, + { + "bad metric name", + metricName + "foo", + 0 * time.Second, + publicKey, + privateKey, + nil, + errors.New("not an approved metric"), + nil, + }, + { + "bad clock skew", + metricName, + 600 * time.Second, + publicKey, + privateKey, + nil, + errors.New("max skew set to 5m0s"), + nil, + }, + { + "bad pub key", + metricName, + 0 * time.Second, + publicKey + "bad", + privateKey, + nil, + ErrCredentialsNotFound, + nil, + }, + { + "bad signature", + metricName, + 0 * time.Second, + publicKey, + privateKey + "bad", + nil, + errors.New("doesn't match provided signature"), + nil, }, } - publicKey := "public_key" - privateKey := "private_key" - metricName := "foo.bar" + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + var observedMetrics []amproxy.Message + mw := &mockMetricWriter{ + WriteMetricFn: func(m amproxy.Message) error { + observedMetrics = append(observedMetrics, m) + return tt.errMetricWriter + }, + } - creds := map[string]*Creds{ - publicKey: &Creds{ - SecretKey: privateKey, - Metrics: map[string]uint8{ - metricName: 1, - }, - }, - } + creds := map[string]*Creds{ + publicKey: { + SecretKey: privateKey, + Metrics: map[string]struct{}{ + metricName: {}, + }, + }, + } - ap := &mockAuthProvider{ - CredsForKeyFn: func(key string) (*Creds, error) { - c, ok := creds[key] - if !ok { - return nil, ErrCredentialsNotFound + ap := &mockAuthProvider{ + CredsForKeyFn: func(key string) (*Creds, error) { + c, ok := creds[key] + if !ok { + return nil, ErrCredentialsNotFound + } + return c, nil + }, } - return c, nil - }, - } - s, err := NewServer(log.With(logger, "component", "server"), ":8095", 300, ap, mw) - is.NoErr(err) - is.NoErr(s.Run()) - defer s.Stop() - time.Sleep(300 * time.Millisecond) + l := logger.Default() - client := amproxy.NewClient(publicKey, privateKey, ":8095") - m := amproxy.Message{ - Name: metricName, - Value: "1234", - Timestamp: int(time.Now().Unix()), - } - client.Write(m) - m.Name = "foo.bar.baz" - client.Write(m) - time.Sleep(300 * time.Millisecond) -} + s := NewServer(l, 300*time.Second, ap, mw) -func testLogger() log.Logger { - return log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + m := amproxy.Message{ + Name: tt.inputMetricName, + Value: "1234", + Timestamp: time.Now().Add(tt.inputSkew), + PublicKey: tt.inputPubKey, + } + m.Signature = m.ComputeSignature(tt.inputPrivateKey) + + err := s.processLine(m.String()) + if tt.err != nil { + require.Error(t, err) + require.ErrorContains(t, err, tt.err.Error()) + return + } + require.NoError(t, err) + + require.Len(t, observedMetrics, len(tt.expected)) + for i := range tt.expected { + require.Equal(t, tt.expected[i].Name, observedMetrics[0].Name) + require.Equal(t, tt.expected[i].Value, observedMetrics[0].Value) + } + }) + } }