Skip to content

Commit

Permalink
pretty output
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Aug 31, 2023
1 parent f4a10a7 commit 0940402
Showing 1 changed file with 92 additions and 12 deletions.
104 changes: 92 additions & 12 deletions scripts/analyze-source-stats/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
package main

//
// Source-Stats Summarizer takes the source-stats CSV files from the collector and summarizes them into a single CSV file.
//
// Output:
//
// date, hour, source, tx_count, tx_count_first, tx_count_unique, tx_count_unseen
//
/**
Source-Stats Summarizer takes the source-stats CSV files from the collector and summarizes them into a single CSV file.
Output (currently):
2023-08-30T20:34:47.253Z INFO Processed all input files {"files": 22, "txTotal": "627,891", "memUsedMiB": "594"}
2023-08-30T20:34:47.648Z INFO Overall tx count {"infura": "578,606", "alchemy": "568,790", "ws://localhost:8546": "593,046", "blx": "419,725"}
2023-08-30T20:34:47.696Z INFO Unique tx count {"blx": "22,403", "ws://localhost:8546": "9,962", "alchemy": "2,940", "infura": "4,658", "unique": "39,963 / 627,891"}
2023-08-30T20:34:47.816Z INFO Not seen by local node {"blx": "23,895", "infura": "9,167", "alchemy": "7,039", "notSeenByRef": "34,845 / 627,891"}
Total unique tx: 627,891
Transactions received:
- alchemy: 568,790
- blx: 419,725
- infura: 578,606
- ws://localhost:8546: 593,046
Unique tx (sole sender):
- alchemy: 2,940
- blx: 22,403
- infura: 4,658
- ws://localhost:8546: 9,962
Transactions not seen by local node: 34,845 / 627,891
- alchemy: 7,039
- blx: 23,895
- infura: 9,167
more insight ideas?
- who sent first
*/

import (
"bufio"
Expand All @@ -18,7 +43,9 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/mempool-dumpster/common"
"go.uber.org/zap"
"golang.org/x/text/language"
Expand Down Expand Up @@ -92,7 +119,7 @@ func main() {
}

// summarizeSourceStats parses all input CSV files into one output CSV and one output Parquet file.
func summarizeSourceStats(files []string) { //nolint:gocognit
func summarizeSourceStats(files []string) { //nolint:gocognit,gocyclo,maintidx
// Prepare output file paths, and make sure they don't exist yet
fnStats := filepath.Join(*outDirPtr, "source-stats.csv")
if *outDatePtr != "" {
Expand Down Expand Up @@ -128,8 +155,11 @@ func summarizeSourceStats(files []string) { //nolint:gocognit
txs := make(map[string]map[string]int64) // [hash][srcTag]timestampMs
sources := make(map[string]bool)

timestampFirst, timestampLast := int64(0), int64(0)

// Collect transactions from all input files to memory
cntProcessedFiles := 0
cntProcessedRecords := 0
for _, filename := range files {
log.Infof("Processing: %s", filename)
cntProcessedFiles += 1
Expand Down Expand Up @@ -160,8 +190,6 @@ func summarizeSourceStats(files []string) { //nolint:gocognit
continue
}

// todo: check items[1] is a valid hash

cntTxInFileTotal += 1

ts, err := strconv.Atoi(items[0])
Expand All @@ -173,6 +201,25 @@ func summarizeSourceStats(files []string) { //nolint:gocognit
txHash := items[1]
txSrcTag := items[2]

// that it's a valid hash
if len(txHash) != 66 {
log.Errorw("invalid hash length", "hash", txHash)
continue
}
if _, err = hexutil.Decode(txHash); err != nil {
log.Errorw("hexutil.Decode", "error", err, "line", l)
continue
}

cntProcessedRecords += 1

if timestampFirst == 0 || txTimestamp < timestampFirst {
timestampFirst = txTimestamp
}
if txTimestamp > timestampLast {
timestampLast = txTimestamp
}

// Add source to map
sources[txSrcTag] = true

Expand All @@ -192,7 +239,12 @@ func summarizeSourceStats(files []string) { //nolint:gocognit
// break
}

log.Infow("Processed all input files", "files", cntProcessedFiles, "txTotal", printer.Sprintf("%d", len(txs)), "memUsedMiB", printer.Sprintf("%d", common.GetMemUsageMb()))
log.Infow("Processed all input files",
"files", cntProcessedFiles,
"records", printer.Sprintf("%d", cntProcessedRecords),
"txTotal", printer.Sprintf("%d", len(txs)),
"memUsedMiB", printer.Sprintf("%d", common.GetMemUsageMb()),
)

// step 1: get overall tx / source
srcCntOverallTxs := make(map[string]int64)
Expand Down Expand Up @@ -249,5 +301,33 @@ func summarizeSourceStats(files []string) { //nolint:gocognit
}
l.Infow("Not seen by local node", "notSeenByRef", printer.Sprintf("%d / %d", nNotSeenByRef, len(txs)))

// log.Infof("Finished processing %s CSV files, wrote %s transactions", printer.Sprintf("%d", cntProcessedFiles), printer.Sprintf("%d", cntTxWritten))
// convert timestamps to duration
d := time.Duration(timestampLast-timestampFirst) * time.Millisecond
t1 := time.Unix(timestampFirst/1000, 0).UTC()
t2 := time.Unix(timestampLast/1000, 0).UTC()

fmt.Println("")
fmt.Println("Input:")
fmt.Println("- Files:", printer.Sprintf("%d", cntProcessedFiles))
fmt.Printf("- Records: %s \n", printer.Sprintf("%d", cntProcessedRecords))
fmt.Printf("- Unique transactions: %s \n", printer.Sprintf("%d", len(txs)))
fmt.Printf("- Time:\n - From: %s \n - To: %s \n - Dur: %s \n", t1.String(), t2.String(), d.String())
fmt.Println("")
fmt.Printf("Transactions received (%s total) \n", printer.Sprintf("%d", len(txs)))
for srcTag, cnt := range srcCntOverallTxs {
fmt.Printf("- %-20s %10s\n", srcTag, printer.Sprintf("%d", cnt))
}
// fmt.Printf("- total unique %10s\n", )

fmt.Println("")
fmt.Println("Unique tx (sole sender):")
for srcTag, cnt := range srcCntUniqueTxs {
fmt.Printf("- %-20s %10s\n", srcTag, printer.Sprintf("%d", cnt))
}

fmt.Println("")
fmt.Println("Transactions not seen by local node:", printer.Sprintf("%d / %d", nNotSeenByRef, len(txs)))
for srcTag, cnt := range srcNotSeenByRef {
fmt.Printf("- %-20s %10s\n", srcTag, printer.Sprintf("%d", cnt))
}
}

0 comments on commit 0940402

Please sign in to comment.