Skip to content

Commit

Permalink
Connect existing postgres (#1504)
Browse files Browse the repository at this point in the history
This PR allows the agent0 pipeline to connect to an existing postgres
resource instead of managing it's own container.

NOTE: actual implementation of this in the infra repo is blocked by
registry updates to query registered pools
(delvtech/hyperdrive#1039). Currently, we use
events to get registered pools, but since infra uses anvil's save/load
state, register events get lost. Need follow up after the hyperdrive
registry update to get list of registered pools from contract call.

- Agent0 `Chain` object now has a flag `use_existing_postgres` to look
for env variables to connect to existing postgres container.
- Using singular `.env` file instead of multiple ones for defining env
variables.
- Fixing checkpoint bots and invariance checks to take an `--infra` flag
to look for environment variables for connection.
- Fixing data pipeline scripts to read hyperdrive addresses from
registry.
- Fixing bug to rolling back db when adding hyperdrive addr to name
fails
  • Loading branch information
slundqui authored May 31, 2024
1 parent 2f053b4 commit 8f11205
Show file tree
Hide file tree
Showing 22 changed files with 214 additions and 114 deletions.
10 changes: 10 additions & 0 deletions env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Postgres configuration
# Used when chain's config `use_existing_postgres` is set to True
POSTGRES_USER="admin"
POSTGRES_PASSWORD="password"
POSTGRES_DB="agent0_db"
POSTGRES_HOST="localhost"
POSTGRES_PORT=5432

# Rollbar api key
ROLLBAR_API_KEY=<INSERT_KEY>
5 changes: 0 additions & 5 deletions postgres.env.sample

This file was deleted.

1 change: 0 additions & 1 deletion rollbar.env.sample

This file was deleted.

79 changes: 60 additions & 19 deletions scripts/testnet_checkpoint_bots.py → scripts/checkpoint_bots.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
from eth_account.account import Account
from eth_account.signers.local import LocalAccount
from eth_typing import ChecksumAddress
from fixedpointmath import FixedPoint

from agent0 import Chain, Hyperdrive
from agent0.core.base.make_key import make_private_key
from agent0.ethpy.base import smart_contract_transact
from agent0.ethpy.hyperdrive import get_hyperdrive_pool_config
from agent0.ethpy.hyperdrive import get_hyperdrive_pool_config, get_hyperdrive_registry_from_artifacts
from agent0.hyperfuzz.system_fuzz.run_local_fuzz_bots import async_runner
from agent0.hypertypes import IHyperdriveContract

Expand Down Expand Up @@ -84,8 +86,7 @@ def run_checkpoint_bot(
"""
# pylint: disable=too-many-arguments

# TODO pull this function out and put into agent0, and use this function in
# the infra version of checkpoint bot
# TODO pull this function out and put into agent0
web3 = chain._web3 # pylint: disable=protected-access

hyperdrive_contract: IHyperdriveContract = IHyperdriveContract.factory(w3=web3)(pool_address)
Expand Down Expand Up @@ -200,26 +201,57 @@ def main(argv: Sequence[str] | None = None) -> None:
parsed_args = parse_arguments(argv)

# Initialize
chain = Chain(parsed_args.rpc_uri)

# We calculate how many blocks we should wait before checking for a new pool
pool_check_num_blocks = parsed_args.pool_check_sleep_time // 12

private_key = os.getenv("CHECKPOINT_BOT_KEY")
sender: LocalAccount = Account().from_key(private_key)

if parsed_args.infra:
# Get the rpc uri from env variable
rpc_uri = os.getenv("RPC_URI", None)
if rpc_uri is None:
raise ValueError("RPC_URI is not set")

chain = Chain(rpc_uri, Chain.Config(use_existing_postgres=True))
private_key = make_private_key()
# We create an agent here to fund it eth
agent = chain.init_agent(private_key=private_key)
agent.add_funds(eth=FixedPoint(100_000))
sender: LocalAccount = agent.account

# Get the registry address from artifacts
artifacts_uri = os.getenv("ARTIFACTS_URI", None)
if artifacts_uri is None:
raise ValueError("ARTIFACTS_URI is not set")
registry_address = get_hyperdrive_registry_from_artifacts(artifacts_uri)

# Get block time and block timestamp interval from env vars
block_time = int(os.getenv("BLOCK_TIME", "12"))
block_timestamp_interval = int(os.getenv("BLOCK_TIMESTAMP_INTERVAL", "12"))
else:
chain = Chain(parsed_args.rpc_uri)
private_key = os.getenv("CHECKPOINT_BOT_KEY", None)
if private_key is None:
raise ValueError("CHECKPOINT_BOT_KEY is not set")
sender: LocalAccount = Account().from_key(private_key)
registry_address = parsed_args.registry_addr
block_time = 1
block_timestamp_interval = 1

# Loop for checkpoint bot across all registered pools
while True:
logging.info("Checking for new pools...")
# Reset hyperdrive objs
deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, parsed_args.registry_addr)
deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, registry_address)

logging.info("Running for all pools...")

# TODO because _async_runner only takes one set of arguments for all calls,
# we make partial calls for each call. The proper fix here is to generalize
# _async_runner to take separate arguments for each call.
partials = [
partial(run_checkpoint_bot, pool_address=pool_addr, pool_name=pool_name)
partial(
run_checkpoint_bot,
pool_address=pool_addr,
pool_name=pool_name,
block_time=block_time,
block_timestamp_interval=block_timestamp_interval,
)
for pool_name, pool_addr in deployed_pools.items()
]

Expand All @@ -231,15 +263,16 @@ def main(argv: Sequence[str] | None = None) -> None:
funcs=partials,
chain=chain,
sender=sender,
block_to_exit=chain.block_number() + pool_check_num_blocks,
block_to_exit=chain.block_number() + parsed_args.pool_check_sleep_blocks,
)
)


class Args(NamedTuple):
"""Command line arguments for the checkpoint bot."""

pool_check_sleep_time: int
pool_check_sleep_blocks: int
infra: bool
registry_addr: str
rpc_uri: str

Expand All @@ -258,7 +291,8 @@ def namespace_to_args(namespace: argparse.Namespace) -> Args:
Formatted arguments
"""
return Args(
pool_check_sleep_time=namespace.pool_check_sleep_time,
pool_check_sleep_blocks=namespace.pool_check_sleep_blocks,
infra=namespace.infra,
registry_addr=namespace.registry_addr,
rpc_uri=namespace.rpc_uri,
)
Expand All @@ -279,10 +313,17 @@ def parse_arguments(argv: Sequence[str] | None = None) -> Args:
"""
parser = argparse.ArgumentParser(description="Runs a bot that creates checkpoints each checkpoint_duration.")
parser.add_argument(
"--pool-check-sleep-time",
"--pool-check-sleep-blocks",
type=int,
default=86400, # 1 day
help="Sleep time between checking for new pools, in seconds.",
default=7200, # 1 day for 12 second block time
help="Number of blocks in between checking for new pools.",
)

parser.add_argument(
"--infra",
default=False,
action="store_true",
help="Infra mode, we get registry address from artifacts, and we fund a random account with eth as sender.",
)

parser.add_argument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@

import argparse
import logging
import os
import sys
import time
from typing import NamedTuple, Sequence

from agent0 import Chain, Hyperdrive
from agent0.ethpy.hyperdrive import get_hyperdrive_registry_from_artifacts
from agent0.hyperfuzz.system_fuzz.invariant_checks import run_invariant_checks
from agent0.hyperlogs.rollbar_utilities import initialize_rollbar

Expand All @@ -30,18 +32,35 @@ def main(argv: Sequence[str] | None = None) -> None:
argv: Sequence[str]
A sequence containing the uri to the database server and the test epsilon.
"""
# pylint: disable=too-many-locals

parsed_args = parse_arguments(argv)
chain = Chain(parsed_args.rpc_uri)

if parsed_args.infra:
# Get the rpc uri from env variable
rpc_uri = os.getenv("RPC_URI", None)
if rpc_uri is None:
raise ValueError("RPC_URI is not set")

chain = Chain(rpc_uri, Chain.Config(use_existing_postgres=True))

# Get the registry address from artifacts
artifacts_uri = os.getenv("ARTIFACTS_URI", None)
if artifacts_uri is None:
raise ValueError("ARTIFACTS_URI is not set")
registry_address = get_hyperdrive_registry_from_artifacts(artifacts_uri)
else:
chain = Chain(parsed_args.rpc_uri)
registry_address = parsed_args.registry_addr

# We use the logical name if we don't specify pool addr, otherwise we use the pool addr
rollbar_environment_name = "testnet_fuzz_bot_invariant_check"
log_to_rollbar = initialize_rollbar(rollbar_environment_name)

# We calculate how many blocks we should wait before checking for a new pool
pool_check_num_blocks = parsed_args.pool_check_sleep_time // 12

last_executed_block_number = -pool_check_num_blocks - 1 # no matter what we will run the check the first time
last_executed_block_number = (
-parsed_args.pool_check_sleep_blocks - 1
) # no matter what we will run the check the first time
last_pool_check_block_number = 0

hyperdrive_objs: dict[str, Hyperdrive] = {}
Expand All @@ -54,12 +73,12 @@ def main(argv: Sequence[str] | None = None) -> None:
if latest_block_number is None:
raise AssertionError("Block has no number.")

if latest_block_number > last_pool_check_block_number + pool_check_num_blocks:
if latest_block_number > last_pool_check_block_number + parsed_args.pool_check_sleep_blocks:
logging.info("Checking for new pools...")
# Reset hyperdrive objs
hyperdrive_objs: dict[str, Hyperdrive] = {}
# First iteration, get list of deployed pools
deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, parsed_args.registry_addr)
deployed_pools = Hyperdrive.get_hyperdrive_addresses_from_registry(chain, registry_address)
for name, addr in deployed_pools.items():
logging.info("Adding pool %s", name)
hyperdrive_objs[name] = Hyperdrive(chain, addr)
Expand Down Expand Up @@ -90,7 +109,8 @@ class Args(NamedTuple):

test_epsilon: float
invariance_check_sleep_time: int
pool_check_sleep_time: int
pool_check_sleep_blocks: int
infra: bool
registry_addr: str
rpc_uri: str

Expand All @@ -111,7 +131,8 @@ def namespace_to_args(namespace: argparse.Namespace) -> Args:
return Args(
test_epsilon=namespace.test_epsilon,
invariance_check_sleep_time=namespace.invariance_check_sleep_time,
pool_check_sleep_time=namespace.pool_check_sleep_time,
pool_check_sleep_blocks=namespace.pool_check_sleep_blocks,
infra=namespace.infra,
registry_addr=namespace.registry_addr,
rpc_uri=namespace.rpc_uri,
)
Expand Down Expand Up @@ -143,11 +164,19 @@ def parse_arguments(argv: Sequence[str] | None = None) -> Args:
default=5,
help="Sleep time between invariance checks, in seconds.",
)

parser.add_argument(
"--pool-check-sleep-time",
"--pool-check-sleep-blocks",
type=int,
default=3600, # 1 hour
help="Sleep time between checking for new pools, in seconds.",
default=300, # 1 hour for 12 second block time
help="Number of blocks in between checking for new pools.",
)

parser.add_argument(
"--infra",
default=False,
action="store_true",
help="Infra mode, we get registry address from artifacts, and we fund a random account with eth as sender.",
)

parser.add_argument(
Expand Down
2 changes: 1 addition & 1 deletion scripts/local_fuzz_bots.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def parse_arguments(argv: Sequence[str] | None = None) -> Args:
help="Pause execution on invariance failure.",
)
parser.add_argument(
"--chain_port",
"--chain-port",
type=int,
default=-1,
help="The port to run anvil on.",
Expand Down
12 changes: 9 additions & 3 deletions scripts/run_acquire_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import os

from agent0.chainsync.exec import acquire_data
from agent0.ethpy.hyperdrive.addresses import get_hyperdrive_addresses_from_artifacts
from agent0.ethpy.base import initialize_web3_with_http_provider
from agent0.ethpy.hyperdrive.addresses import (
get_hyperdrive_addresses_from_registry,
get_hyperdrive_registry_from_artifacts,
)
from agent0.hyperlogs import setup_logging

if __name__ == "__main__":
Expand All @@ -14,12 +18,14 @@
rpc_uri = os.getenv("RPC_URI", "http://localhost:8545")
# TODO get this from the registry after refactor
artifacts_uri = os.getenv("ARTIFACTS_URI", "http://localhost:8080")
hyperdrive_addr = get_hyperdrive_addresses_from_artifacts(artifacts_uri)["erc4626_hyperdrive"]
registry_address = get_hyperdrive_registry_from_artifacts(artifacts_uri)
web3 = initialize_web3_with_http_provider(rpc_uri, reset_provider=False)
hyperdrive_addrs = get_hyperdrive_addresses_from_registry(registry_address, web3)
acquire_data(
# This is the start block needed based on the devnet image, which corresponds
# to the block that the contract was deployed.
# TODO ideally would gather this from the deployer
start_block=48,
rpc_uri=rpc_uri,
hyperdrive_addresses=[hyperdrive_addr],
hyperdrive_addresses=hyperdrive_addrs,
)
14 changes: 10 additions & 4 deletions scripts/run_data_analysis.py → scripts/run_analyze_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,28 @@
import os

from agent0.chainsync.exec import analyze_data
from agent0.ethpy.hyperdrive.addresses import get_hyperdrive_addresses_from_artifacts
from agent0.ethpy.base import initialize_web3_with_http_provider
from agent0.ethpy.hyperdrive.addresses import (
get_hyperdrive_addresses_from_registry,
get_hyperdrive_registry_from_artifacts,
)
from agent0.hyperlogs import setup_logging

if __name__ == "__main__":
setup_logging(".logging/data_analysis.log", log_stdout=True)
setup_logging(".logging/analyze_data.log", log_stdout=True)
# Get the RPC URI and pool address from environment var
rpc_uri = os.getenv("RPC_URI", "http://localhost:8545")
# TODO get this from the registry after refactor
artifacts_uri = os.getenv("ARTIFACTS_URI", "http://localhost:8080")
hyperdrive_addr = get_hyperdrive_addresses_from_artifacts(artifacts_uri)["erc4626_hyperdrive"]
registry_address = get_hyperdrive_registry_from_artifacts(artifacts_uri)
web3 = initialize_web3_with_http_provider(rpc_uri, reset_provider=False)
hyperdrive_addrs = get_hyperdrive_addresses_from_registry(registry_address, web3=web3)

analyze_data(
# This is the start block needed based on the devnet image, which corresponds
# to the block that the contract was deployed.
# TODO ideally would gather this from the deployer
start_block=48,
rpc_uri=rpc_uri,
hyperdrive_addresses=[hyperdrive_addr],
hyperdrive_addresses=hyperdrive_addrs,
)
2 changes: 1 addition & 1 deletion scripts/run_unit_fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def parse_arguments(argv: Sequence[str] | None = None) -> Args:
"""
parser = argparse.ArgumentParser(description="Runs a loop to check Hyperdrive invariants at each block.")
parser.add_argument(
"--number_of_runs",
"--number-of-runs",
type=int,
default=0,
help="The number times to run the tests. If not set, will run forever.",
Expand Down
2 changes: 1 addition & 1 deletion scripts/update_usernames.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
}

# Get session object
# This reads the postgres.env file for database credentials
# This reads the .env file for database credentials
db_session = initialize_session()

setup_logging(".logging/update_usernames.log", log_stdout=True, delete_previous_logs=True)
Expand Down
2 changes: 1 addition & 1 deletion src/agent0/chainsync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Loads config"""

from .postgres_config import PostgresConfig, build_postgres_config
from .postgres_config import PostgresConfig, build_postgres_config_from_env
Loading

0 comments on commit 8f11205

Please sign in to comment.