Commit: cleanup

metachris committed Sep 4, 2023
1 parent 56a3ea0 commit 2a87b29
Showing 8 changed files with 45 additions and 46 deletions.
6 changes: 3 additions & 3 deletions Makefile
@@ -14,9 +14,9 @@ clean-dev:
.PHONY: build
build: clean-build
@mkdir -p build
-go build -trimpath -ldflags "-X main.version=${VERSION}" -v -o ./build/collector cmd/collector/main.go
-go build -trimpath -ldflags "-X main.version=${VERSION}" -v -o ./build/summerizer cmd/summarizer/main.go
-go build -trimpath -ldflags "-X main.version=${VERSION}" -v -o ./build/sourcelog cmd/sourcelog/main.go
+go build -trimpath -ldflags "-X main.version=${VERSION}" -v -o ./build/collect cmd/collect/*
+go build -trimpath -ldflags "-X main.version=${VERSION}" -v -o ./build/merge cmd/merge/*
+go build -trimpath -ldflags "-X main.version=${VERSION}" -v -o ./build/analyze cmd/analyze/*

.PHONY: website
website:
34 changes: 20 additions & 14 deletions README.md
@@ -7,11 +7,12 @@ Dump mempool transactions from EL nodes, and archive them in [Parquet](https://g

Output files:

-1. CSV: Raw transactions (timestamp in milliseconds, tx hash, RLP hex; about 1GB / day zipped)
-2. CSV: [Transaction metadata](/common/types.go#L5-L22) (~100MB / day zipped)
-3. Parquet: [Transaction metadata](/common/types.go#L5-L22) (~100MB / day)
+1. Raw transactions CSV (`timestamp_ms, tx_hash, rlp_hex`; about 800MB/day zipped; see the decoding sketch below)
+1. Sourcelog CSV: list of transactions received from any source (`timestamp_ms, hash, source`; about 100MB/day zipped)
+1. [Transaction metadata](/common/types.go#L5-L22) in CSV and Parquet format (~100MB/day zipped)
+1. Summary file with information about transaction sources and latency ([example](https://gist.github.com/metachris/65b674b27b5d931bca77a43db4c95a02))
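
As an illustration of format 1, here is a minimal Go sketch that reads the raw-transactions CSV and decodes the `rlp_hex` column with go-ethereum. The filename is hypothetical, and decoding via `types.Transaction.UnmarshalBinary` is an assumption about the encoding, not code from this repo.

```go
// Sketch: decode rows of the raw-transactions CSV (timestamp_ms, tx_hash, rlp_hex).
// Illustrative only; filename and decoding assumptions are not from this repo.
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"strings"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
)

func main() {
	f, err := os.Open("2023-08-08_transactions.csv") // hypothetical filename
	if err != nil {
		panic(err)
	}
	defer f.Close()

	reader := csv.NewReader(f)
	for {
		row, err := reader.Read() // [timestamp_ms, tx_hash, rlp_hex]
		if err != nil {
			break // io.EOF or malformed row
		}
		rawHex := row[2]
		if !strings.HasPrefix(rawHex, "0x") {
			rawHex = "0x" + rawHex // hexutil.Decode requires the 0x prefix
		}
		raw, err := hexutil.Decode(rawHex)
		if err != nil {
			continue
		}
		tx := new(types.Transaction)
		if err := tx.UnmarshalBinary(raw); err != nil { // handles legacy and typed txs
			continue
		}
		fmt.Println(row[0], tx.Hash().Hex(), tx.Nonce())
	}
}
```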

-Mempool sources:
+Available mempool sources:

1. Generic EL nodes (`newPendingTransactions`) (e.g. go-ethereum, Infura)
2. Alchemy ([`alchemy_pendingTransactions`](https://docs.alchemy.com/reference/alchemy-pendingtransactions))
@@ -20,14 +21,16 @@ Mempool sources:
Notes:

- This project is under active development, although relatively stable and ready to use in production
-- Observing about 30k - 100k mempool transactions per hour (1M - 1.5M transactions per day)
+- Observing about 1M - 1.5M transactions per day

---

# System architecture

-1. [Mempool Collector](cmd/collector/main.go): Connects to EL nodes and writes new mempool transactions to CSV files. Multiple collector instances can run without colliding.
-2. [Summarizer](cmd/summarizer/main.go): Takes collector CSV files as input, de-duplicates, sorts by timestamp and writes CSV + Parquet output files
+1. [Collector](cmd/collect/main.go): Connects to EL nodes and writes new mempool transactions to CSV files. Multiple collector instances can run without colliding.
+2. [Merger](cmd/merge/main.go): Takes collector CSV files as input, deduplicates, sorts by timestamp, and writes CSV + Parquet output files.
+3. [Analyzer](cmd/analyze/main.go): Analyzes sourcelog CSV files and produces a summary report.
+4. [Website](cmd/website/main.go): Website dev mode as well as build + upload.

---

@@ -41,33 +44,36 @@ Notes:
1. Writes `timestamp` + `hash` + `rawTx` to CSV file (one file per hour [by default](collector/consts.go))
1. Note: the collector can store transactions repeatedly, and only the summarizer will properly deduplicate them later

-Default filename:
+**Default filenames:**

+Transactions
- Schema: `<out_dir>/<date>/transactions/txs_<date>_<uid>.csv`
- Example: `out/2023-08-07/transactions/txs_2023-08-07-10-00_collector1.csv`

+Sourcelog
+- Schema: `<out_dir>/<date>/sourcelog/src_<date>_<uid>.csv`
+- Example: `out/2023-08-07/sourcelog/src_2023-08-07-10-00_collector1.csv`

**Running the mempool collector:**

```bash
# print help
go run cmd/collector/main.go -help

# Connect to ws://localhost:8546 and write CSVs into ./out
go run cmd/collector/main.go -out ./out

# Connect to multiple nodes
go run cmd/collector/main.go -out ./out -nodes ws://server1.com:8546,ws://server2.com:8546
```
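
For context, the core of such a collector, a `newPendingTransactions` subscription, looks roughly like this with go-ethereum's `gethclient`; this is a minimal sketch, not the actual collector code:

```go
// Sketch: subscribe to pending-transaction hashes (newPendingTransactions)
// from an EL node over websockets. Not the actual collector implementation.
package main

import (
	"context"
	"log"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient/gethclient"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	rpcClient, err := rpc.Dial("ws://localhost:8546")
	if err != nil {
		log.Fatal(err)
	}
	client := gethclient.New(rpcClient)

	hashes := make(chan common.Hash)
	sub, err := client.SubscribePendingTransactions(context.Background(), hashes)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case hash := <-hashes:
			// The real collector would fetch the raw tx and append a CSV row here.
			log.Printf("pending tx: %s", hash.Hex())
		}
	}
}
```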

-## Summarizer
+## Merger

- Iterates over the collector output directory and its CSV files
- Deduplicates transactions and sorts them by timestamp (see the sketch below)
- Creates:
  - Summary file in Parquet and CSV format with [key transaction attributes](summarizer/types.go)
  - A single, sorted and deduplicated transactions CSV file
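
The dedup-and-sort core can be pictured in a few lines of Go; `txEnvelope` below is a simplified stand-in, not the repo's actual type:

```go
// Sketch of the merge step: keep the earliest sighting of each tx hash,
// then sort by timestamp. Types are simplified stand-ins.
package main

import (
	"fmt"
	"sort"
)

type txEnvelope struct {
	TimestampMS int64
	Hash        string
	RLPHex      string
}

func mergeTxs(in []txEnvelope) []txEnvelope {
	earliest := make(map[string]txEnvelope)
	for _, tx := range in {
		if prev, ok := earliest[tx.Hash]; !ok || tx.TimestampMS < prev.TimestampMS {
			earliest[tx.Hash] = tx // first (or earlier) sighting wins
		}
	}
	out := make([]txEnvelope, 0, len(earliest))
	for _, tx := range earliest {
		out = append(out, tx)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].TimestampMS < out[j].TimestampMS })
	return out
}

func main() {
	txs := []txEnvelope{
		{1691400000123, "0xaa", "0x02..."},
		{1691400000100, "0xaa", "0x02..."}, // duplicate hash, earlier sighting
		{1691400000050, "0xbb", "0x02..."},
	}
	for _, tx := range mergeTxs(txs) {
		fmt.Println(tx.TimestampMS, tx.Hash)
	}
}
```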

```bash
go run cmd/summarizer/main.go -h

go run cmd/summarizer/main.go -out /mnt/data/mempool-dumpster/2023-08-12/ --out-date 2023-08-12 /mnt/data/mempool-dumpster/2023-08-12/2023-08-12_transactions/*.csv
```


8 changes: 4 additions & 4 deletions cmd/analyze/analyzer.go
@@ -182,7 +182,7 @@ func (a *Analyzer) Sprint() string {
out += "Transactions received: \n"
for _, src := range a.sources { // sorted iteration
if a.nTransactionsPerSource[src] > 0 {
-out += fmt.Sprintf("- %-8s %8s\n", src, prettyInt64(a.nTransactionsPerSource[src]))
+out += fmt.Sprintf("- %-8s %10s\n", src, prettyInt64(a.nTransactionsPerSource[src]))
}
}

@@ -191,7 +191,7 @@ func (a *Analyzer) Sprint() string {
for _, src := range a.sources {
if a.nTransactionsPerSource[src] > 0 {
cnt := a.nUniqueTxPerSource[src]
-out += fmt.Sprintf("- %-8s %8s\n", src, prettyInt(int(cnt)))
+out += fmt.Sprintf("- %-8s %10s\n", src, prettyInt(int(cnt)))
}
}

@@ -200,7 +200,7 @@ func (a *Analyzer) Sprint() string {
for _, src := range a.sources {
if a.nTransactionsPerSource[src] > 0 && src != referenceLocalSource {
cnt := a.nNotSeenLocalPerSource[src]
-out += fmt.Sprintf("- %-8s %8s\n", src, prettyInt64(cnt))
+out += fmt.Sprintf("- %-8s %10s\n", src, prettyInt64(cnt))
}
}

@@ -225,7 +225,7 @@ func (a *Analyzer) Sprint() string {
for _, bucketMS := range bucketsMS {
s := fmt.Sprintf("%d ms", bucketMS)
cnt := srcFirstBuckets[bucketMS]
-out += fmt.Sprintf("- %-8s %8s (%7s) \n", s, prettyInt64(cnt), common.Int64DiffPercentFmt(cnt, int64(totalFirstBySrc)))
+out += fmt.Sprintf("- %-8s %10s (%7s) \n", s, prettyInt64(cnt), common.Int64DiffPercentFmt(cnt, int64(totalFirstBySrc)))
}
}
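
The `%8s` to `%10s` bump matters once per-source counts reach seven digits: assuming `prettyInt64` renders thousands separators (e.g. `1,234,567`, which is 9 characters, an assumption about its output), such values overflow a width-8 column and break alignment. A quick standalone illustration:

```go
package main

import "fmt"

func main() {
	// "1,234,567" is 9 characters: it overflows %8s but fits %10s.
	fmt.Printf("- %-8s %8s|\n", "local", "1,234,567")  // old width: column overflows
	fmt.Printf("- %-8s %10s|\n", "local", "1,234,567") // new width: columns stay aligned
}
```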

2 changes: 1 addition & 1 deletion cmd/collect/main.go
@@ -33,7 +33,7 @@ var (
outDirPtr = flag.String("out", "", "path to collect raw transactions into")
uidPtr = flag.String("uid", "", "collector uid (part of output CSV filename)")
blxAuthToken = flag.String("blx-token", defaultblxAuthToken, "bloxroute auth token (optional)")
-sourcelog = flag.Bool("source-log", false, "write a CSV with all received transactions from any source (timestamp_ms,hash,source)")
+sourcelog = flag.Bool("sourcelog", false, "write a CSV with all received transactions from any source (timestamp_ms,hash,source)")
)

func main() {
2 changes: 1 addition & 1 deletion common/sourcelog.go
@@ -21,7 +21,7 @@ func LoadSourceLogFiles(log *zap.SugaredLogger, files []string) (txs map[string]

// Collect transactions from all input files to memory
for _, filename := range files {
log.Infof("Processing: %s", filename)
log.Infof("Loading %s ...", filename)
cntProcessedFiles += 1
cntTxInFileTotal := 0

2 changes: 1 addition & 1 deletion common/txsfile.go
@@ -17,7 +17,7 @@ func LoadTransactionCSVFiles(log *zap.SugaredLogger, files []string) (txs map[st
cntProcessedFiles := 0
txs = make(map[string]*TxEnvelope)
for _, filename := range files {
log.Infof("Processing: %s", filename)
log.Infof("Loading %s ...", filename)
cntProcessedFiles += 1
cntTxInFileTotal := 0
cntTxInFileNew := 0
3 changes: 0 additions & 3 deletions scripts/upload-yesterday.sh
@@ -27,7 +27,4 @@ source .env.prod
YES=1 ./scripts/upload.sh "/mnt/data/mempool-dumpster/$d"

# update website
-SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
-cd $SCRIPT_DIR
-cd ..
make website
34 changes: 15 additions & 19 deletions scripts/upload.sh
@@ -47,46 +47,42 @@ if [ -z ${YES:-} ]; then
fi

#
-# RAW TX
+# PROCESS RAW FILES
#
-# summarize raw transactions
-echo "Running summarizer..."
-/root/mempool-dumpster/build/summerizer -out $1 --out-date $date $1/transactions/*.csv
+echo "Merging transactions..."
+/root/mempool-dumpster/build/merge transactions --out $1 --fn-prefix $date $1/transactions/*.csv

+echo "Merging sourcelog..."
+/root/mempool-dumpster/build/merge sourcelog --out $1 --fn-prefix $date $1/sourcelog/*.csv

# compress
cd $1
echo "Compressing transaction files..."
zip "${date}_transactions.csv.zip" "${date}_transactions.csv"
zip "${date}.csv.zip" "${date}.csv"
zip "${date}_sourcelog.csv.zip" "${date}_sourcelog.csv"

# upload to Cloudflare R2 and AWS S3
echo "Uploading parquet file..."
echo "Uploading ${date}.parquet ..."
aws s3 cp --no-progress "${date}.parquet" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/" --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
aws --profile aws s3 cp --no-progress "${date}.parquet" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/"

echo "Uploading zipped summary CSV file..."
echo "Uploading ${date}.csv.zip CSV ..."
aws s3 cp --no-progress "${date}.csv.zip" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/" --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
aws --profile aws s3 cp --no-progress "${date}.csv.zip" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/"

echo "Uploading transactions file..."
echo "Uploading ${date}_transactions.csv.zip ..."
aws s3 cp --no-progress "${date}_transactions.csv.zip" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/" --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
aws --profile aws s3 cp --no-progress "${date}_transactions.csv.zip" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/"

-#
-# SOURCELOG
-#
-# Create sourcelog + summary
-echo "Running sourcelog..."
-/root/mempool-dumpster/build/sourcelog -out $1 -out-date $date $1/sourcelog/*.csv
-
-# zip
-cd $1
-zip "${date}_sourcelog.csv.zip" "${date}_sourcelog.csv"

-# upload to Cloudflare R2 and AWS S3
echo "Uploading ${date}_sourcelog.csv.zip ..."
aws s3 cp --no-progress "${date}_sourcelog.csv.zip" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/" --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
aws --profile aws s3 cp --no-progress "${date}_sourcelog.csv.zip" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/"

echo "Creating summary..."
/root/mempool-dumpster/build/analyze sourcelog --out "$1/${date}_summary.txt" $1/sourcelog/*.csv

echo "Uploading ${date}_summary.txt ..."
aws s3 cp --no-progress "${date}_summary.txt" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/" --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
aws --profile aws s3 cp --no-progress "${date}_summary.txt" "s3://flashbots-mempool-dumpster/ethereum/mainnet/${ym}/"
