diff --git a/cmd/collect/main.go b/cmd/collect/main.go index f2d891c..82295e1 100644 --- a/cmd/collect/main.go +++ b/cmd/collect/main.go @@ -69,6 +69,20 @@ var ( Usage: "Chainbound API key (or api-key@url)", Category: "Sources Configuration", }, + + // Tx receivers + &cli.StringSliceFlag{ + Name: "tx-receivers", + EnvVars: []string{"TX_RECEIVERS"}, + Usage: "URL(s) to send transactions to as octet-stream over http", + Category: "Tx Receivers Configuration", + }, + &cli.StringSliceFlag{ + Name: "tx-receivers-allowed-sources", + EnvVars: []string{"TX_RECEIVERS_ALLOWED_SOURCES"}, + Usage: "sources of txs to send to receivers", + Category: "Tx Receivers Configuration", + }, } ) @@ -88,14 +102,16 @@ func main() { func runCollector(cCtx *cli.Context) error { var ( - debug = cCtx.Bool("debug") - outDir = cCtx.String("out") - uid = cCtx.String("uid") - checkNodeURI = cCtx.String("check-node") - nodeURIs = cCtx.StringSlice("node") - blxAuth = cCtx.StringSlice("blx") - edenAuth = cCtx.StringSlice("eden") - chainboundAuth = cCtx.StringSlice("chainbound") + debug = cCtx.Bool("debug") + outDir = cCtx.String("out") + uid = cCtx.String("uid") + checkNodeURI = cCtx.String("check-node") + nodeURIs = cCtx.StringSlice("node") + blxAuth = cCtx.StringSlice("blx") + edenAuth = cCtx.StringSlice("eden") + chainboundAuth = cCtx.StringSlice("chainbound") + receivers = cCtx.StringSlice("tx-receivers") + receiversAllowedSources = cCtx.StringSlice("tx-receivers-allowed-sources") ) // Logger setup @@ -119,14 +135,16 @@ func runCollector(cCtx *cli.Context) error { // Start service components opts := collector.CollectorOpts{ - Log: log, - UID: uid, - OutDir: outDir, - CheckNodeURI: checkNodeURI, - Nodes: nodeURIs, - BloxrouteAuth: blxAuth, - EdenAuth: edenAuth, - ChainboundAuth: chainboundAuth, + Log: log, + UID: uid, + OutDir: outDir, + CheckNodeURI: checkNodeURI, + Nodes: nodeURIs, + BloxrouteAuth: blxAuth, + EdenAuth: edenAuth, + ChainboundAuth: chainboundAuth, + Receivers: receivers, + ReceiversAllowedSources: receiversAllowedSources, } collector.Start(&opts) diff --git a/collector/collector.go b/collector/collector.go index 3f7c628..62a3f65 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -16,15 +16,20 @@ type CollectorOpts struct { BloxrouteAuth []string EdenAuth []string ChainboundAuth []string + + Receivers []string + ReceiversAllowedSources []string } // Start kicks off all the service components in the background func Start(opts *CollectorOpts) { processor := NewTxProcessor(TxProcessorOpts{ - Log: opts.Log, - UID: opts.UID, - OutDir: opts.OutDir, - CheckNodeURI: opts.CheckNodeURI, + Log: opts.Log, + UID: opts.UID, + OutDir: opts.OutDir, + CheckNodeURI: opts.CheckNodeURI, + HTTPReceivers: opts.Receivers, + ReceiversAllowedSources: opts.ReceiversAllowedSources, }) go processor.Start() diff --git a/collector/receiver.go b/collector/receiver.go new file mode 100644 index 0000000..69f402e --- /dev/null +++ b/collector/receiver.go @@ -0,0 +1,43 @@ +package collector + +import ( + "bytes" + "context" + "io" + "net/http" +) + +type TxReceiver interface { + SendTx(ctx context.Context, tx *TxIn) error +} + +type HTTPReceiver struct { + url string +} + +func NewHTTPReceiver(url string) *HTTPReceiver { + return &HTTPReceiver{ + url: url, + } +} + +func (r *HTTPReceiver) SendTx(ctx context.Context, tx *TxIn) error { + rawTx, err := tx.Tx.MarshalBinary() + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.url, bytes.NewReader(rawTx)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/octet-stream") + + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + _, err = io.Copy(io.Discard, res.Body) + return err +} diff --git a/collector/tx_processor.go b/collector/tx_processor.go index 724cc65..e3f3299 100644 --- a/collector/tx_processor.go +++ b/collector/tx_processor.go @@ -20,11 +20,17 @@ import ( "go.uber.org/zap" ) +const ( + receiverTimeout = 5 * time.Second +) + type TxProcessorOpts struct { - Log *zap.SugaredLogger - OutDir string - UID string - CheckNodeURI string + Log *zap.SugaredLogger + OutDir string + UID string + CheckNodeURI string + HTTPReceivers []string + ReceiversAllowedSources []string } type TxProcessor struct { @@ -45,6 +51,9 @@ type TxProcessor struct { checkNodeURI string ethClient *ethclient.Client + receivers []TxReceiver + receiversAllowedSources []string + lastHealthCheckCall time.Time } @@ -55,6 +64,11 @@ type OutFiles struct { } func NewTxProcessor(opts TxProcessorOpts) *TxProcessor { + receivers := make([]TxReceiver, 0, len(opts.HTTPReceivers)) + for _, r := range opts.HTTPReceivers { + receivers = append(receivers, NewHTTPReceiver(r)) + } + return &TxProcessor{ //nolint:exhaustruct log: opts.Log, // .With("uid", uid), txC: make(chan TxIn, 100), @@ -67,6 +81,9 @@ func NewTxProcessor(opts TxProcessorOpts) *TxProcessor { srcMetrics: NewMetricsCounter(), checkNodeURI: opts.CheckNodeURI, + + receivers: receivers, + receiversAllowedSources: opts.ReceiversAllowedSources, } } @@ -94,10 +111,43 @@ func (p *TxProcessor) Start() { // start listening for transactions coming in through the channel p.log.Info("Waiting for transactions...") for txIn := range p.txC { + // send tx to receivers before processing it + // this will reduce the latency for the receivers but may lead to receivers getting the same tx multiple times + // or getting txs that are incorrect + go p.sendTxToReceivers(txIn) p.processTx(txIn) } } +func (p *TxProcessor) sendTxToReceivers(txIn TxIn) { + sourceOk := false + for _, allowedSource := range p.receiversAllowedSources { + if txIn.Source == allowedSource { + sourceOk = true + break + } + } + if !sourceOk { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), receiverTimeout) + defer cancel() + + var wg sync.WaitGroup + for _, r := range p.receivers { + wg.Add(1) + go func(r TxReceiver) { + defer wg.Done() + err := r.SendTx(ctx, &txIn) + if err != nil { + p.log.Errorw("failed to send tx", "error", err) + } + }(r) + } + wg.Wait() +} + func (p *TxProcessor) processTx(txIn TxIn) { tx := txIn.Tx txHashLower := strings.ToLower(tx.Hash().Hex()) diff --git a/collector/tx_processor_test.go b/collector/tx_processor_test.go index 5c32d07..a660015 100644 --- a/collector/tx_processor_test.go +++ b/collector/tx_processor_test.go @@ -1,5 +1,13 @@ package collector +import ( + "context" + "testing" + "time" + + "github.com/flashbots/mempool-dumpster/common" +) + // var testLog = common.GetLogger(true, false) // func TestBuilderAliases(t *testing.T) { @@ -7,3 +15,43 @@ package collector // txp := NewTxProcessor(testLog, tempDir, "test1") // require.Equal(t, "collector", "collector") // } + +type MockTxReceiver struct { + ReceivedTx *TxIn +} + +func (r *MockTxReceiver) SendTx(ctx context.Context, tx *TxIn) error { + r.ReceivedTx = tx + return nil +} + +func TestTxProcessor_sendTxToReceivers(t *testing.T) { + receiver := MockTxReceiver{ReceivedTx: nil} + + processor := NewTxProcessor(TxProcessorOpts{ + Log: common.GetLogger(true, false), + OutDir: "", + UID: "", + CheckNodeURI: "", + HTTPReceivers: nil, + ReceiversAllowedSources: []string{"allowed"}, + }) + processor.receivers = append(processor.receivers, &receiver) + + tx := TxIn{ + T: time.Now(), + Tx: nil, + Source: "not-allowed", + } + processor.sendTxToReceivers(tx) + + if receiver.ReceivedTx != nil { + t.Errorf("expected nil, got %v", receiver.ReceivedTx) + } + + tx.Source = "allowed" + processor.sendTxToReceivers(tx) + if receiver.ReceivedTx == nil { + t.Errorf("expected tx, got nil") + } +}