From c8d2e37963f3221a45c0d77f4e5342562ab096d0 Mon Sep 17 00:00:00 2001 From: Dmitri Tsumak Date: Sun, 26 Dec 2021 10:53:40 +0200 Subject: [PATCH 1/6] Fix oracle timeout --- oracle/oracle/clients.py | 16 ++++++++-------- oracle/oracle/main.py | 2 ++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/oracle/oracle/clients.py b/oracle/oracle/clients.py index 8d429f3..d7c64e4 100644 --- a/oracle/oracle/clients.py +++ b/oracle/oracle/clients.py @@ -24,31 +24,31 @@ aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) -# set default timeout to 5 minutes -DEFAULT_TIMEOUT = 5 * 60 +# set default GQL query execution timeout to 30 seconds +EXECUTE_TIMEOUT = 30 @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_sw_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport(url=STAKEWISE_SUBGRAPH_URL, timeout=DEFAULT_TIMEOUT) - async with Client(transport=transport) as session: + transport = AIOHTTPTransport(url=STAKEWISE_SUBGRAPH_URL) + async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables) @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_uniswap_v3_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport(url=UNISWAP_V3_SUBGRAPH_URL, timeout=DEFAULT_TIMEOUT) - async with Client(transport=transport) as session: + transport = AIOHTTPTransport(url=UNISWAP_V3_SUBGRAPH_URL) + async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables) @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_ethereum_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport(url=ETHEREUM_SUBGRAPH_URL, timeout=DEFAULT_TIMEOUT) - async with Client(transport=transport) as session: + transport = AIOHTTPTransport(url=ETHEREUM_SUBGRAPH_URL) + async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables) diff --git a/oracle/oracle/main.py b/oracle/oracle/main.py index 94a63f0..0ac96ee 100644 --- a/oracle/oracle/main.py +++ b/oracle/oracle/main.py @@ -65,12 +65,14 @@ async def main() -> None: ) # check stakewise graphql connection + logger.info("Checking connection to graph node...") await get_finalized_block() # aiohttp session session = aiohttp.ClientSession() # check ETH2 API connection + logger.info("Checking connection to ETH2 node...") await get_finality_checkpoints(session) # check whether oracle is part of the oracles set From 91136399ecb458ebfb02e2b65a606ec65d7b44f2 Mon Sep 17 00:00:00 2001 From: Dmitri Tsumak Date: Sun, 26 Dec 2021 11:52:52 +0200 Subject: [PATCH 2/6] Fix oracle timeout --- oracle/oracle/clients.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/oracle/oracle/clients.py b/oracle/oracle/clients.py index d7c64e4..2b5d1c9 100644 --- a/oracle/oracle/clients.py +++ b/oracle/oracle/clients.py @@ -3,7 +3,7 @@ import backoff import boto3 import ipfshttpclient -from aiohttp import ClientSession +from aiohttp import ClientSession, ClientTimeout from gql import Client from gql.transport.aiohttp import AIOHTTPTransport from graphql import DocumentNode @@ -26,12 +26,17 @@ # set default GQL query execution timeout to 30 seconds EXECUTE_TIMEOUT = 30 +AIOHTTP_SESSION_ARGS = { + "timeout": ClientTimeout(total=None, sock_connect=5, sock_read=5) +} @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_sw_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport(url=STAKEWISE_SUBGRAPH_URL) + transport = AIOHTTPTransport( + url=STAKEWISE_SUBGRAPH_URL, client_session_args=AIOHTTP_SESSION_ARGS + ) async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables) @@ -39,7 +44,9 @@ async def execute_sw_gql_query(query: DocumentNode, variables: Dict) -> Dict: @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_uniswap_v3_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport(url=UNISWAP_V3_SUBGRAPH_URL) + transport = AIOHTTPTransport( + url=UNISWAP_V3_SUBGRAPH_URL, client_session_args=AIOHTTP_SESSION_ARGS + ) async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables) @@ -47,7 +54,9 @@ async def execute_uniswap_v3_gql_query(query: DocumentNode, variables: Dict) -> @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_ethereum_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport(url=ETHEREUM_SUBGRAPH_URL) + transport = AIOHTTPTransport( + url=ETHEREUM_SUBGRAPH_URL, client_session_args=AIOHTTP_SESSION_ARGS + ) async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables) From 9f49657273c9dbaa07fecd6970c8f01dd578695f Mon Sep 17 00:00:00 2001 From: Dmitri Tsumak Date: Sun, 26 Dec 2021 12:29:16 +0200 Subject: [PATCH 3/6] Fix oracle timeout --- oracle/oracle/clients.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/oracle/oracle/clients.py b/oracle/oracle/clients.py index 2b5d1c9..3ec5f8a 100644 --- a/oracle/oracle/clients.py +++ b/oracle/oracle/clients.py @@ -26,9 +26,7 @@ # set default GQL query execution timeout to 30 seconds EXECUTE_TIMEOUT = 30 -AIOHTTP_SESSION_ARGS = { - "timeout": ClientTimeout(total=None, sock_connect=5, sock_read=5) -} +AIOHTTP_SESSION_ARGS = {"timeout": ClientTimeout(total=None)} @backoff.on_exception(backoff.expo, Exception, max_time=300) From 6655cf4fc2fecdc11a9b546b74b115733565e7ac Mon Sep 17 00:00:00 2001 From: Dmitri Tsumak Date: Sun, 26 Dec 2021 13:49:05 +0200 Subject: [PATCH 4/6] Fix gql queries execution --- oracle/oracle/clients.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/oracle/oracle/clients.py b/oracle/oracle/clients.py index 3ec5f8a..4559259 100644 --- a/oracle/oracle/clients.py +++ b/oracle/oracle/clients.py @@ -1,9 +1,10 @@ +import asyncio from typing import Any, Dict, List, Union import backoff import boto3 import ipfshttpclient -from aiohttp import ClientSession, ClientTimeout +from aiohttp import ClientSession, client_exceptions from gql import Client from gql.transport.aiohttp import AIOHTTPTransport from graphql import DocumentNode @@ -26,35 +27,41 @@ # set default GQL query execution timeout to 30 seconds EXECUTE_TIMEOUT = 30 -AIOHTTP_SESSION_ARGS = {"timeout": ClientTimeout(total=None)} @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_sw_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport( - url=STAKEWISE_SUBGRAPH_URL, client_session_args=AIOHTTP_SESSION_ARGS - ) - async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: - return await session.execute(query, variable_values=variables) + transport = AIOHTTPTransport(url=STAKEWISE_SUBGRAPH_URL) + return await _execute_gql_query(transport, query, variables) @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_uniswap_v3_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport( - url=UNISWAP_V3_SUBGRAPH_URL, client_session_args=AIOHTTP_SESSION_ARGS - ) - async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: - return await session.execute(query, variable_values=variables) + transport = AIOHTTPTransport(url=UNISWAP_V3_SUBGRAPH_URL) + return await _execute_gql_query(transport, query, variables) @backoff.on_exception(backoff.expo, Exception, max_time=300) async def execute_ethereum_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" - transport = AIOHTTPTransport( - url=ETHEREUM_SUBGRAPH_URL, client_session_args=AIOHTTP_SESSION_ARGS - ) + transport = AIOHTTPTransport(url=ETHEREUM_SUBGRAPH_URL) + return await _execute_gql_query(transport, query, variables) + + +async def _execute_gql_query( + transport: AIOHTTPTransport, query: DocumentNode, variables: Dict +): + for _ in range(2): + try: + async with Client( + transport=transport, execute_timeout=EXECUTE_TIMEOUT + ) as session: + return await session.execute(query, variable_values=variables) + except (asyncio.exceptions.TimeoutError, client_exceptions.ServerTimeoutError): + await asyncio.sleep(3) + async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables) From 3ea139face2e9f9685f170d50bbcffa1541cf7c5 Mon Sep 17 00:00:00 2001 From: Dmitri Tsumak Date: Sun, 26 Dec 2021 14:21:21 +0200 Subject: [PATCH 5/6] Fix gql queries execution --- oracle/oracle/clients.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/oracle/oracle/clients.py b/oracle/oracle/clients.py index 4559259..2bb8342 100644 --- a/oracle/oracle/clients.py +++ b/oracle/oracle/clients.py @@ -7,6 +7,7 @@ from aiohttp import ClientSession, client_exceptions from gql import Client from gql.transport.aiohttp import AIOHTTPTransport +from gql.transport.exceptions import TransportServerError from graphql import DocumentNode from oracle.oracle.settings import ( @@ -59,7 +60,11 @@ async def _execute_gql_query( transport=transport, execute_timeout=EXECUTE_TIMEOUT ) as session: return await session.execute(query, variable_values=variables) - except (asyncio.exceptions.TimeoutError, client_exceptions.ServerTimeoutError): + except ( + asyncio.exceptions.TimeoutError, + client_exceptions.ServerTimeoutError, + TransportServerError, + ): await asyncio.sleep(3) async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: From 90bcfebfb3cbd365273d05c31cfb9fe60f4d56b0 Mon Sep 17 00:00:00 2001 From: Dmitri Tsumak Date: Mon, 27 Dec 2021 15:18:34 +0200 Subject: [PATCH 6/6] Reduce verbosity for retry calls --- oracle/oracle/clients.py | 41 ++++++++++++++-------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/oracle/oracle/clients.py b/oracle/oracle/clients.py index 2bb8342..897db57 100644 --- a/oracle/oracle/clients.py +++ b/oracle/oracle/clients.py @@ -1,13 +1,12 @@ -import asyncio +import logging from typing import Any, Dict, List, Union import backoff import boto3 import ipfshttpclient -from aiohttp import ClientSession, client_exceptions +from aiohttp import ClientSession from gql import Client from gql.transport.aiohttp import AIOHTTPTransport -from gql.transport.exceptions import TransportServerError from graphql import DocumentNode from oracle.oracle.settings import ( @@ -26,47 +25,35 @@ aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) +gql_logger = logging.getLogger("gql_logger") +gql_handler = logging.StreamHandler() +gql_logger.addHandler(gql_handler) +gql_logger.setLevel(logging.ERROR) + # set default GQL query execution timeout to 30 seconds EXECUTE_TIMEOUT = 30 -@backoff.on_exception(backoff.expo, Exception, max_time=300) +@backoff.on_exception(backoff.expo, Exception, max_time=300, logger=gql_logger) async def execute_sw_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" transport = AIOHTTPTransport(url=STAKEWISE_SUBGRAPH_URL) - return await _execute_gql_query(transport, query, variables) + async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: + return await session.execute(query, variable_values=variables) -@backoff.on_exception(backoff.expo, Exception, max_time=300) +@backoff.on_exception(backoff.expo, Exception, max_time=300, logger=gql_logger) async def execute_uniswap_v3_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" transport = AIOHTTPTransport(url=UNISWAP_V3_SUBGRAPH_URL) - return await _execute_gql_query(transport, query, variables) + async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: + return await session.execute(query, variable_values=variables) -@backoff.on_exception(backoff.expo, Exception, max_time=300) +@backoff.on_exception(backoff.expo, Exception, max_time=300, logger=gql_logger) async def execute_ethereum_gql_query(query: DocumentNode, variables: Dict) -> Dict: """Executes GraphQL query.""" transport = AIOHTTPTransport(url=ETHEREUM_SUBGRAPH_URL) - return await _execute_gql_query(transport, query, variables) - - -async def _execute_gql_query( - transport: AIOHTTPTransport, query: DocumentNode, variables: Dict -): - for _ in range(2): - try: - async with Client( - transport=transport, execute_timeout=EXECUTE_TIMEOUT - ) as session: - return await session.execute(query, variable_values=variables) - except ( - asyncio.exceptions.TimeoutError, - client_exceptions.ServerTimeoutError, - TransportServerError, - ): - await asyncio.sleep(3) - async with Client(transport=transport, execute_timeout=EXECUTE_TIMEOUT) as session: return await session.execute(query, variable_values=variables)