Skip to content

Commit

Permalink
refactor: flush logic (#89)
Browse files Browse the repository at this point in the history
* Refactor flush logic

* Update README

* Check bounds
  • Loading branch information
joelsmith-2019 authored May 1, 2024
1 parent 7f4149b commit 580a328
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 29 deletions.
39 changes: 23 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,28 @@ Sample configs can be found in [config](config).

### Flush Interval

Using the `--flush-interval` flag will run a flush on all paths every `duration`; ex `--flush-interval 5m`
Using the `--flush-interval` flag will run a flush on all chains every `duration`; ex `--flush-interval 5m`

The relayer will keep track of the latest flushed block. The first time the flush is run, the flush will start at the chains latest height - lookback period and flush up until height of the chain when the flush started. It will then store the height the flush ended on.
The first time the flush is run per chain, the flush will start at the chains `latest height - (2 * lookback period)`. The flush will always finish at the `latest chain height - lookback period`. This allows the flush to lag behind the chain so that the flush does not compete for transactions that are actively being processed. For subsequent flushes, each chain will reference its last flushed block, start from there and flush to the `latest chain height - lookback period` again. The flushing process will continue as long as the relayer is running.

After that, it will flush from the last stored height - lookback period up until the latest height of the chain.
For best results and coverage, the lookback period in blocks should correspond to the flush interval. If a chain produces 1 block a second and the flush interval is set to 30 minutes (1800 seconds), the lookback period should be at least 1800 blocks. When in doubt, round up and add a small buffer.

#### Examples

Consider a 30 minute flush interval (1800 seconds)
- Ethereum: 12 second blocks = (1800 / 12) = `150 blocks`
- Polygon: 2 second blocks = (1800 / 2) = `900 blocks`
- Arbitrum: 0.26 second blocks = (1800 / 0.26) = `~6950 blocks`

### Prometheus Metrics

By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag.

| **Exported Metric** | **Description** | **Type** |
|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.<br><br>Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge |
| cctp_relayer_chain_latest_height | Current height of the chain. | Gauge |
| cctp_relayer_broadcast_errors_total | The total number of failed broadcasts. Note: this is AFTER it retries `broadcast-retries` (config setting) number of times. | Counter |
| **Exported Metric** | **Description** | **Type** |
| ----------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ | -------- |
| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.<br><br>Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge |
| cctp_relayer_chain_latest_height | Current height of the chain. | Gauge |
| cctp_relayer_broadcast_errors_total | The total number of failed broadcasts. Note: this is AFTER it retries `broadcast-retries` (config setting) number of times. | Counter |

### Minter Private Keys
Minter private keys are required on a per chain basis to broadcast transactions to the target chain. These private keys can either be set in the `config.yaml` or via environment variables.
Expand Down Expand Up @@ -66,14 +73,14 @@ localhost:8000/tx/<hash>?domain=0

### State

| IrisLookupId | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated |
|:-------------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------|
| 0x123 | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| IrisLookupId | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated |
| :----------- | :------- | :----------- | :--------- | :----------- | :--------- | :----------- | :------ | :------ |
| 0x123 | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |

### Generating Go ABI bindings

Expand Down
36 changes: 29 additions & 7 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ func (e *Ethereum) consumeStream(
}
}

// flushMechanism looks back over the chain history every specified flushInterval.
//
// Each chain is configured with a lookback period which signifies how many blocks to look back
// at each interval. The flush mechanism will start from the last flushed block and will rescan
// the lookback period and consume all messages in that range. The flush mechanism will not flush
// all the way to the chain's latest block to avoid consuming messages that are still in the queue.
// There will be a minimum gap of the lookback period between the last flushed block and the latest block.
//
// Note: The first time the flush mechanism is run, it will set the lastFlushedBlock to the latest block
// minus twice the lookback period.
func (e *Ethereum) flushMechanism(
ctx context.Context,
logger log.Logger,
Expand All @@ -281,19 +291,31 @@ func (e *Ethereum) flushMechanism(

// initialize first lastFlushedBlock if not set
if e.lastFlushedBlock == 0 {
e.lastFlushedBlock = latestBlock - e.lookbackPeriod
e.lastFlushedBlock = latestBlock - 2*e.lookbackPeriod

if latestBlock < e.lookbackPeriod {
e.lastFlushedBlock = 0
}
}

// start from lastFlushedBlock
start := e.lastFlushedBlock
// start from the last block it flushed
startBlock := e.lastFlushedBlock

// set finish block to be latestBlock - lookbackPeriod
finishBlock := latestBlock - e.lookbackPeriod

if startBlock >= finishBlock {
logger.Debug("No new blocks to flush")
continue
}

logger.Info(fmt.Sprintf("Flush started from %d to %d", start, latestBlock))
logger.Info(fmt.Sprintf("Flush started from %d to %d (current height: %d, lookback period: %d)", startBlock, finishBlock, latestBlock, e.lookbackPeriod))

// consume from lastFlushedBlock to the latestBlock
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, start, latestBlock)
// consume from lastFlushedBlock to the finishBlock
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startBlock, finishBlock)

// update lastFlushedBlock to the last block it flushed
e.lastFlushedBlock = latestBlock
e.lastFlushedBlock = finishBlock

logger.Info("Flush complete")

Expand Down
35 changes: 29 additions & 6 deletions noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func (n *Noble) StartListener(
<-ctx.Done()
}

// flushMechanism looks back over the chain history every specified flushInterval.
//
// Each chain is configured with a lookback period which signifies how many blocks to look back
// at each interval. The flush mechanism will start from the last flushed block and will rescan
// the lookback period and consume all messages in that range. The flush mechanism will not flush
// all the way to the chain's latest block to avoid consuming messages that are still in the queue.
// There will be a minimum gap of the lookback period between the last flushed block and the latest block.
//
// Note: The first time the flush mechanism is run, it will set the lastFlushedBlock to the latest block
// minus twice the lookback period.
func (n *Noble) flushMechanism(
ctx context.Context,
logger log.Logger,
Expand All @@ -145,19 +155,32 @@ func (n *Noble) flushMechanism(
continue
}

// initialize first lastFlushedBlock if not set
if n.lastFlushedBlock == 0 {
n.lastFlushedBlock = latestBlock
n.lastFlushedBlock = latestBlock - (2 * n.lookbackPeriod)

if latestBlock < n.lookbackPeriod {
n.lastFlushedBlock = 0
}
}
lastFlushedBlock := n.lastFlushedBlock

flushStart := lastFlushedBlock - n.lookbackPeriod
// start from the last block it flushed
startBlock := n.lastFlushedBlock

// set finish block to be latestBlock - lookbackPeriod
finishBlock := latestBlock - n.lookbackPeriod

if startBlock >= finishBlock {
logger.Debug("No new blocks to flush")
continue
}

logger.Info(fmt.Sprintf("Flush started from: %d to: %d", flushStart, latestBlock))
logger.Info(fmt.Sprintf("Flush started from %d to %d (current height: %d, lookback period: %d)", startBlock, finishBlock, latestBlock, n.lookbackPeriod))

for i := flushStart; i <= latestBlock; i++ {
for i := startBlock; i <= finishBlock; i++ {
blockQueue <- i
}
n.lastFlushedBlock = latestBlock
n.lastFlushedBlock = finishBlock

logger.Info("Flush complete")

Expand Down

0 comments on commit 580a328

Please sign in to comment.