From 55d01a878e97d1fe026221f404a9f74de3aa7820 Mon Sep 17 00:00:00 2001 From: Roman <167799377+roman-opentensor@users.noreply.github.com> Date: Tue, 3 Sep 2024 13:56:40 -0700 Subject: [PATCH] Merge pull request #2280 from opentensor/feat/roman/add-subtensor-reconnection-logic Add reconnection logic + tests --- bittensor/extrinsics/serving.py | 2 + bittensor/subtensor.py | 98 ++++++++++++++++++++++-------- bittensor/utils/networking.py | 33 ++++++++-- tests/unit_tests/test_subtensor.py | 36 +++++++++++ 4 files changed, 141 insertions(+), 28 deletions(-) diff --git a/bittensor/extrinsics/serving.py b/bittensor/extrinsics/serving.py index bba5367de1..734561835f 100644 --- a/bittensor/extrinsics/serving.py +++ b/bittensor/extrinsics/serving.py @@ -25,6 +25,7 @@ import bittensor import bittensor.utils.networking as net from bittensor.utils import format_error_message +from bittensor.utils.networking import ensure_connected from ..errors import MetadataError @@ -269,6 +270,7 @@ def publish_metadata( raise MetadataError(format_error_message(response.error_message)) +@ensure_connected def get_metadata(self, netuid: int, hotkey: str, block: Optional[int] = None) -> str: @retry(delay=2, tries=3, backoff=2, max_delay=4) def make_substrate_call_with_retry(): diff --git a/bittensor/subtensor.py b/bittensor/subtensor.py index ac22a3a14d..c4e70deae5 100644 --- a/bittensor/subtensor.py +++ b/bittensor/subtensor.py @@ -26,6 +26,7 @@ import argparse import copy import socket +import sys import time from typing import List, Dict, Union, Optional, Tuple, TypedDict, Any @@ -188,6 +189,7 @@ def __init__( config: Optional[bittensor.config] = None, _mock: bool = False, log_verbose: bool = True, + connection_timeout: int = 600, ) -> None: """ Initializes a Subtensor interface for interacting with the Bittensor blockchain. @@ -251,7 +253,25 @@ def __init__( "To get ahead of this change, please run a local subtensor node and point to it." ) - # Attempt to connect to chosen endpoint. Fallback to finney if local unavailable. + self.log_verbose = log_verbose + self._connection_timeout = connection_timeout + self._get_substrate() + + self._subtensor_errors: Dict[str, Dict[str, str]] = {} + + def __str__(self) -> str: + if self.network == self.chain_endpoint: + # Connecting to chain endpoint without network known. + return "subtensor({})".format(self.chain_endpoint) + else: + # Connecting to network with endpoint known. + return "subtensor({}, {})".format(self.network, self.chain_endpoint) + + def __repr__(self) -> str: + return self.__str__() + + def _get_substrate(self): + """Establishes a connection to the Substrate node using configured parameters.""" try: # Set up params. self.substrate = SubstrateInterface( @@ -260,6 +280,11 @@ def __init__( url=self.chain_endpoint, type_registry=bittensor.__type_registry__, ) + if self.log_verbose: + _logger.info( + f"Connected to {self.network} network and {self.chain_endpoint}." + ) + except ConnectionRefusedError: _logger.error( f"Could not connect to {self.network} network with {self.chain_endpoint} chain endpoint. Exiting...", @@ -268,13 +293,10 @@ def __init__( "You can check if you have connectivity by running this command: nc -vz localhost " f"{self.chain_endpoint.split(':')[2]}" ) - exit(1) - # TODO (edu/phil): Advise to run local subtensor and point to dev docs. + sys.exit(1) try: - self.substrate.websocket.settimeout(600) - # except: - # bittensor.logging.warning("Could not set websocket timeout.") + self.substrate.websocket.settimeout(self._connection_timeout) except AttributeError as e: _logger.warning(f"AttributeError: {e}") except TypeError as e: @@ -282,24 +304,6 @@ def __init__( except (socket.error, OSError) as e: _logger.warning(f"Socket error: {e}") - if log_verbose: - _logger.info( - f"Connected to {self.network} network and {self.chain_endpoint}." - ) - - self._subtensor_errors: Dict[str, Dict[str, str]] = {} - - def __str__(self) -> str: - if self.network == self.chain_endpoint: - # Connecting to chain endpoint without network known. - return "subtensor({})".format(self.chain_endpoint) - else: - # Connecting to network with endpoint known. - return "subtensor({}, {})".format(self.network, self.chain_endpoint) - - def __repr__(self) -> str: - return self.__str__() - @staticmethod def config() -> "bittensor.config": """ @@ -670,6 +674,7 @@ def set_take( wait_for_finalization=wait_for_finalization, ) + @networking.ensure_connected def send_extrinsic( self, wallet: "bittensor.wallet", @@ -839,6 +844,7 @@ def set_weights( return success, message + @networking.ensure_connected def _do_set_weights( self, wallet: "bittensor.wallet", @@ -986,6 +992,7 @@ def commit_weights( return success, message + @networking.ensure_connected def _do_commit_weights( self, wallet: "bittensor.wallet", @@ -1110,6 +1117,7 @@ def reveal_weights( return success, message + @networking.ensure_connected def _do_reveal_weights( self, wallet: "bittensor.wallet", @@ -1372,6 +1380,7 @@ def burned_register( prompt=prompt, ) + @networking.ensure_connected def _do_pow_register( self, netuid: int, @@ -1434,6 +1443,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def _do_burned_register( self, netuid: int, @@ -1491,6 +1501,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def _do_swap_hotkey( self, wallet: "bittensor.wallet", @@ -1588,6 +1599,7 @@ def transfer( prompt=prompt, ) + @networking.ensure_connected def get_transfer_fee( self, wallet: "bittensor.wallet", dest: str, value: Union["Balance", float, int] ) -> "Balance": @@ -1645,6 +1657,7 @@ def get_transfer_fee( ) return fee + @networking.ensure_connected def _do_transfer( self, wallet: "bittensor.wallet", @@ -1880,6 +1893,7 @@ def serve_axon( self, netuid, axon, wait_for_inclusion, wait_for_finalization ) + @networking.ensure_connected def _do_serve_axon( self, wallet: "bittensor.wallet", @@ -1947,6 +1961,7 @@ def serve_prometheus( wait_for_finalization=wait_for_finalization, ) + @networking.ensure_connected def _do_serve_prometheus( self, wallet: "bittensor.wallet", @@ -1992,6 +2007,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def _do_associate_ips( self, wallet: "bittensor.wallet", @@ -2122,6 +2138,7 @@ def add_stake_multiple( prompt, ) + @networking.ensure_connected def _do_stake( self, wallet: "bittensor.wallet", @@ -2249,6 +2266,7 @@ def unstake( prompt, ) + @networking.ensure_connected def _do_unstake( self, wallet: "bittensor.wallet", @@ -2339,6 +2357,7 @@ def set_childkey_take( prompt=prompt, ) + @networking.ensure_connected def _do_set_childkey_take( self, wallet: "bittensor.wallet", @@ -2430,6 +2449,7 @@ def set_children( prompt=prompt, ) + @networking.ensure_connected def _do_set_children( self, wallet: "bittensor.wallet", @@ -2806,6 +2826,7 @@ def root_register( prompt=prompt, ) + @networking.ensure_connected def _do_root_register( self, wallet: "bittensor.wallet", @@ -2886,6 +2907,7 @@ def root_set_weights( prompt=prompt, ) + @networking.ensure_connected def _do_set_root_weights( self, wallet: "bittensor.wallet", @@ -2958,6 +2980,7 @@ def make_substrate_call_with_retry(): ################## # Queries subtensor registry named storage with params and block. + @networking.ensure_connected def query_identity( self, key: str, @@ -3000,6 +3023,7 @@ def make_substrate_call_with_retry() -> "ScaleType": identity_info.value["info"] ) + @networking.ensure_connected def update_identity( self, wallet: "bittensor.wallet", @@ -3103,6 +3127,7 @@ def get_commitment(self, netuid: int, uid: int, block: Optional[int] = None) -> ################## # Queries subtensor named storage with params and block. + @networking.ensure_connected def query_subtensor( self, name: str, @@ -3139,6 +3164,7 @@ def make_substrate_call_with_retry() -> "ScaleType": return make_substrate_call_with_retry() # Queries subtensor map storage with params and block. + @networking.ensure_connected def query_map_subtensor( self, name: str, @@ -3175,6 +3201,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def query_constant( self, module_name: str, constant_name: str, block: Optional[int] = None ) -> Optional["ScaleType"]: @@ -3209,6 +3236,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() # Queries any module storage with params and block. + @networking.ensure_connected def query_module( self, module: str, @@ -3248,6 +3276,7 @@ def make_substrate_call_with_retry() -> "ScaleType": return make_substrate_call_with_retry() # Queries any module map storage with params and block. + @networking.ensure_connected def query_map( self, module: str, @@ -3286,6 +3315,7 @@ def make_substrate_call_with_retry() -> "QueryMapResult": return make_substrate_call_with_retry() + @networking.ensure_connected def state_call( self, method: str, @@ -3374,6 +3404,7 @@ def query_runtime_api( return obj.decode() + @networking.ensure_connected def _encode_params( self, call_definition: List["ParamWithTypes"], @@ -4351,6 +4382,7 @@ def get_subnets(self, block: Optional[int] = None) -> List[int]: else [] ) + @networking.ensure_connected def get_all_subnets_info(self, block: Optional[int] = None) -> List[SubnetInfo]: """ Retrieves detailed information about all subnets within the Bittensor network. This function @@ -4382,6 +4414,7 @@ def make_substrate_call_with_retry(): return SubnetInfo.list_from_vec_u8(result) + @networking.ensure_connected def get_subnet_info( self, netuid: int, block: Optional[int] = None ) -> Optional[SubnetInfo]: @@ -4540,6 +4573,7 @@ def get_nominators_for_hotkey( else 0 ) + @networking.ensure_connected def get_delegate_by_hotkey( self, hotkey_ss58: str, block: Optional[int] = None ) -> Optional[DelegateInfo]: @@ -4577,6 +4611,7 @@ def make_substrate_call_with_retry(encoded_hotkey_: List[int]): return DelegateInfo.from_vec_u8(result) + @networking.ensure_connected def get_delegates_lite(self, block: Optional[int] = None) -> List[DelegateInfoLite]: """ Retrieves a lighter list of all delegate neurons within the Bittensor network. This function provides an @@ -4611,6 +4646,7 @@ def make_substrate_call_with_retry(): return [DelegateInfoLite(**d) for d in result] + @networking.ensure_connected def get_delegates(self, block: Optional[int] = None) -> List[DelegateInfo]: """ Retrieves a list of all delegate neurons within the Bittensor network. This function provides an overview of the @@ -4643,6 +4679,7 @@ def make_substrate_call_with_retry(): return DelegateInfo.list_from_vec_u8(result) + @networking.ensure_connected def get_delegated( self, coldkey_ss58: str, block: Optional[int] = None ) -> List[Tuple[DelegateInfo, Balance]]: @@ -4715,6 +4752,7 @@ def get_childkey_take( return None return None + @networking.ensure_connected def get_children(self, hotkey, netuid) -> list[tuple[int, str]] | list[Any] | None: """ Get the children of a hotkey on a specific network. @@ -4741,6 +4779,7 @@ def get_children(self, hotkey, netuid) -> list[tuple[int, str]] | list[Any] | No print(f"Unexpected error in get_children: {e}") return None + @networking.ensure_connected def get_parents(self, child_hotkey, netuid): """ Get the parents of a child hotkey on a specific network. @@ -4852,6 +4891,7 @@ def get_stake_info_for_coldkeys( return StakeInfo.list_of_tuple_from_vec_u8(bytes_result) # type: ignore + @networking.ensure_connected def get_minimum_required_stake( self, ) -> Balance: @@ -5104,6 +5144,7 @@ def neuron_for_wallet( wallet.hotkey.ss58_address, netuid=netuid, block=block ) + @networking.ensure_connected def neuron_for_uid( self, uid: Optional[int], netuid: int, block: Optional[int] = None ) -> NeuronInfo: @@ -5396,6 +5437,7 @@ def get_subnet_burn_cost(self, block: Optional[int] = None) -> Optional[str]: # Extrinsics # ############## + @networking.ensure_connected def _do_delegation( self, wallet: "bittensor.wallet", @@ -5447,6 +5489,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def _do_undelegation( self, wallet: "bittensor.wallet", @@ -5501,6 +5544,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def _do_nominate( self, wallet: "bittensor.wallet", @@ -5548,6 +5592,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def _do_increase_take( self, wallet: "bittensor.wallet", @@ -5603,6 +5648,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def _do_decrease_take( self, wallet: "bittensor.wallet", @@ -5662,6 +5708,7 @@ def make_substrate_call_with_retry(): # Legacy # ########## + @networking.ensure_connected def get_balance(self, address: str, block: Optional[int] = None) -> Balance: """ Retrieves the token balance of a specific address within the Bittensor network. This function queries @@ -5698,6 +5745,7 @@ def make_substrate_call_with_retry(): return Balance(1000) return Balance(result.value["data"]["free"]) + @networking.ensure_connected def get_current_block(self) -> int: """ Returns the current block number on the Bittensor blockchain. This function provides the latest block @@ -5716,6 +5764,7 @@ def make_substrate_call_with_retry(): return make_substrate_call_with_retry() + @networking.ensure_connected def get_balances(self, block: Optional[int] = None) -> Dict[str, Balance]: """ Retrieves the token balances of all accounts within the Bittensor network as of a specific blockchain block. @@ -5775,6 +5824,7 @@ def _null_neuron() -> NeuronInfo: ) # type: ignore return neuron + @networking.ensure_connected def get_block_hash(self, block_id: int) -> str: """ Retrieves the hash of a specific block on the Bittensor blockchain. The block hash is a unique diff --git a/bittensor/utils/networking.py b/bittensor/utils/networking.py index 4d1af585c3..f4b729fe97 100644 --- a/bittensor/utils/networking.py +++ b/bittensor/utils/networking.py @@ -19,15 +19,17 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. -# Standard Lib +import json import os +import socket import urllib -import json -import netaddr +from functools import wraps -# 3rd party +import netaddr import requests +from bittensor.btlogging import logging + def int_to_ip(int_val: int) -> str: r"""Maps an integer to a unique ip-string @@ -171,3 +173,26 @@ def get_formatted_ws_endpoint_url(endpoint_url: str) -> str: endpoint_url = "ws://{}".format(endpoint_url) return endpoint_url + + +def ensure_connected(func): + """Decorator ensuring the function executes with an active substrate connection.""" + + @wraps(func) + def wrapper(self, *args, **kwargs): + # Check the socket state before method execution + if ( + # connection was closed correctly + self.substrate.websocket.sock is None + # connection has a broken pipe + or self.substrate.websocket.sock.getsockopt( + socket.SOL_SOCKET, socket.SO_ERROR + ) + != 0 + ): + logging.info("Reconnection substrate...") + self._get_substrate() + # Execute the method if the connection is active or after reconnecting + return func(self, *args, **kwargs) + + return wrapper diff --git a/tests/unit_tests/test_subtensor.py b/tests/unit_tests/test_subtensor.py index c651eaa57f..b8dfc3e81b 100644 --- a/tests/unit_tests/test_subtensor.py +++ b/tests/unit_tests/test_subtensor.py @@ -2315,3 +2315,39 @@ def test_get_remaining_arbitration_period_happy(subtensor, mocker): ) # if we change the methods logic in the future we have to be make sure the returned type is correct assert result == 1800 # 2000 - 200 + + +def test_connect_without_substrate(mocker): + """Ensure re-connection is called when using an alive substrate.""" + # Prep + fake_substrate = mocker.MagicMock() + fake_substrate.websocket.sock.getsockopt.return_value = 1 + mocker.patch.object( + subtensor_module, "SubstrateInterface", return_value=fake_substrate + ) + fake_subtensor = Subtensor() + spy_get_substrate = mocker.spy(Subtensor, "_get_substrate") + + # Call + _ = fake_subtensor.block + + # Assertions + assert spy_get_substrate.call_count == 1 + + +def test_connect_with_substrate(mocker): + """Ensure re-connection is non called when using an alive substrate.""" + # Prep + fake_substrate = mocker.MagicMock() + fake_substrate.websocket.sock.getsockopt.return_value = 0 + mocker.patch.object( + subtensor_module, "SubstrateInterface", return_value=fake_substrate + ) + fake_subtensor = Subtensor() + spy_get_substrate = mocker.spy(Subtensor, "_get_substrate") + + # Call + _ = fake_subtensor.block + + # Assertions + assert spy_get_substrate.call_count == 0