Skip to content

Commit

Permalink
Merge pull request #107 from eosnetworkfoundation/kayan_ws_getlogs
Browse files Browse the repository at this point in the history
websocket: use batch request in getLogs
  • Loading branch information
elmato authored Nov 15, 2023
2 parents 969f754 + a4db0e1 commit f75ce6a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
58 changes: 51 additions & 7 deletions peripherals/eos-evm-ws-proxy/block-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const axios = require('axios');
const {Web3} = require('web3');
const Deque = require('collections/deque');
const {num_from_id} = require('./utils');
const { clearTimeout } = require('timers');
class BlockMonitor extends EventEmitter {

constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger}) {
Expand Down Expand Up @@ -40,22 +41,63 @@ class BlockMonitor extends EventEmitter {
this.emit('block_appended', {block});
}

async getBlockWithLogs(number_) {
let number = Number(number_);

let id1 = "get_block_" + number;
let id2 = "get_logs_" + number;
let requests = [
{jsonrpc:"2.0",method:"eth_getBlockByNumber",params:["0x" + number.toString(16), true], id: id1},
{jsonrpc:"2.0",method:"eth_getLogs",params:[{fromBlock: "0x" + number.toString(16), toBlock: "0x" + number.toString(16)}], id: id2}
]
const results = await axios.post(this.web3_rpc_endpoint, requests);

if (!Array.isArray(results.data) || results.data.length != 2) {
throw new Error("invalid RPC response of [getBlock, GetPastLogs] batch request");
}
const block = results.data[0].result;
const logs = results.data[1].result;

block.logs = logs;
//console.log("RPC batch result:" + JSON.stringify(block));
return block;
}

async poll() {
let next_block = null;
try {
// need to be conservative, sometimes getLogs return empty result for head block
let head_block = await this.web3.eth.getBlock("latest", true);
let max_block_num = Number(head_block.number) - 1;

let last = this.reversible_blocks.peekBack();
if( last == undefined ) {
last = await this.web3.eth.getBlock("latest", true);
this.append_new_block(last);
last = await this.getBlockWithLogs(max_block_num);
if (last != null) {
this.append_new_block(last);
}
}

if (last != null && Number(last.number) + 1 < max_block_num) {
next_block = await this.getBlockWithLogs(Number(last.number) + 1);
} else {
next_block = null;
}

let next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true);
let found_next_block = false;

while(last != null && next_block != null) {
found_next_block = true;
if(next_block.parentHash == last.hash) {
this.append_new_block(next_block);
last = next_block;
next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true);

if (Number(last.number) + 1 < max_block_num) {
next_block = await this.getBlockWithLogs(Number(last.number) + 1);
} else {
next_block = null;
}

} else {
last = this.fork_last_block();
}
Expand All @@ -74,6 +116,7 @@ class BlockMonitor extends EventEmitter {
}

if(this.run == true) {
if (this.timer_id != null) clearTimeout(this.timer_id);
this.timer_id = setTimeout(() => this.poll(), this.poll_interval || 5000);
} else {
this.reversible_blocks.clear();
Expand All @@ -84,13 +127,14 @@ class BlockMonitor extends EventEmitter {
start() {
this.logger.info("BlockMonitor start");
this.run = true;
setTimeout(() => this.poll(), 0);
if (this.timer_id != null) clearTimeout(this.timer_id);
this.timer_id = setTimeout(() => this.poll(), 0);
}

stop() {
clearTimeout(this.timer_id);
this.logger.info("BlockMonitor stopping");
this.run = false;
this.run = false;
// don't clean up timeout. let poll() cleanup reversible_blocks
}

is_running() {
Expand Down
5 changes: 2 additions & 3 deletions peripherals/eos-evm-ws-proxy/subscription-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,8 @@ class SubscriptionServer extends EventEmitter {
// Process all `logs` subscriptions
// Get logs from the recently appended block
if(this.logs_subs.size > 0) {
const logs = await this.web3.eth.getPastLogs({fromBlock:block.number, toBlock:block.number});
this.logger.debug("LOG => ", JSON.stringify(logs, bigint_replacer));
for(const log of logs) {
this.logger.debug("LOG => ", JSON.stringify(block.logs, bigint_replacer));
for(const log of block.logs) {
for(const [subid, client] of this.logs_subs) {
if(this.logs_filter_match(client.filter, log)) {
this.send_logs_response_and_save(block, client, subid, log);
Expand Down
7 changes: 5 additions & 2 deletions tests/nodeos_eos_evm_ws_test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,10 @@ def makeReservedEvmAddress(account):
time.sleep(0.5)
recevied_msg=ws.recv()
res=json.loads(recevied_msg)
if block_count == 0:
Utils.Print("recevied block message from websocket:" + recevied_msg)
block_json=res["params"]["result"]
num=(int)(block_json["number"])
num=block_json["number"] # number can be decimal or hex (with 0x prefix)
hash=block_json["hash"]
parent_hash=block_json["parentHash"]
Utils.Print("received block {0} from websocket, hash={1}..., parent={2}...".format(num, hash[0:8], parent_hash[0:8]))
Expand Down Expand Up @@ -727,7 +729,8 @@ def makeReservedEvmAddress(account):
res=json.loads(recevied_msg)
Utils.Print("last ws msg is:" + recevied_msg)
if ("method" in res and res["method"] == "eth_subscription"):
assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000") # 0.103 - 0.01(fee)=0.093
assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000" or \
res["params"]["result"]["transaction"]["value"] == "0x14a6701dc1c8000") # 0.103 - 0.01(fee)=0.093
break
try_count = try_count - 1
if (try_count == 0):
Expand Down

0 comments on commit f75ce6a

Please sign in to comment.