Skip to content

Commit

Permalink
source stats
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Aug 31, 2023
1 parent 3e67169 commit a14e404
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 182 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ all: build
v:
@echo "Version: ${VERSION}"

clean: clean-dev
clean-build:
rm -rf build/

clean-dev:
Expand Down
2 changes: 1 addition & 1 deletion collector/node_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (nc *NodeConnection) Start() {
time.Sleep(5 * time.Second)
}
case tx := <-txC:
nc.txC <- TxIn{time.Now().UTC(), tx, nc.uri, nc.uriTag}
nc.txC <- TxIn{time.Now().UTC(), tx, nc.uriTag}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion collector/node_conn_bloxroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,6 @@ func (nc *BlxNodeConnection) connect() {
continue
}

nc.txC <- TxIn{time.Now().UTC(), &tx, blxURI, srcTag}
nc.txC <- TxIn{time.Now().UTC(), &tx, srcTag}
}
}
6 changes: 3 additions & 3 deletions collector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

type TxIn struct {
T time.Time
Tx *types.Transaction
URI string
T time.Time
Tx *types.Transaction
// URI string
URITag string
}

Expand Down
1 change: 1 addition & 0 deletions scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ journalctl -u mempool-collector -o cat --since "10m ago" | grep "source_stats_al
# only specific ones, side by side
journalctl -u mempool-collector -o cat --since "10m ago" | grep "source_stats_all" | awk '{ $1=""; $2=""; $3=""; print $0}' | jq '.local + " " + .apool'

# who sent tx first
journalctl -u mempool-collector -o cat --since "1h ago" | grep "source_stats_first" | awk '{ $1=""; $2=""; $3=""; print $0}' | jq
```
204 changes: 204 additions & 0 deletions scripts/analyze-source-stats/analyzer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package main

import (
"fmt"
"sort"
"strings"
"time"

"github.com/flashbots/mempool-dumpster/common"
)

const (
referenceSource = "local"
referenceSource2 = "apool"
referenceBloxroute = "blx"
)

var bucketsMS = []int64{1, 5, 10, 50, 100, 250, 500, 1000, 2000} // note: 0 would be equal timestamps

func prettyInt(i int) string {
return printer.Sprintf("%d", i)
}

func prettyInt64(i int64) string {
return printer.Sprintf("%d", i)
}

type Analyzer struct {
txs map[string]map[string]int64 // [hash][src] = timestamp

sources []string // sorted alphabetically
nUniqueTx int
nAllTx int

nTransactionsPerSource map[string]int64
nUniqueTxPerSource map[string]int64

nNotSeenLocalPerSource map[string]int64
nOverallNotSeenLocal int64
// nSeenOnlyByRef map[string]int64 // todo

nTxSeenBySingleSource int64

timestampFirst int64
timestampLast int64
timeFirst time.Time
timeLast time.Time
duration time.Duration

bloxrouteTxBeforeLocal map[int64]int64 // [bucket_ms] = count
nbloxrouteSeenBeforeLocalTotal int64
}

func NewAnalyzer(transactions map[string]map[string]int64) *Analyzer {
a := &Analyzer{ //nolint:exhaustruct
txs: transactions,
nTransactionsPerSource: make(map[string]int64),
nUniqueTxPerSource: make(map[string]int64),
nNotSeenLocalPerSource: make(map[string]int64),
bloxrouteTxBeforeLocal: make(map[int64]int64),
}

a.init()
return a
}

// Init does some efficient initial data analysis and preparation for later use
func (a *Analyzer) init() { //nolint:gocognit
// unique tx
a.nUniqueTx = len(a.txs)

// iterate over tx to
for _, sources := range a.txs {
// count all tx
a.nAllTx += len(sources)

// number of unique tx -- special case for local+apool
if len(sources) == 2 {
if sources[referenceSource] != 0 && sources[referenceSource2] != 0 {
a.nUniqueTxPerSource[referenceSource] += 1
a.nTxSeenBySingleSource += 1
}
}

if sources[referenceSource] == 0 {
a.nOverallNotSeenLocal += 1
}

// iterate over all sources for a given hash
for src, timestamp := range sources {
// get number of unique transactions by any single source
if len(sources) == 1 {
a.nUniqueTxPerSource[src] += 1
a.nTxSeenBySingleSource += 1
}

// remember if this transaction was not seen by the reference source
if sources[referenceSource] == 0 {
a.nNotSeenLocalPerSource[src] += 1
}

// count number of tx per source
a.nTransactionsPerSource[src] += 1

// find first and last timestamp
if a.timestampFirst == 0 || timestamp < a.timestampFirst {
a.timestampFirst = timestamp
}
if a.timestampLast == 0 || timestamp > a.timestampLast {
a.timestampLast = timestamp
}
}
}

// convert timestamps to duration and UTC time
a.duration = time.Duration(a.timestampLast-a.timestampFirst) * time.Millisecond
a.timeFirst = time.Unix(a.timestampFirst/1000, 0).UTC()
a.timeLast = time.Unix(a.timestampLast/1000, 0).UTC()

// get sorted list of sources
for src := range a.nTransactionsPerSource {
a.sources = append(a.sources, src)
}
sort.Strings(a.sources)

// bloxroute specific analysis
a.initBlx()
}

func (a *Analyzer) initBlx() {
// How much earlier were transactions received by blx vs. the local node?
for _, sources := range a.txs {
if len(sources) == 1 {
continue
}

// ensure seen by both local and blx
if _, seenByBlx := sources[referenceBloxroute]; !seenByBlx {
continue
}
if _, seenLocally := sources[referenceSource]; !seenLocally {
continue
}

blxTS := sources[referenceBloxroute]
refTS := sources[referenceSource]
diff := blxTS - refTS

if diff > 0 {
a.nbloxrouteSeenBeforeLocalTotal += 1
for _, thresholdMS := range bucketsMS {
if diff >= thresholdMS {
a.bloxrouteTxBeforeLocal[thresholdMS] += 1
}
}
}
}
}

func (a *Analyzer) Print() {
fmt.Println("")
fmt.Printf("From: %s \n", a.timeFirst.String())
fmt.Printf("To: %s \n", a.timeLast.String())
fmt.Printf(" (%s) \n", a.duration.String())
fmt.Println("")
fmt.Printf("Sources: %s \n", strings.Join(a.sources, ", "))
fmt.Println("")
fmt.Printf("- Unique transactions: %8s \n", prettyInt(a.nUniqueTx))
fmt.Printf("- All transactions: %8s \n", prettyInt(a.nAllTx))

fmt.Println("")
fmt.Printf("All transactions received: %s \n", prettyInt(a.nAllTx))
for _, src := range a.sources { // sorted iteration
if a.nTransactionsPerSource[src] > 0 {
fmt.Printf("- %-8s %10s\n", src, prettyInt64(a.nTransactionsPerSource[src]))
}
}

fmt.Println("")
fmt.Printf("Exclusive tx (single source): %s / %s (%s) \n", prettyInt64(a.nTxSeenBySingleSource), prettyInt(a.nUniqueTx), common.Int64DiffPercentFmt(a.nTxSeenBySingleSource, int64(a.nUniqueTx)))
for _, src := range a.sources {
if a.nTransactionsPerSource[src] > 0 {
cnt := a.nUniqueTxPerSource[src]
fmt.Printf("- %-8s %10s\n", src, prettyInt(int(cnt)))
}
}

fmt.Println("")
fmt.Printf("Transactions not seen by local node: %s / %s (%s)\n", prettyInt64(a.nOverallNotSeenLocal), prettyInt(a.nUniqueTx), common.Int64DiffPercentFmt(a.nOverallNotSeenLocal, int64(a.nUniqueTx)))
for _, src := range a.sources {
if a.nTransactionsPerSource[src] > 0 && src != referenceSource {
cnt := a.nNotSeenLocalPerSource[src]
fmt.Printf("- %-8s %10s\n", src, prettyInt64(cnt))
}
}

fmt.Println("")
fmt.Printf("Bloxroute transactions received before local node: %s / %s (%s) \n", prettyInt64(a.nbloxrouteSeenBeforeLocalTotal), prettyInt(a.nUniqueTx), common.Int64DiffPercentFmt(a.nbloxrouteSeenBeforeLocalTotal, int64(a.nUniqueTx)))
for _, bucketMS := range bucketsMS {
s := fmt.Sprintf("%d ms", bucketMS)
cnt := a.bloxrouteTxBeforeLocal[bucketMS]
fmt.Printf(" - %-8s %8s \n", s, prettyInt64(cnt))
}
}
Loading

0 comments on commit a14e404

Please sign in to comment.