Commit: collector ⚡️
metachris committed Aug 8, 2023
1 parent bc63a52 commit 98a4dd5
Showing 6 changed files with 92 additions and 225 deletions.
49 changes: 33 additions & 16 deletions README.md
@@ -1,17 +1,20 @@
# Mempool Archiver
# Mempool Dumpster 🗑️♻️

[![Goreport status](https://goreportcard.com/badge/github.com/flashbots/mempool-archiver)](https://goreportcard.com/report/github.com/flashbots/mempool-archiver)
[![Test status](https://github.com/flashbots/mempool-archiver/workflows/Checks/badge.svg?branch=main)](https://github.com/flashbots/mempool-archiver/actions?query=workflow%3A%22Checks%22)

Collect mempool transactions from EL nodes, and archive them in [Parquet](https://github.com/apache/parquet-format)/CSV/JSON format.
Dump mempool transactions from EL nodes, and archive them in [Parquet](https://github.com/apache/parquet-format) and CSV format.

- Parquet: [Transaction metadata](summarizer/types.go) (timestamp in millis, hash, most relevant attributes)
- CSV: Raw transactions (RLP hex + timestamp in millis + tx hash); a decoding sketch follows below
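
As a rough illustration of the raw-transaction CSV layout above (timestamp in milliseconds, tx hash, RLP hex), the sketch below decodes a single record with go-ethereum. The `decodeRecord` helper is hypothetical and not part of this repo; only the three-column order and the `hexutil.Decode` step mirror the summarizer code in this commit.

```go
package example

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
)

// decodeRecord (hypothetical helper) splits one CSV line of the form
// "<timestamp_ms>,<hash>,<rawTxHex>" and decodes the RLP-hex payload into a
// go-ethereum transaction. UnmarshalBinary handles both legacy and typed txs;
// the hash column (items[1]) can also be recomputed from the decoded tx.
func decodeRecord(line string) (timestampMs int64, tx *types.Transaction, err error) {
	items := strings.Split(strings.TrimSpace(line), ",")
	if len(items) != 3 {
		return 0, nil, fmt.Errorf("expected 3 columns, got %d", len(items))
	}

	timestampMs, err = strconv.ParseInt(items[0], 10, 64)
	if err != nil {
		return 0, nil, err
	}

	rawTxBytes, err := hexutil.Decode(items[2])
	if err != nil {
		return 0, nil, err
	}

	tx = new(types.Transaction)
	if err := tx.UnmarshalBinary(rawTxBytes); err != nil {
		return 0, nil, err
	}
	return timestampMs, tx, nil
}
```

Applied line by line with a `bufio` reader (as the summarizer in this commit does), this is enough to turn a raw-transaction CSV back into full transaction objects.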

---

**Notes:**

- This is work in progress and under heavy development.
- Seeing about 80k - 100k unique new mempool transactions per hour, on average ~1.2kb per rawTx (as of 2023-08-07).
- See also: [discussion about storage size and other considerations](https://github.com/flashbots/mempool-archiver/issues/1)
- Seeing about 90k - 140k unique new mempool transactions per hour, on average ~1.2kb per rawTx (as of 2023-08-07).
- See also: [discussion about compression](https://github.com/flashbots/mempool-archiver/issues/2) and [storage](https://github.com/flashbots/mempool-archiver/issues/1)

---

@@ -25,7 +28,7 @@ Collect mempool transactions from EL nodes, and archive them in [Parquet](https:

Default filename:

- Schema: `<out_dir>/<date>/transactions/tx-txs-<bucket_start_datetime>.csv`
- Schema: `<out_dir>/<date>/transactions/txs-<datetime>.csv`
- Example: `out/2023-08-07/transactions/txs-2023-08-07-10-00.csv` (a filename-construction sketch follows below)
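
A minimal sketch of how such a bucketed filename could be derived, assuming hourly buckets and UTC timestamps; the real collector may build the path differently, so treat `bucketFilename` as illustrative only.

```go
package main

import (
	"fmt"
	"path/filepath"
	"time"
)

// bucketFilename builds <out_dir>/<date>/transactions/txs-<datetime>.csv,
// assuming the bucket boundary is the start of the current hour (UTC).
func bucketFilename(outDir string, t time.Time) string {
	bucket := t.UTC().Truncate(time.Hour)
	return filepath.Join(
		outDir,
		bucket.Format("2006-01-02"),
		"transactions",
		"txs-"+bucket.Format("2006-01-02-15-04")+".csv",
	)
}

func main() {
	fmt.Println(bucketFilename("out", time.Now()))
	// e.g. out/2023-08-07/transactions/txs-2023-08-07-10-00.csv
}
```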

**Running the mempool collector:**
@@ -40,9 +43,11 @@ go run cmd/collector/main.go -out ./out -nodes ws://server1.com:8546,ws://server

## Summarizer

(not yet working)
WIP

Iterates over a collector output directory, and creates a summary file in Parquet / CSV format with extracted transaction data: `Timestamp`, `Hash`, `ChainID`, `From`, `To`, `Value`, `Nonce`, `Gas`, `GasPrice`, `GasTipCap`, `GasFeeCap`, `DataSize`, `Data4Bytes`
- Iterates over the collector output directory
- Creates a summary file in Parquet format with [key transaction attributes](summarizer/types.go); a schema sketch follows this list
- TODO: create archive from output of multiple collectors
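
The actual schema lives in the linked [summarizer/types.go](summarizer/types.go); purely to illustrate how such a schema is declared with the xitongsys/parquet-go library used here, an abbreviated and partly assumed version of the struct could look like the sketch below (field names follow the attribute list above, the tag details are assumptions).

```go
package summarizer

// TxSummaryEntry (abbreviated, hypothetical sketch): parquet-go derives the
// Parquet schema from struct tags like these. See summarizer/types.go for the
// real definition.
type TxSummaryEntry struct {
	Timestamp int64  `parquet:"name=timestamp, type=INT64"`
	Hash      string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8"`
	ChainID   string `parquet:"name=chainId, type=BYTE_ARRAY, convertedtype=UTF8"`
	From      string `parquet:"name=from, type=BYTE_ARRAY, convertedtype=UTF8"`
	To        string `parquet:"name=to, type=BYTE_ARRAY, convertedtype=UTF8"`
	Value     string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8"`
	// Further fields such as Nonce, Gas, GasPrice, GasTipCap, GasFeeCap,
	// DataSize and Data4Bytes would follow the same pattern.
}
```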

```bash
go run cmd/summarizer/main.go -h
@@ -58,7 +63,12 @@ go run cmd/summarizer/main.go -h
- Keep it simple and stupid
- Vendor-agnostic (main flow should work on any server, independent of a cloud provider)
- Downtime-resilience to minimize any gaps in the archive
- Multiple collector instances can run concurrently without getting into each other's way
- Multiple collector instances can run concurrently, without getting into each other's way
- Summarizer script produces the final archive (based on the input of multiple collector outputs)
- The final archive:
  - Includes (1) a Parquet file with transaction metadata, and (2) a compressed file of the raw transaction CSV files
  - Compatible with [Clickhouse](https://clickhouse.com/docs/en/integrations/s3) and [S3 Select](https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html) (Parquet using gzip compression); see the read-back sketch after this list
  - Easily distributable as a torrent
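
To give a sense of how the archive's Parquet part could be consumed from Go, here is a minimal read-back sketch using the same parquet-go libraries the summarizer writes with. The `out/transactions.parquet` path, the batch size of 10, and the parallelism of 4 are assumptions; error handling is abbreviated.

```go
package main

import (
	"fmt"
	"log"

	"github.com/flashbots/mempool-archiver/summarizer"
	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

func main() {
	// Open the summarizer output (gzip-compressed Parquet).
	fr, err := local.NewLocalFileReader("out/transactions.parquet")
	if err != nil {
		log.Fatal(err)
	}
	defer fr.Close()

	pr, err := reader.NewParquetReader(fr, new(summarizer.TxSummaryEntry), 4)
	if err != nil {
		log.Fatal(err)
	}
	defer pr.ReadStop()

	// Read the first batch of rows and print timestamp + hash for each.
	rows := make([]summarizer.TxSummaryEntry, 10)
	if err := pr.Read(&rows); err != nil {
		log.Fatal(err)
	}
	for _, tx := range rows {
		fmt.Println(tx.Timestamp, tx.Hash)
	}
}
```

The same file can be queried directly from Clickhouse or S3 Select, which is why the summarizer in this commit switches the Parquet compression codec to gzip.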

## Mempool Collector

@@ -96,18 +106,25 @@ make fmt

---

## Open questions
## TODO

Storage & compression:
Lots, this is WIP

1. Summary files (CSV, Parquet)
   a. Store with or without signature (~160b which is often about 50% of an entry)
   b. Compress? (might impact usability as Clickhouse backend or S3 Select)
1. Parquet files: store with fields as strings (like in JSON), or in native data types? (native might be smaller size, but harder to query/parse)
maybe:

- http server to add/remove nodes, see stats?
- built-in torrent seeder?
- built-in p2p mempool listener?

---

## TODO
## License

Lots, this is WIP
MIT

---

## Maintainers

- [metachris](https://twitter.com/metachris)
- [0x416e746f6e](https://github.com/0x416e746f6e)
168 changes: 59 additions & 109 deletions cmd/summarizer/main.go
@@ -2,21 +2,20 @@ package main

import (
"bufio"
"compress/gzip"
"errors"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/flashbots/mempool-archiver/collector"
"github.com/flashbots/mempool-archiver/summarizer"
jsoniter "github.com/json-iterator/go"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/writer"
@@ -74,16 +73,18 @@ func main() {
defer func() { _ = logger.Sync() }()
log = logger.Sugar()

log.Infow("Starting mempool-archiver", "version", version, "dir", *dirPtr)
log.Infow("Starting mempool-summarizer", "version", version)

if *dirPtr == "" {
log.Fatal("-dir argument is required")
}
log.Infof("Input directory: %s", *dirPtr)

if *outDirPtr == "" {
*outDirPtr = *dirPtr
log.Infof("Using %s as output directory", *outDirPtr)
// log.Infof("Using %s as output directory", *outDirPtr)
}
log.Infof("Output directory: %s", *outDirPtr)

archiveDirectory()
}
@@ -102,37 +103,9 @@ func archiveDirectory() { //nolint:gocognit
return
}

// Create output files
fnFileList := filepath.Join(*outDirPtr, "filelist.txt.gz")
log.Infof("Writing file list to %s", fnFileList)
_fFileList, err := os.Create(fnFileList)
if err != nil {
log.Errorw("os.Create", "error", err)
return
}
_fFileListGz := gzip.NewWriter(_fFileList)
fFileListGz := bufio.NewWriter(_fFileListGz)

// var csvWriter *csv.Writer
// if *saveCSV {
// fnCSV := filepath.Join(*outDirPtr, "summary.csv")
// log.Infof("Writing CSV to %s", fnCSV)
// fCSV, err := os.Create(fnCSV)
// if err != nil {
// log.Errorw("os.Create", "error", err)
// return
// }
// csvWriter = csv.NewWriter(fCSV)
// err = csvWriter.Write(summarizer.CSVHeader)
// if err != nil {
// log.Errorw("csvWriter.Write", "error", err)
// return
// }
// }

// Setup parquet writer
fnParquet := filepath.Join(*outDirPtr, "summary.parquet")
log.Infof("Writing parquet to %s", fnParquet)
fnParquet := filepath.Join(*outDirPtr, "transactions.parquet")
log.Infof("Parquet output: %s", fnParquet)
fw, err := local.NewLocalFileWriter(fnParquet)
if err != nil {
log.Fatal("Can't create parquet file", "error", err)
@@ -141,93 +114,73 @@ if err != nil {
if err != nil {
log.Fatal("Can't create parquet writer", "error", err)
}

// Parquet config: https://parquet.apache.org/docs/file-format/configurations/
pw.RowGroupSize = 128 * 1024 * 1024 // 128M
pw.PageSize = 8 * 1024 // 8K
pw.CompressionType = parquet.CompressionCodec_SNAPPY

log.Infof("Counting files...")
cnt := 0
err = filepath.Walk(*dirPtr, func(file string, fi os.FileInfo, err error) error {
if err != nil {
log.Errorw("filepath.Walk", "error", err)
return nil
}

if fi.IsDir() || filepath.Ext(file) != ".json" {
return nil
}

cnt += 1
return nil
})
if err != nil {
log.Errorw("filepath.Walk", "error", err)
}
log.Infof("Found %d files", cnt)
// Parquet compression: must be gzip for compatibility with both Clickhouse and S3 Select
pw.CompressionType = parquet.CompressionCodec_GZIP

// Process files
cntProcessed := 0
cntProcessedFiles := 0
cntProcessedTx := 0
err = filepath.Walk(*dirPtr, func(file string, fi os.FileInfo, err error) error {
if err != nil {
log.Errorw("filepath.Walk", "error", err)
return nil
}

if fi.IsDir() || filepath.Ext(file) != ".json" {
if fi.IsDir() || filepath.Ext(file) != ".csv" {
return nil
}

log.Debug(file)
cntProcessed += 1
if cntProcessed%10000 == 0 {
log.Infof("Processing file %d/%d", cntProcessed, cnt)
}
if cntProcessed%100000 == 0 {
PrintMemUsage()
}
log.Infof("Processing: %s", file)
cntProcessedFiles += 1

fn := strings.Replace(file, *dirPtr, "", 1)
_, err = fFileListGz.WriteString(fn + "\n")
readFile, err := os.Open(file)
if err != nil {
log.Errorw("fFileList.WriteString", "error", err)
}

dat, err := os.ReadFile(file)
if err != nil {
log.Errorw("os.ReadFile", "error", err)
log.Errorw("os.Open", "error", err, "file", file)
return nil
}
defer readFile.Close()

fileReader := bufio.NewReader(readFile)
for {
l, err := fileReader.ReadString('\n')
if len(l) == 0 && err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Errorw("fileReader.ReadString", "error", err)
break
}

json := jsoniter.ConfigCompatibleWithStandardLibrary
var tx collector.TxDetail
err = json.Unmarshal(dat, &tx)
if err != nil {
if strings.HasPrefix(err.Error(), "Unmarshal: there are bytes left after unmarshal") { // this error still unmarshals correctly
log.Warnw("json.Unmarshal", "error", err, "fn", file)
} else {
log.Errorw("json.Unmarshal", "error", err, "fn", file)
return nil
l = strings.Trim(l, "\n")
items := strings.Split(l, ",")
if len(items) != 3 {
log.Errorw("invalid line", "line", l)
continue
}
}

txSummary, err := parseTx(tx)
if err != nil {
log.Errorw("parseTx", "error", err, "fn", file)
return nil
}
txSummary, err := parseTx(items[0], items[1], items[2])
if err != nil {
log.Errorw("parseTx", "error", err, "line", l)
continue
}

// if *saveCSV {
// err = csvWriter.Write(summarizer.TxDetailToCSV(tx, false))
// if err != nil {
// log.Errorw("csvWriter.Write", "error", err)
// }
// }
if err = pw.Write(txSummary); err != nil {
log.Errorw("parquet.Write", "error", err)
}

if err = pw.Write(txSummary); err != nil {
log.Errorw("parquet.Write", "error", err)
cntProcessedTx += 1
}

if *limit > 0 && cntProcessed%*limit == 0 {
// if err := fileScanner.Err(); err != nil {
// log.Errorw("fileScanner.Scan", "error", err)
// }

if *limit > 0 && cntProcessedFiles%*limit == 0 {
return errLimitReached
}
return nil
@@ -241,15 +194,7 @@ func archiveDirectory() { //nolint:gocognit
}
fw.Close()

fFileListGz.Flush()
_fFileListGz.Close()
_fFileList.Close()

// if *saveCSV {
// csvWriter.Flush()
// }

log.Infof("Finished processing %d JSON files", cntProcessed)
log.Infof("Finished processing %d CSV files, %d transactions", cntProcessedFiles, cntProcessedTx)
}

func PrintMemUsage() {
@@ -259,8 +204,13 @@ func PrintMemUsage() {
log.Info(s)
}

func parseTx(txDetail collector.TxDetail) (summarizer.TxSummaryEntry, error) {
rawTxBytes, err := hexutil.Decode(txDetail.RawTx)
func parseTx(timestampMs, hash, rawTx string) (summarizer.TxSummaryEntry, error) {
ts, err := strconv.Atoi(timestampMs)
if err != nil {
return summarizer.TxSummaryEntry{}, err
}

rawTxBytes, err := hexutil.Decode(rawTx)
if err != nil {
return summarizer.TxSummaryEntry{}, err
}
@@ -290,7 +240,7 @@ func parseTx(txDetail collector.TxDetail) (summarizer.TxSummaryEntry, error) {
}

return summarizer.TxSummaryEntry{
Timestamp: txDetail.Timestamp,
Timestamp: int64(ts),
Hash: tx.Hash().Hex(),

ChainID: tx.ChainId().String(),
30 changes: 0 additions & 30 deletions collector/types.go
@@ -16,34 +16,4 @@ type TxDetail struct {
Timestamp int64 `json:"timestamp"`
Hash string `json:"hash"`
RawTx string `json:"rawTx"`

// ChainID string `json:"-"` // `json:"chainId"`
// From string `json:"-"` // `json:"from"`
// To string `json:"-"` // `json:"to"`
// Value string `json:"-"` // `json:"value"`
// Nonce uint64 `json:"-"` // `json:"nonce"`

// Gas uint64 `json:"-"` // `json:"gas"`
// GasPrice string `json:"-"` // `json:"gasPrice"`
// GasTipCap string `json:"-"` // `json:"gasTipCap"`
// GasFeeCap string `json:"-"` // `json:"gasFeeCap"`

// DataSize int64 `json:"-"` // `json:"dataSize"`
// Data4Bytes string `json:"-"` // `json:"data4Bytes"`
// // AccessList string `json:"accessList"`
// // BlobGas string `json:"blobGas"`
// // BlobGasFeeCap string `json:"blobGasFeeCap"`
// // BlobHashes string `json:"blobHashes"`

// // Signature
// V string `json:"-"`
// R string `json:"-"`
// S string `json:"-"`
}

// TxDetailCSVHeader is a CSV header for TxDetail
var TxDetailCSVHeader []string = []string{
"timestamp",
"hash",
"rawTx",
}