Skip to content

Commit

Permalink
feat: support for tracer=callTrace (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
antazoey authored Dec 12, 2022
1 parent b042c1f commit 9e636b6
Show file tree
Hide file tree
Showing 10 changed files with 612 additions and 224 deletions.
5 changes: 4 additions & 1 deletion evm_trace/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from .base import CallTreeNode, CallType, TraceFrame, get_calltree_from_geth_trace
from .base import CallTreeNode
from .enums import CallType
from .geth import TraceFrame, get_calltree_from_geth_call_trace, get_calltree_from_geth_trace
from .parity import ParityTrace, ParityTraceList, get_calltree_from_parity_trace

__all__ = [
"CallTreeNode",
"CallType",
"get_calltree_from_geth_trace",
"get_calltree_from_geth_call_trace",
"get_calltree_from_parity_trace",
"ParityTrace",
"ParityTraceList",
Expand Down
194 changes: 47 additions & 147 deletions evm_trace/base.py
Original file line number Diff line number Diff line change
@@ -1,178 +1,78 @@
import math
from typing import Dict, Iterator, List, Optional
from typing import List, Optional

from eth_utils import to_int
from ethpm_types import BaseModel, HexBytes
from pydantic import Field, validator
from pydantic import validator

from evm_trace.display import get_tree_display
from evm_trace.enums import CallType


class TraceFrame(BaseModel):
pc: int
op: str
gas: int
gas_cost: int = Field(alias="gasCost")
depth: int
stack: List[HexBytes] = []
memory: List[HexBytes] = []
storage: Dict[HexBytes, HexBytes] = {}

@validator("pc", "gas", "gas_cost", "depth", pre=True)
def validate_ints(cls, value):
return int(value, 16) if isinstance(value, str) else value


class CallTreeNode(BaseModel):
"""
A higher-level object modeling a node in an execution call tree.
Used by both Geth-style and Parity-style low-level data structures.
"""

call_type: CallType
"""The type of call."""

address: HexBytes = HexBytes("")
"""The contract address of the call."""

value: int = 0
"""The amount of value sent on the call."""

depth: int = 0
"""
The number of external jumps away the initially called contract (starts at 0).
"""

gas_limit: Optional[int] = None
"""
The total amount of gas available.
"""

gas_cost: Optional[int] = None # calculated from call starting and return
"""The cost to execute this opcode."""

calldata: HexBytes = HexBytes("")
"""Transaction calldata (inputs)."""

returndata: HexBytes = HexBytes("")
"""Transaction returndata (outputs)."""

calls: List["CallTreeNode"] = []
"""The list of external sub-calls this call makes."""

selfdestruct: bool = False
"""Whether this is a SELFDESTRUCT opcode or not."""

failed: bool = False
"""Whether the call failed or not."""

def __str__(self) -> str:
return get_tree_display(self)
try:
return get_tree_display(self)
except Exception as err:
return f"CallTreeNode (display_err={err})"

def __repr__(self) -> str:
return str(self)

def __getitem__(self, index: int) -> "CallTreeNode":
return self.calls[index]

@validator("calldata", "returndata", "address", pre=True)
def validate_bytes(cls, value):
return HexBytes(value) if isinstance(value, str) else value

def get_calltree_from_geth_trace(
trace: Iterator[TraceFrame], show_internal: bool = False, **root_node_kwargs
) -> CallTreeNode:
"""
Creates a CallTreeNode from a given transaction trace.
Args:
trace (Iterator[TraceFrame]): Iterator of transaction trace frames.
show_internal (bool): Boolean whether to display internal calls.
Defaults to ``False``.
root_node_kwargs (dict): Keyword arguments passed to the root ``CallTreeNode``.
Returns:
:class:`~evm_trace.base.CallTreeNode`: Call tree of transaction trace.
"""

return _create_node_from_call(
trace=trace,
show_internal=show_internal,
**root_node_kwargs,
)


def _extract_memory(offset: HexBytes, size: HexBytes, memory: List[HexBytes]) -> HexBytes:
"""
Extracts memory from the EVM stack.
Args:
offset (HexBytes): Offset byte location in memory.
size (HexBytes): Number of bytes to return.
memory (List[HexBytes]): Memory stack.
Returns:
HexBytes: Byte value from memory stack.
"""

size_int = to_int(size)

if size_int == 0:
return HexBytes("")

offset_int = to_int(offset)

# Compute the word that contains the first byte
start_word = math.floor(offset_int / 32)
# Compute the word that contains the last byte
stop_word = math.ceil((offset_int + size_int) / 32)

end_index = stop_word + 1
byte_slice = b"".join(memory[start_word:end_index])
offset_index = offset_int % 32

# NOTE: Add 4 for the selector.

end_bytes_index = offset_index + size_int
return_bytes = byte_slice[offset_index:end_bytes_index]
return HexBytes(return_bytes)

@validator("value", "depth", pre=True)
def validate_ints(cls, value):
if not value:
return 0

def _create_node_from_call(
trace: Iterator[TraceFrame], show_internal: bool = False, **node_kwargs
) -> CallTreeNode:
"""
Use specified opcodes to create a branching callnode
https://www.evm.codes/
"""
return int(value, 16) if isinstance(value, str) else value

if show_internal:
raise NotImplementedError()

node = CallTreeNode(**node_kwargs)
for frame in trace:
if frame.op in ("CALL", "DELEGATECALL", "STATICCALL"):
child_node_kwargs = {
"address": frame.stack[-2][-20:],
"depth": frame.depth,
"gas_limit": int(frame.stack[-1].hex(), 16),
"gas_cost": frame.gas_cost,
}

# TODO: Validate gas values

if frame.op == "CALL":
child_node_kwargs["call_type"] = CallType.CALL
child_node_kwargs["value"] = int(frame.stack[-3].hex(), 16)
child_node_kwargs["calldata"] = _extract_memory(
offset=frame.stack[-4], size=frame.stack[-5], memory=frame.memory
)
elif frame.op == "DELEGATECALL":
child_node_kwargs["call_type"] = CallType.DELEGATECALL
child_node_kwargs["calldata"] = _extract_memory(
offset=frame.stack[-3], size=frame.stack[-4], memory=frame.memory
)
else:
child_node_kwargs["call_type"] = CallType.STATICCALL
child_node_kwargs["calldata"] = _extract_memory(
offset=frame.stack[-3], size=frame.stack[-4], memory=frame.memory
)

child_node = _create_node_from_call(
trace=trace, show_internal=show_internal, **child_node_kwargs
)
node.calls.append(child_node)

# TODO: Handle internal nodes using JUMP and JUMPI

elif frame.op == "SELFDESTRUCT":
# TODO: Handle the internal value transfer
node.selfdestruct = True
break

elif frame.op == "STOP":
# TODO: Handle "execution halted" vs. gas limit reached
break

elif frame.op in ("RETURN", "REVERT") and not node.returndata:
node.returndata = _extract_memory(
offset=frame.stack[-1], size=frame.stack[-2], memory=frame.memory
)
# TODO: Handle "execution halted" vs. gas limit reached
node.failed = frame.op == "REVERT"
break

# TODO: Handle invalid opcodes (`node.failed = True`)
# NOTE: ignore other opcodes

# TODO: Handle "execution halted" vs. gas limit reached

return node
@validator("gas_limit", "gas_cost", pre=True)
def validate_optional_ints(cls, value):
return int(value, 16) if isinstance(value, str) else value
Loading

0 comments on commit 9e636b6

Please sign in to comment.