From 8ac06216403f54f701a2d19b0a9fb6d8d0032d60 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Tue, 3 Oct 2023 09:16:59 +0200 Subject: [PATCH] [breaking] Better auth: allow sources to be added multiple times (#26) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📝 Summary Allow any source to be added multiple times. ⚠️ WARNING - this is a breaking change to the collector, all flags and env vars have changed and need to be double-checked! --- ## ✅ I have run these commands * [x] `make lint` * [x] `make test` * [x] `go mod tidy` --- .env.example | 17 ++- cmd/analyze/main.go | 13 ++- cmd/collect/main.go | 174 ++++++++++++++++-------------- collector/collector.go | 64 +++++------ collector/consts.go | 6 +- collector/node_conn_bloxroute.go | 20 +++- collector/node_conn_chainbound.go | 5 +- collector/node_conn_eden.go | 20 +++- common/utils.go | 9 ++ scripts/check-systemd-errors.sh | 2 +- scripts/subscibe-test/main.go | 23 ++-- 11 files changed, 199 insertions(+), 154 deletions(-) diff --git a/.env.example b/.env.example index 1a26964..9dbf8e4 100644 --- a/.env.example +++ b/.env.example @@ -1,19 +1,16 @@ -# bloXroute -export BLX_AUTH_HEADER="" -# export BLX_URI="" # either wss:// or IP:port (for gRPC bloxroute gateway), default: 127.0.0.1:1001 (alternatives: https://docs.bloxroute.com/introduction/cloud-api-ips) +# bloXroute auth (Websocket or gRPC. 
format: "token" or "token@url") +export BLX_AUTH="" -# chainbound -export CHAINBOUND_API_KEY="" -# export CHAINBOUND_URI="" # default: beta.fiberapi.io:8080 +# Chainbound auth (format: "api-key" or "api-key@url") +export CHAINBOUND_AUTH="" -# eden -export EDEN_AUTH_HEADER="" -# export EDEN_URI="" # default: wss://speed-eu-west.edennetwork.io +# Eden auth (format: "token" or "token@url") +export EDEN_AUTH="" # Source aliases export SRC_ALIASES="local=ws://localhost:8546" # comma-separated list of alias=url -# Node URL, used to check transactions for already being included +# Node URL for checking transaction inclusion status export CHECK_NODE_URI="" # Cloudflare R2 account ID (for upload.sh) diff --git a/cmd/analyze/main.go b/cmd/analyze/main.go index 9cf2a9e..1401ae0 100644 --- a/cmd/analyze/main.go +++ b/cmd/analyze/main.go @@ -22,6 +22,14 @@ var ( // CLI flags cliFlags = []cli.Flag{ + &cli.StringSliceFlag{ + Name: "input-parquet", + Usage: "input parquet files", + }, + &cli.StringSliceFlag{ + Name: "input-sourcelog", + Usage: "input sourcelog files", + }, &cli.StringFlag{ Name: "out", Usage: "output filename", @@ -38,11 +46,6 @@ var ( Name: "cmp", Usage: "compare these sources", }, - &cli.StringSliceFlag{ - Name: "input-parquet", - }, &cli.StringSliceFlag{ - Name: "input-sourcelog", - }, } ) diff --git a/cmd/collect/main.go b/cmd/collect/main.go index 61737c8..f2d891c 100644 --- a/cmd/collect/main.go +++ b/cmd/collect/main.go @@ -1,105 +1,116 @@ package main import ( - "flag" - "fmt" + "log" "os" "os/signal" - "strings" "syscall" "github.com/flashbots/mempool-dumpster/collector" "github.com/flashbots/mempool-dumpster/common" "github.com/lithammer/shortuuid" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/urfave/cli/v2" ) var ( version = "dev" // is set during build process - // Default values - defaultDebug = os.Getenv("DEBUG") == "1" - defaultLogProd = os.Getenv("LOG_PROD") == "1" - defaultLogService = os.Getenv("LOG_SERVICE") - 
defaultCheckNodeURI = os.Getenv("CHECK_NODE_URI") - - // API keys - defaultblxAuthToken = os.Getenv("BLX_AUTH_HEADER") - defaultEdenAuthToken = os.Getenv("EDEN_AUTH_HEADER") - defaultChainboundAPIKey = os.Getenv("CHAINBOUND_API_KEY") - - // Flags - printVersion = flag.Bool("version", false, "only print version") - debugPtr = flag.Bool("debug", defaultDebug, "print debug output") - logProdPtr = flag.Bool("log-prod", defaultLogProd, "log in production mode (json)") - logServicePtr = flag.String("log-service", defaultLogService, "'service' tag to logs") - nodesPtr = flag.String("nodes", "ws://localhost:8546", "comma separated list of EL nodes") - checkNodeURI = flag.String("check-node", defaultCheckNodeURI, "node to use for checking incoming transactions") - outDirPtr = flag.String("out", "", "path to collect raw transactions into") - uidPtr = flag.String("uid", "", "collector uid (part of output CSV filename)") - - blxAuthToken = flag.String("blx-token", defaultblxAuthToken, "bloXroute auth token (optional)") - edenAuthToken = flag.String("eden-token", defaultEdenAuthToken, "Eden auth token (optional)") - chainboundAPIKey = flag.String("chainbound-api-key", defaultChainboundAPIKey, "Chainbound API key (optional)") + cliFlags = []cli.Flag{ + // Collector configuration + &cli.BoolFlag{ + Name: "debug", + EnvVars: []string{"DEBUG"}, + Usage: "enable debug logging", + Category: "Collector Configuration", + }, + &cli.StringFlag{ + Name: "out", + EnvVars: []string{"OUT"}, + Required: true, + Usage: "output base directory", + Category: "Collector Configuration", + }, + &cli.StringFlag{ + Name: "uid", + EnvVars: []string{"UID"}, + Usage: "collector uid, part of output CSV filenames (default: random)", + Category: "Collector Configuration", + }, + &cli.StringFlag{ + Name: "check-node", + EnvVars: []string{"CHECK_NODE"}, + Usage: "EL node URL to check incoming transactions", + Category: "Collector Configuration", + }, + + // Sources + &cli.StringSliceFlag{ + Name: "node", + 
Aliases: []string{"nodes"}, + EnvVars: []string{"NODE", "NODES"}, + Usage: "EL node URL(s)", + Category: "Sources Configuration", + }, + &cli.StringSliceFlag{ + Name: "blx", + EnvVars: []string{"BLX_AUTH"}, + Usage: "bloXroute auth-header (or auth-header@url)", + Category: "Sources Configuration", + }, + &cli.StringSliceFlag{ + Name: "eden", + EnvVars: []string{"EDEN_AUTH"}, + Usage: "Eden auth-header (or auth-header@url)", + Category: "Sources Configuration", + }, + &cli.StringSliceFlag{ + Name: "chainbound", + EnvVars: []string{"CHAINBOUND_AUTH"}, + Usage: "Chainbound API key (or api-key@url)", + Category: "Sources Configuration", + }, + } ) func main() { - flag.Parse() - - // perhaps only print the version - if *printVersion { - fmt.Printf("mempool-collector %s\n", version) - return + app := &cli.App{ + Name: "mempool-dumpster/collector", + Usage: "Collect mempool transactions from various sources", + Version: version, + Flags: cliFlags, + Action: runCollector, } - // Logger setup - var logger *zap.Logger - zapLevel := zap.NewAtomicLevel() - if *debugPtr { - zapLevel.SetLevel(zap.DebugLevel) - } - if *logProdPtr { - encoderCfg := zap.NewProductionEncoderConfig() - encoderCfg.EncodeTime = zapcore.ISO8601TimeEncoder - logger = zap.New(zapcore.NewCore( - zapcore.NewJSONEncoder(encoderCfg), - zapcore.Lock(os.Stdout), - zapLevel, - )) - } else { - logger = zap.New(zapcore.NewCore( - zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), - zapcore.Lock(os.Stdout), - zapLevel, - )) + if err := app.Run(os.Args); err != nil { + log.Fatal(err) } +} - defer func() { _ = logger.Sync() }() - log := logger.Sugar() +func runCollector(cCtx *cli.Context) error { + var ( + debug = cCtx.Bool("debug") + outDir = cCtx.String("out") + uid = cCtx.String("uid") + checkNodeURI = cCtx.String("check-node") + nodeURIs = cCtx.StringSlice("node") + blxAuth = cCtx.StringSlice("blx") + edenAuth = cCtx.StringSlice("eden") + chainboundAuth = cCtx.StringSlice("chainbound") + ) - if 
*logServicePtr != "" { - log = log.With("service", *logServicePtr) - } - - if *outDirPtr == "" { - log.Fatal("No output directory set (use -out )") - } + // Logger setup + log := common.GetLogger(debug, false) + defer func() { _ = log.Sync() }() - if *uidPtr == "" { - *uidPtr = shortuuid.New()[:6] + if uid == "" { + uid = shortuuid.New()[:6] } - if *nodesPtr == "" && *blxAuthToken == "" && *edenAuthToken == "" { + if len(nodeURIs) == 0 && len(blxAuth) == 0 && len(edenAuth) == 0 && len(chainboundAuth) == 0 { log.Fatal("No nodes, bloxroute, or eden token set (use -nodes , / -blx-token / -eden-token )") } - nodes := []string{} - if *nodesPtr != "" { - nodes = strings.Split(*nodesPtr, ",") - } - - log.Infow("Starting mempool-collector", "version", version, "outDir", *outDirPtr, "uid", *uidPtr) + log.Infow("Starting mempool-collector", "version", version, "outDir", outDir, "uid", uid) aliases := common.SourceAliasesFromEnv() if len(aliases) > 0 { @@ -108,14 +119,14 @@ func main() { // Start service components opts := collector.CollectorOpts{ - Log: log, - UID: *uidPtr, - Nodes: nodes, - OutDir: *outDirPtr, - BloxrouteAuthToken: *blxAuthToken, - EdenAuthToken: *edenAuthToken, - ChainboundAPIKey: *chainboundAPIKey, - CheckNodeURI: *checkNodeURI, + Log: log, + UID: uid, + OutDir: outDir, + CheckNodeURI: checkNodeURI, + Nodes: nodeURIs, + BloxrouteAuth: blxAuth, + EdenAuth: edenAuth, + ChainboundAuth: chainboundAuth, } collector.Start(&opts) @@ -125,4 +136,5 @@ func main() { signal.Notify(exit, os.Interrupt, syscall.SIGTERM) <-exit log.Info("bye") + return nil } diff --git a/collector/collector.go b/collector/collector.go index c4b62a4..3f7c628 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -13,64 +13,58 @@ type CollectorOpts struct { OutDir string CheckNodeURI string - BloxrouteAuthToken string - EdenAuthToken string - ChainboundAPIKey string + BloxrouteAuth []string + EdenAuth []string + ChainboundAuth []string } // Start kicks off all the service 
components in the background func Start(opts *CollectorOpts) { processor := NewTxProcessor(TxProcessorOpts{ Log: opts.Log, - OutDir: opts.OutDir, UID: opts.UID, + OutDir: opts.OutDir, CheckNodeURI: opts.CheckNodeURI, }) go processor.Start() + // Regular nodes for _, node := range opts.Nodes { conn := NewNodeConnection(opts.Log, node, processor.txC) conn.StartInBackground() } - if opts.BloxrouteAuthToken != "" { - blxOpts := BlxNodeOpts{ //nolint:exhaustruct + // Bloxroute + for _, auth := range opts.BloxrouteAuth { + token, url := common.GetAuthTokenAndURL(auth) + startBloxrouteConnection(BlxNodeOpts{ + TxC: processor.txC, Log: opts.Log, - AuthHeader: opts.BloxrouteAuthToken, - URL: blxDefaultURL, // URL is taken from ENV vars - } - - // start Websocket or gRPC subscription depending on URL - if common.IsWebsocketProtocol(blxOpts.URL) { - blxConn := NewBlxNodeConnection(blxOpts, processor.txC) - go blxConn.Start() - } else { - blxConn := NewBlxNodeConnectionGRPC(blxOpts, processor.txC) - go blxConn.Start() - } + AuthHeader: token, + URL: url, + }) } - if opts.EdenAuthToken != "" { - edenOpts := EdenNodeOpts{ //nolint:exhaustruct + // Eden + for _, auth := range opts.EdenAuth { + token, url := common.GetAuthTokenAndURL(auth) + startEdenConnection(EdenNodeOpts{ + TxC: processor.txC, Log: opts.Log, - AuthHeader: opts.EdenAuthToken, - URL: edenDefaultURL, // URL is taken from ENV vars - } - if common.IsWebsocketProtocol(edenOpts.URL) { - edenConn := NewEdenNodeConnection(edenOpts, processor.txC) - go edenConn.Start() - } else { - edenConn := NewEdenNodeConnectionGRPC(edenOpts, processor.txC) - go edenConn.Start() - } + AuthHeader: token, + URL: url, + }) } - if opts.ChainboundAPIKey != "" { - opts := ChainboundNodeOpts{ //nolint:exhaustruct + // Chainbound + for _, auth := range opts.ChainboundAuth { + token, url := common.GetAuthTokenAndURL(auth) + chainboundConn := NewChainboundNodeConnection(ChainboundNodeOpts{ + TxC: processor.txC, Log: opts.Log, - APIKey: 
opts.ChainboundAPIKey, - } - chainboundConn := NewChainboundNodeConnection(opts, processor.txC) + APIKey: token, + URL: url, + }) go chainboundConn.Start() } } diff --git a/collector/consts.go b/collector/consts.go index 8fea554..dd92217 100644 --- a/collector/consts.go +++ b/collector/consts.go @@ -19,10 +19,10 @@ const ( ) var ( - // Bloxroute URL - local Gateway GRPC port or websocket URI (https://docs.bloxroute.com/introduction/cloud-api-ips) - blxDefaultURL = common.GetEnv("BLX_URI", "127.0.0.1:1001") + // Bloxroute URL - Websocket URI or Gateway GRPC URI (https://docs.bloxroute.com/introduction/cloud-api-ips) + blxDefaultURL = common.GetEnv("BLX_URI", "wss://germany.eth.blxrbdn.com/ws") - // Eden URL: https://docs.edennetwork.io/eden-rpc/speed-rpc + // Eden URL - https://docs.edennetwork.io/eden-rpc/speed-rpc edenDefaultURL = common.GetEnv("EDEN_URI", "wss://speed-eu-west.edennetwork.io") // Chainbound Fiber URL diff --git a/collector/node_conn_bloxroute.go b/collector/node_conn_bloxroute.go index 0c29898..7b95277 100644 --- a/collector/node_conn_bloxroute.go +++ b/collector/node_conn_bloxroute.go @@ -24,12 +24,24 @@ import ( ) type BlxNodeOpts struct { + TxC chan TxIn Log *zap.SugaredLogger AuthHeader string URL string // optional override, default: blxDefaultURL SourceTag string // optional override, default: "blx" (common.BloxrouteTag) } +// startBloxrouteConnection starts a Websocket or gRPC subscription (depending on URL) in the background +func startBloxrouteConnection(opts BlxNodeOpts) { + if common.IsWebsocketProtocol(opts.URL) { + blxConn := NewBlxNodeConnection(opts) + go blxConn.Start() + } else { + blxConn := NewBlxNodeConnectionGRPC(opts) + go blxConn.Start() + } +} + type BlxNodeConnection struct { log *zap.SugaredLogger authHeader string @@ -39,7 +51,7 @@ type BlxNodeConnection struct { backoffSec int } -func NewBlxNodeConnection(opts BlxNodeOpts, txC chan TxIn) *BlxNodeConnection { +func NewBlxNodeConnection(opts BlxNodeOpts) 
*BlxNodeConnection { url := opts.URL if url == "" { url = blxDefaultURL @@ -55,7 +67,7 @@ func NewBlxNodeConnection(opts BlxNodeOpts, txC chan TxIn) *BlxNodeConnection { authHeader: opts.AuthHeader, url: url, srcTag: srcTag, - txC: txC, + txC: opts.TxC, backoffSec: initialBackoffSec, } } @@ -157,7 +169,7 @@ type BlxNodeConnectionGRPC struct { backoffSec int } -func NewBlxNodeConnectionGRPC(opts BlxNodeOpts, txC chan TxIn) *BlxNodeConnectionGRPC { +func NewBlxNodeConnectionGRPC(opts BlxNodeOpts) *BlxNodeConnectionGRPC { url := opts.URL if url == "" { url = blxDefaultURL @@ -168,7 +180,7 @@ func NewBlxNodeConnectionGRPC(opts BlxNodeOpts, txC chan TxIn) *BlxNodeConnectio authHeader: opts.AuthHeader, url: url, srcTag: common.SourceTagBloxroute, - txC: txC, + txC: opts.TxC, backoffSec: initialBackoffSec, } } diff --git a/collector/node_conn_chainbound.go b/collector/node_conn_chainbound.go index 8ca2c12..aeeab74 100644 --- a/collector/node_conn_chainbound.go +++ b/collector/node_conn_chainbound.go @@ -13,6 +13,7 @@ import ( ) type ChainboundNodeOpts struct { + TxC chan TxIn Log *zap.SugaredLogger APIKey string URL string // optional override, default: ChainboundDefaultURL @@ -29,7 +30,7 @@ type ChainboundNodeConnection struct { backoffSec int } -func NewChainboundNodeConnection(opts ChainboundNodeOpts, txC chan TxIn) *ChainboundNodeConnection { +func NewChainboundNodeConnection(opts ChainboundNodeOpts) *ChainboundNodeConnection { url := opts.URL if url == "" { url = chainboundDefaultURL @@ -46,7 +47,7 @@ func NewChainboundNodeConnection(opts ChainboundNodeOpts, txC chan TxIn) *Chainb url: url, srcTag: srcTag, fiberC: make(chan *fiber.Transaction), - txC: txC, + txC: opts.TxC, backoffSec: initialBackoffSec, } } diff --git a/collector/node_conn_eden.go b/collector/node_conn_eden.go index a15ef9f..4324c6e 100644 --- a/collector/node_conn_eden.go +++ b/collector/node_conn_eden.go @@ -22,12 +22,24 @@ import ( ) type EdenNodeOpts struct { + TxC chan TxIn Log 
*zap.SugaredLogger AuthHeader string URL string // optional override, default: edenDefaultURL SourceTag string // optional override, default: "eden" (common.SourceTagEden) } +// startEdenConnection starts a Websocket or gRPC subscription (depending on URL) in the background +func startEdenConnection(opts EdenNodeOpts) { + if common.IsWebsocketProtocol(opts.URL) { + edenConn := NewEdenNodeConnection(opts) + go edenConn.Start() + } else { + edenConn := NewEdenNodeConnectionGRPC(opts) + go edenConn.Start() + } +} + type EdenNodeConnection struct { log *zap.SugaredLogger authHeader string @@ -37,7 +49,7 @@ type EdenNodeConnection struct { backoffSec int } -func NewEdenNodeConnection(opts EdenNodeOpts, txC chan TxIn) *EdenNodeConnection { +func NewEdenNodeConnection(opts EdenNodeOpts) *EdenNodeConnection { url := opts.URL if url == "" { url = edenDefaultURL @@ -53,7 +65,7 @@ func NewEdenNodeConnection(opts EdenNodeOpts, txC chan TxIn) *EdenNodeConnection authHeader: opts.AuthHeader, url: url, srcTag: srcTag, - txC: txC, + txC: opts.TxC, backoffSec: initialBackoffSec, } } @@ -155,7 +167,7 @@ type EdenNodeConnectionGRPC struct { backoffSec int } -func NewEdenNodeConnectionGRPC(opts EdenNodeOpts, txC chan TxIn) *EdenNodeConnectionGRPC { +func NewEdenNodeConnectionGRPC(opts EdenNodeOpts) *EdenNodeConnectionGRPC { url := opts.URL if url == "" { url = edenDefaultURL @@ -166,7 +178,7 @@ func NewEdenNodeConnectionGRPC(opts EdenNodeOpts, txC chan TxIn) *EdenNodeConnec authHeader: opts.AuthHeader, url: url, srcTag: common.SourceTagEden, - txC: txC, + txC: opts.TxC, backoffSec: initialBackoffSec, } } diff --git a/common/utils.go b/common/utils.go index 4547c0a..c67d09f 100644 --- a/common/utils.go +++ b/common/utils.go @@ -186,3 +186,12 @@ func TitleStrings(s []string) []string { } return ret } + +// GetAuthTokenAndURL takes in auth strings like "token" or "token@url" and returns token and url separately +func GetAuthTokenAndURL(auth string) (string, string) { + parts := 
strings.Split(auth, "@") + if len(parts) < 2 { + return auth, "" + } + return parts[0], parts[1] +} diff --git a/scripts/check-systemd-errors.sh b/scripts/check-systemd-errors.sh index 31e5a77..b67d13b 100755 --- a/scripts/check-systemd-errors.sh +++ b/scripts/check-systemd-errors.sh @@ -41,7 +41,7 @@ function reset() { } date -journalctl -u mempool-collector -o cat --since "$SINCE" | grep ERROR | tee /tmp/mempool-collector-errors.log +journalctl -u mempool-collector -o cat --since "$SINCE" | grep ERROR | tee /tmp/mempool-collector-errors.log || true lines=$(wc -l /tmp/mempool-collector-errors.log | awk '{print $1}') # echo "Found $lines errors in mempool-collector service" diff --git a/scripts/subscibe-test/main.go b/scripts/subscibe-test/main.go index a7af88d..f88ad47 100644 --- a/scripts/subscibe-test/main.go +++ b/scripts/subscibe-test/main.go @@ -41,11 +41,14 @@ func MainGeneric() { func MainBlx() { txC := make(chan collector.TxIn) log := common.GetLogger(true, false) - blxOpts := collector.BlxNodeOpts{ //nolint:exhaustruct + token, url := common.GetAuthTokenAndURL(os.Getenv("BLX_AUTH")) + blxOpts := collector.BlxNodeOpts{ + TxC: txC, Log: log, - AuthHeader: os.Getenv("BLX_AUTH_HEADER"), + AuthHeader: token, + URL: url, } - nc := collector.NewBlxNodeConnectionGRPC(blxOpts, txC) + nc := collector.NewBlxNodeConnectionGRPC(blxOpts) go nc.Start() for tx := range txC { log.Infow("received tx", "tx", tx.Tx.Hash(), "src", tx.Source) @@ -55,13 +58,14 @@ func MainBlx() { func MainEden() { txC := make(chan collector.TxIn) log := common.GetLogger(true, false) + token, url := common.GetAuthTokenAndURL(os.Getenv("EDEN_AUTH")) blxOpts := collector.EdenNodeOpts{ + TxC: txC, Log: log, - AuthHeader: os.Getenv("EDEN_AUTH_HEADER"), - URL: "wss://speed-eu-west.edennetwork.io", - SourceTag: "eden", + AuthHeader: token, + URL: url, } - nc := collector.NewEdenNodeConnection(blxOpts, txC) + nc := collector.NewEdenNodeConnection(blxOpts) go nc.Start() for tx := range txC { 
log.Infow("received tx", "tx", tx.Tx.Hash(), "src", tx.Source) @@ -72,10 +76,11 @@ func MainChainbound() { txC := make(chan collector.TxIn) log := common.GetLogger(true, false) opts := collector.ChainboundNodeOpts{ //nolint:exhaustruct + TxC: txC, Log: log, - APIKey: os.Getenv("CHAINBOUND_API_KEY"), + APIKey: os.Getenv("CHAINBOUND_AUTH"), } - nc := collector.NewChainboundNodeConnection(opts, txC) + nc := collector.NewChainboundNodeConnection(opts) go nc.Start() for tx := range txC { log.Infow("received tx", "tx", tx.Tx.Hash(), "src", tx.Source)