Skip to content

Commit

Permalink
Mitigate boundprotocol insanities (#3164)
Browse files Browse the repository at this point in the history
* client/peer: remove bound property

* client: better typesafety for peers

* client: add TODO

* client: move boundprotocol interface

* client: remove addMethods boundProtocol

* client: fix net tests

* Update typing

* Fix many tests

* Revert typing changes

* client: fix more integration tests

* Remove unused params

---------

Co-authored-by: acolytec3 <[email protected]>
  • Loading branch information
jochem-brouwer and acolytec3 authored Nov 24, 2023
1 parent 3742539 commit 0170906
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 150 deletions.
102 changes: 47 additions & 55 deletions packages/client/src/net/peer/peer.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import { EventEmitter } from 'events'

import { BoundEthProtocol, BoundLesProtocol, BoundSnapProtocol } from '../protocol'

import type { Config } from '../../config'
import type {
BoundProtocol,
EthProtocolMethods,
LesProtocolMethods,
Protocol,
Sender,
SnapProtocolMethods,
} from '../protocol'
import type { BoundProtocol, Protocol, Sender } from '../protocol'
import type { Server } from '../server'

export interface PeerOptions {
Expand Down Expand Up @@ -38,17 +33,22 @@ export interface PeerOptions {
* Network peer
* @memberof module:net/peer
*/
export class Peer extends EventEmitter {
export abstract class Peer extends EventEmitter {
public config: Config
public id: string
public address: string
public inbound: boolean
public server: Server | undefined
public bound: Map<string, BoundProtocol>
protected transport: string
protected protocols: Protocol[]
protected boundProtocols: BoundProtocol[] = []
private _idle: boolean

// TODO check if this should be moved into RlpxPeer
public eth?: BoundEthProtocol
public snap?: BoundSnapProtocol
public les?: BoundLesProtocol

/*
If the peer is in the PeerPool.
If true, messages are handled immediately.
Expand All @@ -57,11 +57,6 @@ export class Peer extends EventEmitter {
*/
public pooled: boolean = false

// Dynamically bound protocol properties
public eth: (BoundProtocol & EthProtocolMethods) | undefined
public snap: (BoundProtocol & SnapProtocolMethods) | undefined
public les: (BoundProtocol & LesProtocolMethods) | undefined

/**
* Create new peer
*/
Expand All @@ -75,7 +70,6 @@ export class Peer extends EventEmitter {
this.transport = options.transport
this.inbound = options.inbound ?? false
this.protocols = options.protocols ?? []
this.bound = new Map()

this._idle = true
}
Expand All @@ -94,59 +88,57 @@ export class Peer extends EventEmitter {
this._idle = value
}

async connect(): Promise<void> {}
abstract connect(): Promise<void>

/**
* Adds a protocol to this peer given a sender instance. Protocol methods
* will be accessible via a field with the same name as protocol. New methods
* will be added corresponding to each message defined by the protocol, in
* addition to send() and request() methods that takes a message name and message
* arguments. send() only sends a message without waiting for a response, whereas
* request() also sends the message but will return a promise that resolves with
* the response payload.
* @param protocol protocol instance
* @param sender sender instance provided by subclass
* @example
* ```typescript
* await peer.bindProtocol(ethProtocol, sender)
* // Example: Directly call message name as a method on the bound protocol
* const headers1 = await peer.eth.getBlockHeaders({ block: BigInt(1), max: 100 })
* // Example: Call request() method with message name as first parameter
* const headers2 = await peer.eth.request('getBlockHeaders', { block: BigInt(1), max: 100 })
* // Example: Call send() method with message name as first parameter and
* // wait for response message as an event
* peer.eth.send('getBlockHeaders', { block: BigInt(1), max: 100 })
* peer.eth.on('message', ({ data }) => console.log(`Received ${data.length} headers`))
* ```
* Handle unhandled messages along handshake
*/
protected async bindProtocol(protocol: Protocol, sender: Sender): Promise<void> {
const bound = await protocol.bind(this, sender)
this.bound.set(bound.name, bound)
handleMessageQueue() {
this.boundProtocols.map((e) => e.handleMessageQueue())
}

/**
* Return true if peer understand the specified protocol name
* @param protocolName
*/
understands(protocolName: string): boolean {
return !!this.bound.get(protocolName)
}
async addProtocol(sender: Sender, protocol: Protocol): Promise<void> {
let bound: BoundProtocol
const boundOpts = {
config: protocol.config, // TODO: why cant we use `this.config`?
protocol,
peer: this,
sender,
}

/**
* Handle unhandled messages along handshake
*/
handleMessageQueue() {
for (const bound of this.bound.values()) {
bound.handleMessageQueue()
if (protocol.name === 'eth') {
bound = new BoundEthProtocol(boundOpts)
} else if (protocol.name === 'les') {
bound = new BoundLesProtocol(boundOpts)
} else if (protocol.name === 'snap') {
bound = new BoundSnapProtocol(boundOpts)
} else {
throw new Error(`addProtocol: ${protocol.name} protocol not supported`)
}

// Handshake only when snap, else
if (protocol.name !== 'snap') {
await bound!.handshake(sender)
} else {
if (sender.status === undefined) throw Error('Snap can only be bound on handshaked peer')
}

if (protocol.name === 'eth') {
this.eth = <BoundEthProtocol>bound
} else if (protocol.name === 'snap') {
this.snap = <BoundSnapProtocol>bound
} else if (protocol.name === 'les') {
this.les = <BoundLesProtocol>bound
}
this.boundProtocols.push(bound)
}

toString(withFullId = false): string {
const properties = {
id: withFullId ? this.id : this.id.substr(0, 8),
address: this.address,
transport: this.transport,
protocols: Array.from(this.bound.keys()),
protocols: this.boundProtocols.map((e) => e.name),
inbound: this.inbound,
}
return Object.entries(properties)
Expand Down
5 changes: 2 additions & 3 deletions packages/client/src/net/peer/rlpxpeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ export class RlpxPeer extends Peer {
public rlpx: Devp2pRLPx | null
public rlpxPeer: Devp2pRlpxPeer | null
public connected: boolean

/**
* Create new devp2p/rlpx peer
*/
Expand Down Expand Up @@ -169,7 +168,7 @@ export class RlpxPeer extends Peer {
// handshake, and can just use the eth handshake
if (protocol && name !== 'snap') {
const sender = new RlpxSender(rlpxProtocol as Devp2pETH | Devp2pLES | Devp2pSNAP)
return this.bindProtocol(protocol, sender).then(() => {
return this.addProtocol(sender, protocol).then(() => {
if (name === 'eth') {
const snapRlpxProtocol = rlpxPeer
.getProtocols()
Expand All @@ -184,7 +183,7 @@ export class RlpxPeer extends Peer {
const snapSender = new RlpxSender(
snapRlpxProtocol as Devp2pETH | Devp2pLES | Devp2pSNAP
)
return this.bindProtocol(snapProtocol, snapSender)
return this.addProtocol(snapSender, snapProtocol)
}
}
})
Expand Down
152 changes: 134 additions & 18 deletions packages/client/src/net/protocol/boundprotocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ import { Event } from '../../types'

import type { Config } from '../../config'
import type { Peer } from '../peer/peer'
import type { EthProtocolMethods } from './ethprotocol'
import type { LesProtocolMethods } from './lesprotocol'
import type { Message, Protocol } from './protocol'
import type { Sender } from './sender'
import type { AccountData, SnapProtocolMethods, StorageData } from './snapprotocol'
import type { BlockBodyBytes, BlockHeader } from '@ethereumjs/block'
import type { TypedTransaction } from '@ethereumjs/tx'
import type { TxReceipt } from '@ethereumjs/vm'

export interface BoundProtocolOptions {
/* Config */
Expand All @@ -29,7 +35,7 @@ export class BoundProtocol {
public config: Config
public name: string
private protocol: Protocol
private peer: Peer
protected peer: Peer
private sender: Sender
private versions: number[]
private timeout: number
Expand Down Expand Up @@ -71,7 +77,6 @@ export class BoundProtocol {
this.sender.on('error', (error: Error) =>
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
)
this.addMethods()
}

get status(): any {
Expand Down Expand Up @@ -161,7 +166,7 @@ export class BoundProtocol {
* @param name message to wait for
* @param args message arguments
*/
async request(name: string, args: any[]): Promise<any> {
async request(name: string, args: any): Promise<any> {
const message = this.send(name, args)
let lock
if (
Expand Down Expand Up @@ -198,21 +203,132 @@ export class BoundProtocol {
}, this.timeout)
})
}
}

/**
* Add methods to the bound protocol for each protocol message that has a
* corresponding response message.
*/
addMethods() {
const messages = this.protocol.messages.filter((m) => m.response)
for (const message of messages) {
const name = message.name
const camel = name[0].toLowerCase() + name.slice(1)
;(this as any)[camel] = async (args: any[]) =>
this.request(name, args).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
export class BoundEthProtocol extends BoundProtocol implements EthProtocolMethods {
name = 'eth' // public name: string

constructor(options: BoundProtocolOptions) {
super(options)
}

async getBlockHeaders(opts: {
reqId?: bigint | undefined
block: bigint | Uint8Array
max: number
skip?: number | undefined
reverse?: boolean | undefined
}): Promise<[bigint, BlockHeader[]]> {
return this.request('GetBlockHeaders', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
async getBlockBodies(opts: {
reqId?: bigint | undefined
hashes: Uint8Array[]
}): Promise<[bigint, BlockBodyBytes[]]> {
return this.request('GetBlockBodies', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
async getPooledTransactions(opts: {
reqId?: bigint | undefined
hashes: Uint8Array[]
}): Promise<[bigint, TypedTransaction[]]> {
return this.request('GetPooledTransactions', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
async getReceipts(opts: {
reqId?: bigint | undefined
hashes: Uint8Array[]
}): Promise<[bigint, TxReceipt[]]> {
return this.request('GetReceipts', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
}

export class BoundSnapProtocol extends BoundProtocol implements SnapProtocolMethods {
name = 'snap' // public name: string

constructor(options: BoundProtocolOptions) {
super(options)
}
async getAccountRange(opts: {
reqId?: bigint | undefined
root: Uint8Array
origin: Uint8Array
limit: Uint8Array
bytes: bigint
}): Promise<{ reqId: bigint; accounts: AccountData[]; proof: Uint8Array[] }> {
return this.request('GetAccountRange', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
async getStorageRanges(opts: {
reqId?: bigint | undefined
root: Uint8Array
accounts: Uint8Array[]
origin: Uint8Array
limit: Uint8Array
bytes: bigint
}): Promise<{
reqId: bigint
slots: StorageData[][]
proof: Uint8Array[]
}> {
return this.request('GetStorageRanges', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
async getByteCodes(opts: {
reqId?: bigint | undefined
hashes: Uint8Array[]
bytes: bigint
}): Promise<{ reqId: bigint; codes: Uint8Array[] }> {
return this.request('GetByteCodes', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
async getTrieNodes(opts: {
reqId?: bigint | undefined // so this adds a guard here if something goes wrong
// so this adds a guard here if something goes wrong
root: Uint8Array
paths: Uint8Array[][]
bytes: bigint
}): Promise<{ reqId: bigint; nodes: Uint8Array[] }> {
return this.request('GetTrieNodes', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
}

export class BoundLesProtocol extends BoundProtocol implements LesProtocolMethods {
name = 'les' // public name: string

constructor(options: BoundProtocolOptions) {
super(options)
}

async getBlockHeaders(opts: {
reqId?: bigint | undefined
block: bigint | Uint8Array
max: number
skip?: number | undefined
reverse?: boolean | undefined
}): Promise<{ reqId: bigint; bv: bigint; headers: BlockHeader[] }> {
return this.request('GetBlockHeaders', opts).catch((error: Error) => {
this.config.events.emit(Event.PROTOCOL_ERROR, error, this.peer)
return undefined
})
}
}
Loading

0 comments on commit 0170906

Please sign in to comment.