Skip to content

Commit

Permalink
[breaking] Better auth: allow sources to be added multiple times (#26)
Browse files Browse the repository at this point in the history
## 📝 Summary

Allow any source to be added multiple times.

⚠️ WARNING - this is a breaking change to the collector, all flags and
env vars have changed and need to be double-checked!

---

## ✅ I have run these commands

* [x] `make lint`
* [x] `make test`
* [x] `go mod tidy`
  • Loading branch information
metachris authored Oct 3, 2023
1 parent b3a5d43 commit 8ac0621
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 154 deletions.
17 changes: 7 additions & 10 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
# bloXroute
export BLX_AUTH_HEADER=""
# export BLX_URI="" # either wss:// or IP:port (for gRPC bloxroute gateway), default: 127.0.0.1:1001 (alternatives: https://docs.bloxroute.com/introduction/cloud-api-ips)
# bloXroute auth (Websocket or gRPC. format: "token" or "token@url")
export BLX_AUTH=""

# chainbound
export CHAINBOUND_API_KEY=""
# export CHAINBOUND_URI="" # default: beta.fiberapi.io:8080
# Chainbound auth (format: "api-key" or "api-key@url")
export CHAINBOUND_AUTH=""

# eden
export EDEN_AUTH_HEADER=""
# export EDEN_URI="" # default: wss://speed-eu-west.edennetwork.io
# Eden auth (format: "token" or "token@url")
export EDEN_AUTH=""

# Source aliases
export SRC_ALIASES="local=ws://localhost:8546" # comma-separated list of alias=url

# Node URL, used to check transactions for already being included
# Node URL for checking transactions inclusion status
export CHECK_NODE_URI=""

# Cloudflare R2 account ID (for upload.sh)
Expand Down
13 changes: 8 additions & 5 deletions cmd/analyze/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ var (

// CLI flags
cliFlags = []cli.Flag{
&cli.StringSliceFlag{
Name: "input-parquet",
Usage: "input parquet files",
},
&cli.StringSliceFlag{
Name: "input-sourcelog",
Usage: "input sourcelog files",
},
&cli.StringFlag{
Name: "out",
Usage: "output filename",
Expand All @@ -38,11 +46,6 @@ var (
Name: "cmp",
Usage: "compare these sources",
},
&cli.StringSliceFlag{
Name: "input-parquet",
}, &cli.StringSliceFlag{
Name: "input-sourcelog",
},
}
)

Expand Down
174 changes: 93 additions & 81 deletions cmd/collect/main.go
Original file line number Diff line number Diff line change
@@ -1,105 +1,116 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"

"github.com/flashbots/mempool-dumpster/collector"
"github.com/flashbots/mempool-dumpster/common"
"github.com/lithammer/shortuuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/urfave/cli/v2"
)

var (
version = "dev" // is set during build process

// Default values
defaultDebug = os.Getenv("DEBUG") == "1"
defaultLogProd = os.Getenv("LOG_PROD") == "1"
defaultLogService = os.Getenv("LOG_SERVICE")
defaultCheckNodeURI = os.Getenv("CHECK_NODE_URI")

// API keys
defaultblxAuthToken = os.Getenv("BLX_AUTH_HEADER")
defaultEdenAuthToken = os.Getenv("EDEN_AUTH_HEADER")
defaultChainboundAPIKey = os.Getenv("CHAINBOUND_API_KEY")

// Flags
printVersion = flag.Bool("version", false, "only print version")
debugPtr = flag.Bool("debug", defaultDebug, "print debug output")
logProdPtr = flag.Bool("log-prod", defaultLogProd, "log in production mode (json)")
logServicePtr = flag.String("log-service", defaultLogService, "'service' tag to logs")
nodesPtr = flag.String("nodes", "ws://localhost:8546", "comma separated list of EL nodes")
checkNodeURI = flag.String("check-node", defaultCheckNodeURI, "node to use for checking incoming transactions")
outDirPtr = flag.String("out", "", "path to collect raw transactions into")
uidPtr = flag.String("uid", "", "collector uid (part of output CSV filename)")

blxAuthToken = flag.String("blx-token", defaultblxAuthToken, "bloXroute auth token (optional)")
edenAuthToken = flag.String("eden-token", defaultEdenAuthToken, "Eden auth token (optional)")
chainboundAPIKey = flag.String("chainbound-api-key", defaultChainboundAPIKey, "Chainbound API key (optional)")
cliFlags = []cli.Flag{
// Collector configuration
&cli.BoolFlag{
Name: "debug",
EnvVars: []string{"DEBUG"},
Usage: "enable debug logging",
Category: "Collector Configuration",
},
&cli.StringFlag{
Name: "out",
EnvVars: []string{"OUT"},
Required: true,
Usage: "output base directory",
Category: "Collector Configuration",
},
&cli.StringFlag{
Name: "uid",
EnvVars: []string{"UID"},
Usage: "collector uid, part of output CSV filenames (default: random)",
Category: "Collector Configuration",
},
&cli.StringFlag{
Name: "check-node",
EnvVars: []string{"CHECK_NODE"},
Usage: "EL node URL to check incoming transactions",
Category: "Collector Configuration",
},

// Sources
&cli.StringSliceFlag{
Name: "node",
Aliases: []string{"nodes"},
EnvVars: []string{"NODE", "NODES"},
Usage: "EL node URL(s)",
Category: "Sources Configuration",
},
&cli.StringSliceFlag{
Name: "blx",
EnvVars: []string{"BLX_AUTH"},
Usage: "bloXroute auth-header (or auth-header@url)",
Category: "Sources Configuration",
},
&cli.StringSliceFlag{
Name: "eden",
EnvVars: []string{"EDEN_AUTH"},
Usage: "Eden auth-header (or auth-header@url)",
Category: "Sources Configuration",
},
&cli.StringSliceFlag{
Name: "chainbound",
EnvVars: []string{"CHAINBOUND_AUTH"},
Usage: "Chainbound API key (or api-key@url)",
Category: "Sources Configuration",
},
}
)

func main() {
flag.Parse()

// perhaps only print the version
if *printVersion {
fmt.Printf("mempool-collector %s\n", version)
return
app := &cli.App{
Name: "mempool-dumpster/collector",
Usage: "Collect mempool transactions from various sources",
Version: version,
Flags: cliFlags,
Action: runCollector,
}

// Logger setup
var logger *zap.Logger
zapLevel := zap.NewAtomicLevel()
if *debugPtr {
zapLevel.SetLevel(zap.DebugLevel)
}
if *logProdPtr {
encoderCfg := zap.NewProductionEncoderConfig()
encoderCfg.EncodeTime = zapcore.ISO8601TimeEncoder
logger = zap.New(zapcore.NewCore(
zapcore.NewJSONEncoder(encoderCfg),
zapcore.Lock(os.Stdout),
zapLevel,
))
} else {
logger = zap.New(zapcore.NewCore(
zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
zapcore.Lock(os.Stdout),
zapLevel,
))
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}

defer func() { _ = logger.Sync() }()
log := logger.Sugar()
func runCollector(cCtx *cli.Context) error {
var (
debug = cCtx.Bool("debug")
outDir = cCtx.String("out")
uid = cCtx.String("uid")
checkNodeURI = cCtx.String("check-node")
nodeURIs = cCtx.StringSlice("node")
blxAuth = cCtx.StringSlice("blx")
edenAuth = cCtx.StringSlice("eden")
chainboundAuth = cCtx.StringSlice("chainbound")
)

if *logServicePtr != "" {
log = log.With("service", *logServicePtr)
}

if *outDirPtr == "" {
log.Fatal("No output directory set (use -out <path>)")
}
// Logger setup
log := common.GetLogger(debug, false)
defer func() { _ = log.Sync() }()

if *uidPtr == "" {
*uidPtr = shortuuid.New()[:6]
if uid == "" {
uid = shortuuid.New()[:6]
}

if *nodesPtr == "" && *blxAuthToken == "" && *edenAuthToken == "" {
if len(nodeURIs) == 0 && len(blxAuth) == 0 && len(edenAuth) == 0 && len(chainboundAuth) == 0 {
log.Fatal("No nodes, bloxroute, or eden token set (use -nodes <url1>,<url2> / -blx-token <token> / -eden-token <token>)")
}

nodes := []string{}
if *nodesPtr != "" {
nodes = strings.Split(*nodesPtr, ",")
}

log.Infow("Starting mempool-collector", "version", version, "outDir", *outDirPtr, "uid", *uidPtr)
log.Infow("Starting mempool-collector", "version", version, "outDir", outDir, "uid", uid)

aliases := common.SourceAliasesFromEnv()
if len(aliases) > 0 {
Expand All @@ -108,14 +119,14 @@ func main() {

// Start service components
opts := collector.CollectorOpts{
Log: log,
UID: *uidPtr,
Nodes: nodes,
OutDir: *outDirPtr,
BloxrouteAuthToken: *blxAuthToken,
EdenAuthToken: *edenAuthToken,
ChainboundAPIKey: *chainboundAPIKey,
CheckNodeURI: *checkNodeURI,
Log: log,
UID: uid,
OutDir: outDir,
CheckNodeURI: checkNodeURI,
Nodes: nodeURIs,
BloxrouteAuth: blxAuth,
EdenAuth: edenAuth,
ChainboundAuth: chainboundAuth,
}

collector.Start(&opts)
Expand All @@ -125,4 +136,5 @@ func main() {
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
<-exit
log.Info("bye")
return nil
}
64 changes: 29 additions & 35 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,64 +13,58 @@ type CollectorOpts struct {
OutDir string
CheckNodeURI string

BloxrouteAuthToken string
EdenAuthToken string
ChainboundAPIKey string
BloxrouteAuth []string
EdenAuth []string
ChainboundAuth []string
}

// Start kicks off all the service components in the background
func Start(opts *CollectorOpts) {
processor := NewTxProcessor(TxProcessorOpts{
Log: opts.Log,
OutDir: opts.OutDir,
UID: opts.UID,
OutDir: opts.OutDir,
CheckNodeURI: opts.CheckNodeURI,
})
go processor.Start()

// Regular nodes
for _, node := range opts.Nodes {
conn := NewNodeConnection(opts.Log, node, processor.txC)
conn.StartInBackground()
}

if opts.BloxrouteAuthToken != "" {
blxOpts := BlxNodeOpts{ //nolint:exhaustruct
// Bloxroute
for _, auth := range opts.BloxrouteAuth {
token, url := common.GetAuthTokenAndURL(auth)
startBloxrouteConnection(BlxNodeOpts{
TxC: processor.txC,
Log: opts.Log,
AuthHeader: opts.BloxrouteAuthToken,
URL: blxDefaultURL, // URL is taken from ENV vars
}

// start Websocket or gRPC subscription depending on URL
if common.IsWebsocketProtocol(blxOpts.URL) {
blxConn := NewBlxNodeConnection(blxOpts, processor.txC)
go blxConn.Start()
} else {
blxConn := NewBlxNodeConnectionGRPC(blxOpts, processor.txC)
go blxConn.Start()
}
AuthHeader: token,
URL: url,
})
}

if opts.EdenAuthToken != "" {
edenOpts := EdenNodeOpts{ //nolint:exhaustruct
// Eden
for _, auth := range opts.EdenAuth {
token, url := common.GetAuthTokenAndURL(auth)
startEdenConnection(EdenNodeOpts{
TxC: processor.txC,
Log: opts.Log,
AuthHeader: opts.EdenAuthToken,
URL: edenDefaultURL, // URL is taken from ENV vars
}
if common.IsWebsocketProtocol(edenOpts.URL) {
edenConn := NewEdenNodeConnection(edenOpts, processor.txC)
go edenConn.Start()
} else {
edenConn := NewEdenNodeConnectionGRPC(edenOpts, processor.txC)
go edenConn.Start()
}
AuthHeader: token,
URL: url,
})
}

if opts.ChainboundAPIKey != "" {
opts := ChainboundNodeOpts{ //nolint:exhaustruct
// Chainbound
for _, auth := range opts.ChainboundAuth {
token, url := common.GetAuthTokenAndURL(auth)
chainboundConn := NewChainboundNodeConnection(ChainboundNodeOpts{
TxC: processor.txC,
Log: opts.Log,
APIKey: opts.ChainboundAPIKey,
}
chainboundConn := NewChainboundNodeConnection(opts, processor.txC)
APIKey: token,
URL: url,
})
go chainboundConn.Start()
}
}
6 changes: 3 additions & 3 deletions collector/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ const (
)

var (
// Bloxroute URL - local Gateway GRPC port or websocket URI (https://docs.bloxroute.com/introduction/cloud-api-ips)
blxDefaultURL = common.GetEnv("BLX_URI", "127.0.0.1:1001")
// Bloxroute URL - Websocket URI or Gateway GRPC URI (https://docs.bloxroute.com/introduction/cloud-api-ips)
blxDefaultURL = common.GetEnv("BLX_URI", "wss://germany.eth.blxrbdn.com/ws")

// Eden URL: https://docs.edennetwork.io/eden-rpc/speed-rpc
// Eden URL - https://docs.edennetwork.io/eden-rpc/speed-rpc
edenDefaultURL = common.GetEnv("EDEN_URI", "wss://speed-eu-west.edennetwork.io")

// Chainbound Fiber URL
Expand Down
Loading

0 comments on commit 8ac0621

Please sign in to comment.