diff --git a/examples/interactive_hyperdrive_forking_example.py b/examples/interactive_hyperdrive_forking_example.py index 47c3039ae4..a28260690f 100644 --- a/examples/interactive_hyperdrive_forking_example.py +++ b/examples/interactive_hyperdrive_forking_example.py @@ -25,7 +25,7 @@ # Launch a local anvil chain forked from the rpc uri. chain = LocalChain(fork_uri=rpc_uri, fork_block_number=fork_block_number) -hyperdrive_address = Hyperdrive.get_hyperdrive_addresses_from_registry(registry_address, chain)["sdai_14_day"] +hyperdrive_address = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, registry_address)["sdai_14_day"] # Note that we use Hyperdrive here instead of LocalHyperdrive, # as LocalHyperdrive deploys a new pool, whereas we want to connect to an existing pool diff --git a/examples/interactive_remote_hyperdrive_example.py b/examples/interactive_remote_hyperdrive_example.py index a6d3026159..5cc843d894 100644 --- a/examples/interactive_remote_hyperdrive_example.py +++ b/examples/interactive_remote_hyperdrive_example.py @@ -24,7 +24,7 @@ # This is the registry address deployed on sepolia. registry_address = "0xba5156E697d39a03EDA824C19f375383F6b759EA" -hyperdrive_address = Hyperdrive.get_hyperdrive_addresses_from_registry(registry_address, chain)["sdai_14_day"] +hyperdrive_address = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, registry_address)["sdai_14_day"] hyperdrive_config = Hyperdrive.Config() hyperdrive_pool = Hyperdrive(chain, hyperdrive_address, hyperdrive_config) diff --git a/scripts/testnet_checkpoint_bots.py b/scripts/testnet_checkpoint_bots.py index 670692aef1..6d65f2b389 100644 --- a/scripts/testnet_checkpoint_bots.py +++ b/scripts/testnet_checkpoint_bots.py @@ -99,7 +99,7 @@ def run_checkpoint_bot( while True: # Check if we've reached the block to exit - if block_to_exit is not None and chain.curr_block_number() >= block_to_exit: + if block_to_exit is not None and chain.block_number() >= block_to_exit: logging.info("Exiting checkpoint bot...") break @@ -107,7 +107,7 @@ def run_checkpoint_bot( # be minted. This bot waits for a portion of the checkpoint to reduce # the probability of needing a checkpoint. After the waiting period, # the bot will attempt to mint a checkpoint. - latest_block = chain.curr_block_data() + latest_block = chain.block_data() timestamp = latest_block.get("timestamp", None) if timestamp is None: raise AssertionError(f"{latest_block=} has no timestamp") @@ -158,7 +158,7 @@ def run_checkpoint_bot( if check_checkpoint: # TODO: Add crash report assert receipt["status"] == 1, "Checkpoint failed." - latest_block = chain.curr_block_data() + latest_block = chain.block_data() timestamp = latest_block.get("timestamp", None) if timestamp is None: raise AssertionError(f"{latest_block=} has no timestamp") @@ -217,7 +217,7 @@ def main(argv: Sequence[str] | None = None) -> None: while True: logging.info("Checking for new pools...") # Reset hyperdrive objs - deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(parsed_args.registry_addr, chain) + deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, parsed_args.registry_addr) logging.info("Running for all pools...") @@ -237,7 +237,7 @@ def main(argv: Sequence[str] | None = None) -> None: funcs=partials, chain=chain, sender=sender, - block_to_exit=chain.curr_block_number() + pool_check_num_blocks, + block_to_exit=chain.block_number() + pool_check_num_blocks, ) ) diff --git a/scripts/testnet_fuzz_bot_invariant_checks.py b/scripts/testnet_fuzz_bot_invariant_checks.py index 882c3af5f8..c8b236b758 100644 --- a/scripts/testnet_fuzz_bot_invariant_checks.py +++ b/scripts/testnet_fuzz_bot_invariant_checks.py @@ -53,7 +53,7 @@ def main(argv: Sequence[str] | None = None) -> None: # Run the loop forever while True: # Check for new pools - latest_block = chain.curr_block_data() + latest_block = chain.block_data() latest_block_number = latest_block.get("number", None) if latest_block_number is None: raise AssertionError("Block has no number.") @@ -63,7 +63,7 @@ def main(argv: Sequence[str] | None = None) -> None: # Reset hyperdrive objs hyperdrive_objs: dict[str, Hyperdrive] = {} # First iteration, get list of deployed pools - deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(parsed_args.registry_addr, chain) + deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, parsed_args.registry_addr) for name, addr in deployed_pools.items(): logging.info("Adding pool %s", name) hyperdrive_objs[name] = Hyperdrive(chain, addr) diff --git a/src/agent0/core/hyperdrive/interactive/chain.py b/src/agent0/core/hyperdrive/interactive/chain.py index cfad3025d0..fe89ac1012 100644 --- a/src/agent0/core/hyperdrive/interactive/chain.py +++ b/src/agent0/core/hyperdrive/interactive/chain.py @@ -2,7 +2,7 @@ import contextlib -from web3.types import BlockData +from web3.types import BlockData, Timestamp from agent0.ethpy.base import initialize_web3_with_http_provider @@ -22,7 +22,6 @@ def __init__(self, rpc_uri: str): self.rpc_uri = rpc_uri # Initialize web3 here for rpc calls self._web3 = initialize_web3_with_http_provider(self.rpc_uri, reset_provider=False) - self._account_addrs: dict[str, bool] = {} def cleanup(self): """General cleanup of resources of interactive hyperdrive.""" @@ -32,15 +31,7 @@ def __del__(self): with contextlib.suppress(Exception): self.cleanup() - def _ensure_no_duplicate_addrs(self, addr: str): - if addr in self._account_addrs: - raise ValueError( - f"Wallet address {addr} already in use. " - "Cannot manage a separate interactive hyperdrive agent with the same address." - ) - self._account_addrs[addr] = True - - def curr_block_number(self) -> int: + def block_number(self) -> int: """Get the current block number on the chain. Returns @@ -50,8 +41,8 @@ def curr_block_number(self) -> int: """ return self._web3.eth.get_block_number() - def curr_block_data(self) -> BlockData: - """Get the current block number on the chain. + def block_data(self) -> BlockData: + """Get the current block on the chain. Returns ------- @@ -59,3 +50,17 @@ def curr_block_data(self) -> BlockData: The current block number """ return self._web3.eth.get_block("latest") + + def block_time(self) -> Timestamp: + """Get the current block time on the chain. + + Returns + ------- + int + The current block number + """ + block = self.block_data() + block_timestamp = block.get("timestamp", None) + if block_timestamp is None: + raise AssertionError("The provided block has no timestamp") + return block_timestamp diff --git a/src/agent0/core/hyperdrive/interactive/hyperdrive.py b/src/agent0/core/hyperdrive/interactive/hyperdrive.py index 60eb4301d9..8d02f3d144 100644 --- a/src/agent0/core/hyperdrive/interactive/hyperdrive.py +++ b/src/agent0/core/hyperdrive/interactive/hyperdrive.py @@ -125,17 +125,17 @@ def get_hyperdrive_addresses_from_artifacts( @classmethod def get_hyperdrive_addresses_from_registry( cls, - registry_contract_addr: str, chain: Chain, + registry_contract_addr: str, ) -> dict[str, ChecksumAddress]: """Gather deployed Hyperdrive pool addresses. Arguments --------- - registry_contract_addr: str - The address of the Hyperdrive factory contract. chain: Chain The Chain object connected to a chain. + registry_contract_addr: str + The address of the Hyperdrive factory contract. Returns ------- @@ -244,10 +244,6 @@ def _init_agent( agent = HyperdrivePolicyAgent(Account().from_key(private_key), initial_budget=FixedPoint(0), policy=policy_obj) - # Add the public address to the chain object to avoid multiple objects - # with the same underlying account - self.chain._ensure_no_duplicate_addrs(agent.checksum_address) # pylint: disable=protected-access - self._sync_wallet(agent) return agent diff --git a/src/agent0/core/hyperdrive/interactive/hyperdrive_test.py b/src/agent0/core/hyperdrive/interactive/hyperdrive_test.py index 228bdec79c..da6880e9e0 100644 --- a/src/agent0/core/hyperdrive/interactive/hyperdrive_test.py +++ b/src/agent0/core/hyperdrive/interactive/hyperdrive_test.py @@ -204,47 +204,6 @@ def test_remote_funding_and_trades(chain: LocalChain, check_remote_chain: bool): _ensure_agent_wallet_is_correct(hyperdrive_agent0.wallet, interactive_remote_hyperdrive.interface) -@pytest.mark.anvil -@pytest.mark.parametrize("check_remote_chain", [True, False]) -def test_multi_account_bookkeeping(chain: LocalChain, check_remote_chain: bool): - # Parameters for pool initialization. If empty, defaults to default values, allows for custom values if needed - # We explicitly set initial liquidity here to ensure we have withdrawal shares when trading - initial_pool_config = LocalHyperdrive.Config( - initial_liquidity=FixedPoint(1_000), - initial_fixed_apr=FixedPoint("0.05"), - position_duration=60 * 60 * 24 * 365, # 1 year - ) - # Launches a local hyperdrive pool - # This deploys the pool - interactive_local_hyperdrive_0 = LocalHyperdrive(chain, initial_pool_config) - interactive_local_hyperdrive_1 = LocalHyperdrive(chain, initial_pool_config) - - # Gather relevant objects from the local hyperdrive - hyperdrive_addresses_0 = interactive_local_hyperdrive_0.get_hyperdrive_address() - hyperdrive_addresses_1 = interactive_local_hyperdrive_1.get_hyperdrive_address() - - # Connect to the local chain using the remote hyperdrive interface - if check_remote_chain: - remote_chain = Chain(chain.rpc_uri) - interactive_remote_hyperdrive_0 = Hyperdrive(remote_chain, hyperdrive_addresses_0) - interactive_remote_hyperdrive_1 = Hyperdrive(remote_chain, hyperdrive_addresses_1) - else: - interactive_remote_hyperdrive_0 = Hyperdrive(chain, hyperdrive_addresses_0) - interactive_remote_hyperdrive_1 = Hyperdrive(chain, hyperdrive_addresses_1) - - # Generate trading agents from the interactive object - private_key = make_private_key() - _ = interactive_remote_hyperdrive_0.init_agent(private_key=private_key) - - # Initializing an agent with an existing key should fail - with pytest.raises(ValueError): - _ = interactive_remote_hyperdrive_0.init_agent(private_key=private_key) - - # Initializing an agent with an existing key on a separate pool should fail - with pytest.raises(ValueError): - _ = interactive_remote_hyperdrive_1.init_agent(private_key=private_key) - - @pytest.mark.anvil @pytest.mark.parametrize("check_remote_chain", [True, False]) def test_no_policy_call(chain: LocalChain, check_remote_chain: bool):