Update data ingestion for chainsync (#1734)
Updating data ingestion scripts.

- Move "run data ingestion in background" functionality out of
`acquire_data.py` logic and into outside script.
- Adding ability to backfill calculate historical positions.
- Allow pool info insertion logic to handle non-duplicate data.
- Paginating event logs when getting from chain.
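Not part of this diff, but as a rough illustration of the pagination bullet above, here is a minimal sketch of paginated event-log retrieval with web3.py; the function name, page size, and filter fields are illustrative assumptions rather than the chainsync internals.

# Hypothetical sketch: fetch logs in fixed-size block ranges so no single RPC call spans too many blocks.
from web3 import Web3


def get_logs_paginated(w3: Web3, address: str, start_block: int, end_block: int, page_size: int = 10_000) -> list:
    """Collect logs for `address` between `start_block` and `end_block` (inclusive), one page at a time."""
    logs: list = []
    for from_block in range(start_block, end_block + 1, page_size):
        to_block = min(from_block + page_size - 1, end_block)
        logs.extend(w3.eth.get_logs({"fromBlock": from_block, "toBlock": to_block, "address": address}))
    return logs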
slundqui authored Nov 16, 2024
1 parent 401e2d9 commit 6ab90fe
Showing 17 changed files with 430 additions and 249 deletions.
32 changes: 0 additions & 32 deletions scripts/run_acquire_data.py

This file was deleted.

33 changes: 0 additions & 33 deletions scripts/run_analyze_data.py

This file was deleted.

175 changes: 175 additions & 0 deletions scripts/sync_db.py
@@ -0,0 +1,175 @@
"""Script to maintain trade events on an external db."""

from __future__ import annotations

import argparse
import logging
import os
import sys
import time
from pathlib import Path
from typing import NamedTuple, Sequence

from agent0 import Chain, Hyperdrive
from agent0.chainsync.exec import acquire_data, analyze_data
from agent0.ethpy.base import EARLIEST_BLOCK_LOOKUP


def main(argv: Sequence[str] | None = None) -> None:
    """Sync trade events, pool info, and analysis tables to an external database.

    Arguments
    ---------
    argv: Sequence[str]
        The command line arguments, if any.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-statements

    # Placeholder in case we have cli args
    parsed_args = parse_arguments(argv)

    # Get the rpc uri from env variable
    rpc_uri = os.getenv("RPC_URI", None)
    if rpc_uri is None:
        raise ValueError("RPC_URI is not set")

    chain = Chain(rpc_uri, Chain.Config(use_existing_postgres=True))

    # Get the registry address from artifacts
    registry_address = os.getenv("REGISTRY_ADDRESS", None)
    if registry_address is None:
        raise ValueError("REGISTRY_ADDRESS is not set")

    logging.info("Checking for new pools...")
    deployed_pools = Hyperdrive.get_hyperdrive_pools_from_registry(chain, registry_address)

    interfaces = [pool.interface for pool in deployed_pools]

    db_dump_path = Path(".db/")
    # Make sure directory exists
    db_dump_path.mkdir(exist_ok=True)

    # Ignore if file not found, we start from scratch
    try:
        chain.load_db(db_dump_path)
        logging.info("Loaded db from %s", db_dump_path)
    except FileNotFoundError:
        pass

    chain_id = chain.chain_id
    earliest_block = EARLIEST_BLOCK_LOOKUP[chain_id]

    # TODO add backfill to hyperdrive object creation

    acquire_data(
        start_block=earliest_block,
        interfaces=list(interfaces),
        db_session=chain.db_session,
        lookback_block_limit=None,
        backfill=True,
        backfill_sample_period=parsed_args.backfill_sample_period,
        backfill_progress_bar=True,
    )
    analyze_data(
        start_block=earliest_block,
        interfaces=list(interfaces),
        db_session=chain.db_session,
        calc_pnl=True,
        backfill=True,
        backfill_sample_period=parsed_args.backfill_sample_period,
        backfill_progress_bar=True,
    )

    # Loop forever, running the db sync once an hour
    while True:
        logging.info("Syncing database")
        # TODO to ensure these tables are synced, we want to
        # pass in the block number to add the data on.
        # TODO add these functions to Hyperdrive object
        acquire_data(
            start_block=earliest_block,
            interfaces=list(interfaces),
            db_session=chain.db_session,
            lookback_block_limit=None,
            backfill=False,
        )
        analyze_data(
            start_block=earliest_block,
            interfaces=list(interfaces),
            db_session=chain.db_session,
            calc_pnl=True,
            backfill=False,
        )

        chain.dump_db(db_dump_path)

        time.sleep(parsed_args.dbsync_time)


class Args(NamedTuple):
    """Command line arguments for the script."""

    backfill_sample_period: int
    dbsync_time: int


# Placeholder for cli args
def namespace_to_args(namespace: argparse.Namespace) -> Args:  # pylint: disable=unused-argument
    """Converts argparse.Namespace to Args.

    Arguments
    ---------
    namespace: argparse.Namespace
        Object for storing arg attributes.

    Returns
    -------
    Args
        Formatted arguments
    """
    return Args(
        backfill_sample_period=namespace.backfill_sample_period,
        dbsync_time=namespace.dbsync_time,
    )


def parse_arguments(argv: Sequence[str] | None = None) -> Args:
    """Parses input arguments.

    Arguments
    ---------
    argv: Sequence[str]
        The argv values returned from argparser.

    Returns
    -------
    Args
        Formatted arguments
    """
    parser = argparse.ArgumentParser(description="Populates a database with trade events on hyperdrive.")

    parser.add_argument(
        "--backfill-sample-period",
        type=int,
        default=1000,
        help="The block sample period when backfilling data. Default is 1000 blocks.",
    )

    parser.add_argument(
        "--dbsync-time",
        type=int,
        default=3600,
        help="The time in seconds to sleep between db syncs. Default is 3600 seconds (1 hour).",
    )

    # Use system arguments if none were passed
    if argv is None:
        argv = sys.argv[1:]

    return namespace_to_args(parser.parse_args(argv))


if __name__ == "__main__":
    main()
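For reference, a hypothetical way to drive the script above from Python; the RPC endpoint and registry address below are placeholders, and the import assumes the scripts/ directory is on sys.path.

# Hypothetical usage sketch for sync_db.py (placeholder values, not a real deployment).
import os

os.environ["RPC_URI"] = "http://localhost:8545"  # placeholder RPC endpoint
os.environ["REGISTRY_ADDRESS"] = "0x0000000000000000000000000000000000000000"  # placeholder registry address

from sync_db import main  # assumes scripts/ is on sys.path

# Accepts the same flags as the CLI invocation.
main(["--backfill-sample-period", "500", "--dbsync-time", "600"])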
19 changes: 14 additions & 5 deletions src/agent0/chainsync/analysis/db_to_analysis.py
@@ -21,6 +21,7 @@
def db_to_analysis(
db_session: Session,
interfaces: list[HyperdriveReadInterface],
block_number: int,
calc_pnl: bool = True,
) -> None:
"""Function to query postgres data tables and insert to analysis tables.
@@ -32,6 +33,8 @@ def db_to_analysis(
The initialized db session.
interfaces: list[HyperdriveReadInterface]
A collection of Hyperdrive interface objects, each connected to a pool.
block_number: int
The block number to run analysis on.
calc_pnl: bool, optional
Whether to calculate pnl. Defaults to True.
"""
@@ -45,11 +48,16 @@
wallet_addr=None,
calc_pnl=calc_pnl,
db_session=db_session,
block_number=block_number,
)


def snapshot_positions_to_db(
interfaces: list[HyperdriveReadInterface], wallet_addr: str | None, calc_pnl: bool, db_session: Session
interfaces: list[HyperdriveReadInterface],
wallet_addr: str | None,
calc_pnl: bool,
db_session: Session,
block_number: int,
):
"""Function to query the trade events table and takes a snapshot
of the current positions and pnl.
@@ -75,17 +83,18 @@
The database session.
calc_pnl: bool
Whether to calculate pnl.
block_number: int
The block number to snapshot positions on.
"""
assert len(interfaces) > 0
query_block_number = interfaces[0].get_block_number(interfaces[0].get_block("latest"))

all_pool_positions: list[pd.DataFrame] = []
for interface in interfaces:
# TODO filter by hyperdrive address here
last_snapshot_block = get_latest_block_number_from_positions_snapshot_table(
db_session, wallet_addr, hyperdrive_address=interface.hyperdrive_address
)
if query_block_number <= last_snapshot_block:
if block_number <= last_snapshot_block:
continue

# Calculate all open positions for the end block
@@ -96,13 +105,13 @@
db_session,
wallet_addr=wallet_addr,
hyperdrive_address=interface.hyperdrive_address,
query_block=query_block_number + 1, # Query block numbers are not inclusive
query_block=block_number + 1, # Query block numbers are not inclusive
show_closed_positions=True,
coerce_float=False,
)
if len(current_pool_positions) > 0:
# Add missing columns
current_pool_positions["block_number"] = query_block_number
current_pool_positions["block_number"] = block_number
# Calculate pnl for these positions if flag is set
if calc_pnl:
current_pool_positions = fill_pnl_values(
2 changes: 1 addition & 1 deletion src/agent0/chainsync/dashboard/build_dashboard_dfs.py
@@ -34,7 +34,7 @@


def build_pool_dashboard(
hyperdrive_address: str, session: Session, max_live_blocks: int = 5000, max_ticker_rows: int = 1000
hyperdrive_address: str, session: Session, max_live_blocks: int = 20000, max_ticker_rows: int = 10000
) -> dict[str, pd.DataFrame]:
"""Builds the dataframes for the main dashboard page that focuses on pools.
2 changes: 1 addition & 1 deletion src/agent0/chainsync/db/hyperdrive/__init__.py
@@ -1,6 +1,6 @@
"""Hyperdrive database utilities."""

from .chain_to_db import checkpoint_events_to_db, data_chain_to_db, init_data_chain_to_db, trade_events_to_db
from .chain_to_db import checkpoint_events_to_db, init_data_chain_to_db, pool_info_to_db, trade_events_to_db
from .convert_data import convert_pool_config, convert_pool_info
from .import_export_data import export_db_to_file, import_to_db, import_to_pandas
from .interface import (
23 changes: 14 additions & 9 deletions src/agent0/chainsync/db/hyperdrive/chain_to_db.py
@@ -19,6 +19,7 @@
add_pool_config,
add_pool_infos,
get_latest_block_number_from_checkpoint_info_table,
get_latest_block_number_from_pool_info_table,
get_latest_block_number_from_trade_event,
)
from .schema import DBCheckpointInfo, DBTradeEvent
@@ -50,7 +51,7 @@ def init_data_chain_to_db(
add_pool_config(pool_config_db_obj, session)


def data_chain_to_db(interfaces: list[HyperdriveReadInterface], block_number: int, session: Session) -> None:
def pool_info_to_db(interfaces: list[HyperdriveReadInterface], block_number: int, session: Session) -> None:
"""Function to query and insert data to dashboard.
Arguments
@@ -74,16 +75,20 @@ def data_chain_to_db(interfaces: list[HyperdriveReadInterface], block_number: in
# missing data TODO)
# Pool info table drives which blocks get queried.

# Add all trade events to the table
# TODO there may be time and memory concerns here if we're spinning up from
# scratch and there's lots of trades/pools.
trade_events_to_db(interfaces, wallet_addr=None, db_session=session)

# Add all checkpoint events to the table
checkpoint_events_to_db(interfaces, db_session=session)

for interface in interfaces:
# TODO abstract this function out
# Only add the pool info row if it's not already in the db
hyperdrive_address = interface.hyperdrive_address
if block_number <= get_latest_block_number_from_pool_info_table(session, hyperdrive_address=hyperdrive_address):
continue

# If the pool wasn't deployed at the query block, skip
deploy_block = interface.get_deploy_block_number()
# deploy_block may be None in cases where we have a local chain and we lose the past events
# In this case, we don't skip and hope the pool is already deployed
if deploy_block is not None and block_number < deploy_block:
continue

pool_state = interface.get_hyperdrive_state(block_data=block)

## Query and add block_pool_info
