diff --git a/requirements.txt b/requirements.txt index bf73a73..93176ca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ aiohttp==3.8.4 graphql-core==2.3.1 +python-dotenv==1.0.0 diff --git a/src/airstack/constant.py b/src/airstack/constant.py index 6e0ab82..53fa242 100644 --- a/src/airstack/constant.py +++ b/src/airstack/constant.py @@ -3,6 +3,25 @@ Description: This module contains the constant used in airstack sdk. """ +from enum import Enum + + +class SocialsDappName(Enum): + LENS = "lens" + FARCASTER = "farcaster" + + +class TransferType(Enum): + SEND = "send" + RECEIVED = "received" + + +class ChainType(Enum): + ETH = "ethereum" + POLYGON = "polygon" + BASE = "base" + + class AirstackConstants: """Class for keeping constants """ diff --git a/src/airstack/execute_query.py b/src/airstack/execute_query.py index c15667e..67e56f2 100644 --- a/src/airstack/execute_query.py +++ b/src/airstack/execute_query.py @@ -15,17 +15,31 @@ has_cursor, RemoveQueryByStartingName, add_page_info_to_queries, - remove_unused_variables + remove_unused_variables, + format_poaps_data, + format_socials_followings_data, + format_socials_followers_data, + format_token_sent_data, + format_token_received_data, + format_eth_nft_data, + format_polygon_nft_data, + calculating_score ) from airstack.constant import AirstackConstants +# from airstack.popular_queries import ExecutePopularQueries +from airstack.constant import SocialsDappName, TransferType +from utility.custom_exception import AirstackException +from airstack.onchain_graph import ExecuteOnchainGraph warnings.filterwarnings("ignore", message="coroutine .* was never awaited") + class QueryResponse: """Class for generate the query response """ + def __init__(self, response, status_code, error, has_next_page=None, has_prev_page=None, - get_next_page=None, get_prev_page=None): + get_next_page=None, get_prev_page=None): """Init function Args: @@ -45,6 +59,7 @@ def __init__(self, response, status_code, error, 
has_next_page=None, has_prev_pa self.get_next_page = get_next_page self.get_prev_page = get_prev_page + class AirstackClient: """Class to create api client for airstack api's """ @@ -77,19 +92,14 @@ def create_execute_query_object(self, query=None, variables=None): object: execute query obiect """ execute_query = ExecuteQuery(query=query, variables=variables, url=self.url, - api_key=self.api_key, timeout=self.timeout) + api_key=self.api_key, timeout=self.timeout) return execute_query - def queries_object(self): - """Create popular query object for popular queries + def onchain_graph(self): + onchain_graph_object = ExecuteOnchainGraph( + create_execute_query_object=self.create_execute_query_object) + return onchain_graph_object - Returns: - object: execute popular query obiect - """ - from airstack.popular_queries import ExecutePopularQueries - execute_popular_query = ExecutePopularQueries(url=self.url,api_key=self.api_key, - timeout=self.timeout) - return execute_popular_query class ExecuteQuery: """Class to execute query functions @@ -97,6 +107,7 @@ class ExecuteQuery: Returns: object: object of execute query """ + def __init__(self, query=None, variables=None, url=None, api_key=None, timeout=None): self.deleted_queries = [] self.query = query @@ -157,19 +168,21 @@ async def execute_paginated_query(self, query=None, variables=None): query_response = await self.execute_query(query=query) if query_response.error is not None: return QueryResponse(None, query_response.status_code, query_response.error, - None, None, None, None) + None, None, None, None) page_info = {} for _key, value in query_response.data.items(): page_info[_key] = find_page_info(query_response.data[_key]) - has_next_page = any(page_info['nextCursor'] != '' for page_info in page_info.values()) - has_prev_page = any(page_info['prevCursor'] != '' for page_info in page_info.values()) + has_next_page = any(page_info['nextCursor'] + != '' for page_info in page_info.values()) + has_prev_page = 
any(page_info['prevCursor'] + != '' for page_info in page_info.values()) return QueryResponse(query_response.data, query_response.status_code, - query_response.error, has_next_page, has_prev_page, - self.get_next_page(query, variables, page_info), - self.get_prev_page(query, variables, page_info)) + query_response.error, has_next_page, has_prev_page, + self.get_next_page(query, variables, page_info), + self.get_prev_page(query, variables, page_info)) async def get_next_page(self, query, variables, page_info): """Async function to get the next page data. @@ -196,16 +209,16 @@ async def get_next_page(self, query, variables, page_info): visitor = RemoveQueryByStartingName(query_start=_page_info_key) document_ast = visit(document_ast, visitor) next_query = remove_unused_variables(document_ast=document_ast, - query=print_ast(document_ast)) + query=print_ast(document_ast)) else: if not stored: self.deleted_queries.append(None) if has_cursor(document_ast, _page_info_key): replace_cursor_value(document_ast, _page_info_key, - _page_info_value['nextCursor'], self.variables) + _page_info_value['nextCursor'], self.variables) else: add_cursor_to_input_field(document_ast, _page_info_key, - _page_info_value['nextCursor']) + _page_info_value['nextCursor']) next_query = print_ast(document_ast) return await self.execute_paginated_query(next_query, variables) @@ -231,9 +244,9 @@ async def get_prev_page(self, query, variables, page_info): document_ast = parse(next_query) if has_cursor(document_ast, _page_info_key): replace_cursor_value(document_ast, _page_info_key, - _page_info_value['prevCursor'], self.variables) + _page_info_value['prevCursor'], self.variables) else: add_cursor_to_input_field(document_ast, _page_info_key, - _page_info_value['prevCursor']) + _page_info_value['prevCursor']) next_query = print_ast(document_ast) return await self.execute_paginated_query(next_query, variables) diff --git a/src/airstack/generic.py b/src/airstack/generic.py index dbe2bf1..72fc795 100644 
--- a/src/airstack/generic.py +++ b/src/airstack/generic.py @@ -7,6 +7,8 @@ from graphql.language.ast import Field, ObjectField, ObjectValue, StringValue, Name, Argument, FragmentSpread, InlineFragment, SelectionSet, Document from graphql.language.visitor import Visitor from graphql import parse, print_ast +from airstack.constant import SocialsDappName +from configurations.conf import default_score_map, BURNED_ADDRESSES def find_page_info(json_data): @@ -71,7 +73,7 @@ def add_cursor_to_input_field(ast, field_name, cursor_value): for selection in definition.selection_set.selections: if isinstance(selection, Field): if selection.name.value == field_name or (selection.alias and - selection.alias.value == field_name): + selection.alias.value == field_name): arguments = selection.arguments if not arguments: # If no arguments are present, create a new input argument @@ -79,7 +81,7 @@ def add_cursor_to_input_field(ast, field_name, cursor_value): argument = Argument( name=Name(value='input'), value=ObjectValue(fields=[create_object_field('cursor', - cursor_value)]) + cursor_value)]) ) selection.arguments.append(argument) else: @@ -90,10 +92,12 @@ def add_cursor_to_input_field(ast, field_name, cursor_value): for field in input_value.fields: if isinstance(field, ObjectField) and field.name.value == 'cursor': # If cursor field exists, replace its value - field.value = StringValue(value=cursor_value) + field.value = StringValue( + value=cursor_value) return # Exit the loop if cursor field is found # If cursor field is not found, add it to the input fields - cursor_field = create_object_field('cursor', cursor_value) + cursor_field = create_object_field( + 'cursor', cursor_value) input_value.fields.append(cursor_field) return # Exit the loop if input argument is found return @@ -124,10 +128,12 @@ def replace_cursor_value(ast, key, cursor_value, variables): if _replaced_value in variables.keys(): variables[_replaced_value] = cursor_value else: - field.value = 
StringValue(value=cursor_value) + field.value = StringValue( + value=cursor_value) return # Exit the loop if cursor field is found # If cursor field is not found, add it to the input fields - cursor_field = create_object_field('cursor', cursor_value) + cursor_field = create_object_field( + 'cursor', cursor_value) input_value.fields.append(cursor_field) return # Exit the loop if input argument is found @@ -155,6 +161,7 @@ def has_cursor(ast, key): return True return False + class RemoveQueryByStartingName(Visitor): """Class to remove queries from a multi-query that do not have next or prevCursor based on field names or aliases""" @@ -164,7 +171,8 @@ def __init__(self, query_start): def enter_OperationDefinition(self, node, key, parent, path, ancestors): if node.operation == 'query': selections = node.selection_set.selections - selections[:] = [selection for selection in selections if not self._should_remove_query(selection)] + selections[:] = [ + selection for selection in selections if not self._should_remove_query(selection)] if not selections: # Remove variables when there are no remaining selections node.variable_definitions = None @@ -181,6 +189,7 @@ def _should_remove_query(self, selection): return True return False + def add_page_info_to_queries(graphql_document): """Func to add page info to the graphql query @@ -194,10 +203,11 @@ def add_page_info_to_queries(graphql_document): modified_document = _add_page_info_to_queries(parsed_document) return print_ast(modified_document) + def _add_page_info_to_queries(node): if isinstance(node, Document): node.definitions = [_add_page_info_to_queries(definition) for - definition in node.definitions] + definition in node.definitions] elif isinstance(node, Field): if node.selection_set is None: node.selection_set = SelectionSet(selections=[]) @@ -210,9 +220,10 @@ def _add_page_info_to_queries(node): )) elif hasattr(node, "selection_set"): node.selection_set.selections = [_add_page_info_to_queries(selection) for - selection 
def format_poaps_data(poaps, existing_user=None):
    """Fold common-POAP-holder records into a recommended-users list.

    Args:
        poaps (list): ``Poap`` records from the Airstack POAPs API; each entry
            carries ``attendee``, ``poapEvent`` and ``eventId`` keys.
        existing_user (list, optional): previously accumulated onchain-graph
            users. Defaults to None (treated as an empty list).

    Returns:
        list: copy of ``existing_user`` where each holder is merged into an
        existing entry (matched by shared address) or appended as a new one.
    """
    if existing_user is None:
        existing_user = []

    recommended_users = existing_user.copy()
    for poap in poaps or []:
        attendee = poap.get('attendee', {})
        poap_event = poap.get('poapEvent', {})
        event_id = poap.get('eventId')

        name = poap_event.get('eventName')
        content_value = poap_event.get('contentValue', {})
        addresses = attendee.get('owner', {}).get('addresses', [])

        # Locate an already-known user sharing at least one address.
        existing_user_index = -1
        for index, recommended_user in enumerate(recommended_users):
            recommended_user_addresses = recommended_user.get('addresses', [])
            if any(addr in recommended_user_addresses for addr in addresses):
                existing_user_index = index
                break

        image = content_value.get('image', {}).get(
            'extraSmall') if content_value.get('image') else None

        new_poap = {
            'name': name,
            'image': image,
            'eventId': event_id
        }

        if existing_user_index != -1:
            recommended_user = recommended_users[existing_user_index]
            _addresses = set(recommended_user.get('addresses', []))
            _addresses.update(addresses)
            recommended_user['addresses'] = list(_addresses)

            _poaps = recommended_user.get('poaps', [])
            # Avoid registering the same event twice for one user.
            # (renamed the comprehension variable: it used to shadow `poap`)
            if event_id and all(p['eventId'] != event_id for p in _poaps):
                _poaps.append(new_poap)
            recommended_user['poaps'] = _poaps
        else:
            new_user = attendee.get('owner', {})
            new_user['poaps'] = [new_poap]
            recommended_users.append(new_user)

    return recommended_users


def format_socials_followings_data(followings: list, dappName: SocialsDappName = SocialsDappName.LENS, existing_user: list = None) -> list:
    """Fold Lens/Farcaster following records into a recommended-users list.

    Args:
        followings (list): ``followingAddress`` records from SocialFollowings.
        dappName (SocialsDappName, optional): which social dapp the records
            came from. Defaults to SocialsDappName.LENS.
        existing_user (list, optional): previously accumulated onchain-graph
            users. Defaults to None (treated as an empty list).

    Returns:
        list: copy of ``existing_user`` with per-user ``follows`` flags merged.

    Fixes over the previous revision: the mutable ``[]`` default is gone and a
    large unreachable duplicate implementation after the ``return`` was removed.
    """
    if existing_user is None:
        existing_user = []

    recommended_users = existing_user.copy()
    is_dappname_lens = dappName == SocialsDappName.LENS
    following_key = 'followingOnLens' if is_dappname_lens else 'followingOnFarcaster'
    followed_key = 'followedOnLens' if is_dappname_lens else 'followedOnFarcaster'

    for following in followings:
        existing_user_index = -1
        for index, recommended_user in enumerate(recommended_users):
            recommended_user_addresses = recommended_user.get('addresses', [])
            if any(addr in recommended_user_addresses for addr in following.get('addresses', [])):
                existing_user_index = index
                break

        mutual_follower = following.get('mutualFollower', {})
        follower = mutual_follower.get(
            'Follower') if mutual_follower is not None else []
        # A non-empty mutual-follower list means this account follows back.
        follows_back = bool(follower[0]) if follower else False

        if existing_user_index != -1:
            follows = recommended_users[existing_user_index].get('follows', {})
            recommended_users[existing_user_index] = {
                **following,
                **recommended_users[existing_user_index],
                'follows': {
                    **follows,
                    following_key: True,
                    followed_key: follows_back
                }
            }
        else:
            recommended_users.append({
                **following,
                'follows': {
                    following_key: True,
                    followed_key: follows_back
                }
            })

    return recommended_users


def format_socials_followers_data(followers: list, dappName: SocialsDappName = SocialsDappName.LENS, existing_user: list = None) -> list:
    """Fold Lens/Farcaster follower records into a recommended-users list.

    Args:
        followers (list): ``followerAddress`` records from SocialFollowers.
        dappName (SocialsDappName, optional): which social dapp the records
            came from. Defaults to SocialsDappName.LENS.
        existing_user (list, optional): previously accumulated onchain-graph
            users. Defaults to None (treated as an empty list).

    Returns:
        list: copy of ``existing_user`` with per-user ``follows`` flags merged.

    Fix over the previous revision: the "following" flag is now read back with
    the key that matches ``dappName`` (it previously always read
    ``followingOnFarcaster``, even for Lens).
    """
    if existing_user is None:
        existing_user = []

    recommended_users = existing_user.copy()
    is_dappname_lens = dappName == SocialsDappName.LENS
    following_key = 'followingOnLens' if is_dappname_lens else 'followingOnFarcaster'
    followed_key = 'followedOnLens' if is_dappname_lens else 'followedOnFarcaster'

    for follower in followers:
        existing_user_index = -1
        for index, recommended_user in enumerate(recommended_users):
            recommended_user_addresses = recommended_user.get('addresses', [])
            if any(address in follower.get('addresses', []) for address in recommended_user_addresses):
                existing_user_index = index
                break

        # Guard against an explicit None value for mutualFollower.
        following = bool((follower.get('mutualFollower') or {}).get('Following'))

        if existing_user_index != -1:
            follows = recommended_users[existing_user_index].get('follows', {})
            follows[followed_key] = True
            follows[following_key] = follows.get(following_key, False) or following
            recommended_users[existing_user_index].update({
                **follower,
                'follows': follows
            })
        else:
            recommended_users.append({
                **follower,
                'follows': {
                    following_key: following,
                    followed_key: True
                }
            })

    return recommended_users


def format_token_sent_data(data: list, recommended_users: list = None):
    """Mark counterparties the user sent tokens to in the recommended list.

    Args:
        data (list): account records (dicts with ``addresses``) of receivers.
        recommended_users (list, optional): list to merge into; it is mutated
            in place and also returned. Defaults to None (fresh empty list) —
            the previous mutable ``[]`` default leaked state across calls.

    Returns:
        list: ``recommended_users`` with ``tokenTransfers['sent'] = True`` set
        on matching users (new users are appended).
    """
    if recommended_users is None:
        recommended_users = []

    for transfer in data:
        addresses = transfer.get('addresses', []) if transfer else []
        existing_user_index = next(
            (index for index, recommended_user in enumerate(recommended_users)
             if any(address in recommended_user.get('addresses', [])
                    for address in addresses)),
            -1)

        token_transfers = {'sent': True}

        if existing_user_index != -1:
            existing_addresses = recommended_users[existing_user_index].get(
                'addresses', [])
            recommended_users[existing_user_index]['addresses'] = list(
                set(existing_addresses + addresses))
            existing_token_transfers = recommended_users[existing_user_index].get(
                'tokenTransfers', {})
            recommended_users[existing_user_index]['tokenTransfers'] = {
                **existing_token_transfers, **token_transfers}
        else:
            recommended_users.append(
                {**(transfer or {}), 'tokenTransfers': token_transfers})

    return recommended_users


def format_token_received_data(data: list, _recommended_users: list = None) -> list:
    """Mark counterparties the user received tokens from in the recommended list.

    Args:
        data (list): account records (dicts with ``addresses``) of senders.
        _recommended_users (list, optional): existing list; a copy is merged
            into and returned. Defaults to None (fresh empty list).

    Returns:
        list: copy of ``_recommended_users`` with
        ``tokenTransfers['received'] = True`` set on matching users.
    """
    if _recommended_users is None:
        _recommended_users = []
    recommended_users = _recommended_users.copy()

    for transfer in data:
        addresses = transfer.get('addresses', []) if transfer else []
        existing_user_index = -1

        for index, recommended_user in enumerate(recommended_users):
            recommended_user_addresses = recommended_user.get('addresses', [])
            if any(address in recommended_user_addresses for address in addresses):
                existing_user_index = index
                break

        _token_transfers = {'received': True}

        if existing_user_index != -1:
            _addresses = recommended_users[existing_user_index].get(
                'addresses', [])
            recommended_users[existing_user_index]['addresses'] = list(
                set(_addresses + addresses))
            existing_token_transfers = recommended_users[existing_user_index].get(
                'tokenTransfers', {})
            recommended_users[existing_user_index]['tokenTransfers'] = {
                **existing_token_transfers, **_token_transfers}
        else:
            new_user = transfer.copy() if transfer else {}
            new_user['tokenTransfers'] = _token_transfers
            recommended_users.append(new_user)

    return recommended_users


def _format_nft_data(data, recommended_users, blockchain):
    """Shared merge logic behind format_eth_nft_data / format_polygon_nft_data.

    Mutates and returns ``recommended_users``. Each token's single
    ``tokenNfts`` dict (or None) is stored under ``'tokenNfts'`` so that
    ``calculating_score`` can read ``tokenId`` from it uniformly.
    """
    for nft in data or []:
        owner = (nft.get('owner') if nft else {}) or {}
        token = (nft.get('token') if nft else {}) or {}

        name = token.get('name')
        logo = token.get('logo', {}) or {}
        address = token.get('address')
        token_nfts = token.get('tokenNfts', [])
        addresses = owner.get('addresses', [])
        token_nft = token_nfts[0] if token_nfts else None

        existing_user_index = -1
        for index, recommended_user in enumerate(recommended_users):
            recommended_user_addresses = recommended_user.get('addresses', [])
            if any(addr in recommended_user_addresses for addr in addresses):
                existing_user_index = index
                break

        nft_entry = {
            'name': name,
            'image': logo.get('small'),
            'blockchain': blockchain,
            'address': address,
            'tokenNfts': token_nft
        }

        if existing_user_index != -1:
            _addresses = recommended_users[existing_user_index].get(
                'addresses', [])
            _addresses.extend(addresses)
            recommended_users[existing_user_index]['addresses'] = list(
                set(_addresses))  # remove duplicates

            _nfts = recommended_users[existing_user_index].get('nfts', [])
            # Only one entry per token contract address.
            if not any(existing['address'] == address for existing in _nfts):
                _nfts.append(nft_entry)
            recommended_users[existing_user_index]['nfts'] = _nfts
        else:
            recommended_users.append({**owner, 'nfts': [nft_entry]})

    return recommended_users


def format_eth_nft_data(data, _recommended_users=None):
    """Fold common Ethereum NFT holders into a recommended-users list.

    Args:
        data (list): ``TokenBalance`` records (``owner`` + ``token``).
        _recommended_users (list, optional): existing list; a copy is merged
            into and returned. Defaults to None (fresh empty list).

    Returns:
        list: copy of ``_recommended_users`` with Ethereum NFT entries merged.
    """
    recommended_users = (_recommended_users or []).copy()
    return _format_nft_data(data, recommended_users, 'ethereum')


def format_polygon_nft_data(data, _recommended_users=None):
    """Fold common Polygon NFT holders into a recommended-users list.

    Args:
        data (list): ``TokenBalance`` records (``owner`` + ``token``).
        _recommended_users (list, optional): existing list; a copy is merged
            into and returned. Defaults to None (fresh empty list).

    Returns:
        list: copy of ``_recommended_users`` with Polygon NFT entries merged.

    Fix over the previous revision: the single ``tokenNfts`` dict is stored
    (the whole list was stored before, which broke ``calculating_score``'s
    ``.get('tokenId')`` lookup).
    """
    recommended_users = (_recommended_users or []).copy()
    return _format_nft_data(data, recommended_users, 'polygon')


def identity_map(users):
    """Build a truthy lookup keyed by each user's ``id`` field.

    Users without a (non-None) ``id`` are skipped.
    """
    identity_dict = {}
    for user in users:
        user_id = user.get('id')
        if user_id is not None:
            identity_dict[user_id] = True
    return identity_dict


def is_burned_address(address):
    """Return True when *address* (case-insensitive) is a known burn address."""
    if not address:
        return False
    return address.lower() in BURNED_ADDRESSES


def calculating_score(user, score_map=None):
    """Attach an onchain-graph ``_score`` to *user* and return it.

    Args:
        user (dict): onchain-graph user with optional ``addresses``,
            ``domains``, ``follows``, ``tokenTransfers``, ``nfts``, ``poaps``.
        score_map (dict, optional): weight per signal. Defaults to the
            project-level ``default_score_map``.

    Returns:
        dict | None: *user* with ``_score`` set, or None when the user matches
        the identity map or holds only burn addresses.

    Fixes over the previous revision:
    * ``domains`` validation used ``is not None or`` and therefore discarded
      every valid domain list; the condition is now ``is None or``;
    * the NFT de-duplication key now includes the blockchain, so the
      per-chain counters actually match (the old ``'ethereum' in key`` test
      could never succeed on an ``address-tokenId`` key);
    * a ``None`` ``tokenNfts`` value no longer raises.
    """
    if score_map is None:
        score_map = default_score_map

    identity_dict = identity_map([user])

    addresses = user.get('addresses', [])
    domains = user.get('domains', [])

    # Ensure addresses is a list.
    if not isinstance(addresses, list):
        addresses = []

    # Ensure domains is a list of dictionaries.
    if domains is None or not isinstance(domains, list) or \
            not all(isinstance(domain, dict) for domain in domains):
        domains = []

    if any(address in identity_dict for address in addresses if address is not None) or \
            any(domain.get('name') in identity_dict for domain in domains if domain is not None) or \
            any(is_burned_address(address) for address in addresses if address is not None):
        return None

    score = 0
    follows = user.get('follows', {})
    token_transfers = user.get('tokenTransfers', {})

    for key in ['followingOnLens', 'followedOnLens', 'followingOnFarcaster', 'followedOnFarcaster']:
        score += follows.get(key, 0) * score_map.get(key, 0)

    for key in ['sent', 'received']:
        score += token_transfers.get(key, 0) * \
            score_map.get('token' + key.capitalize(), 0)

    # De-duplicate NFTs by blockchain + contract + tokenId.
    unique_nfts = {
        f"{nft.get('blockchain')}-{nft['address']}-{(nft.get('tokenNfts') or {}).get('tokenId')}"
        for nft in user.get('nfts', [])
        if not is_burned_address(nft['address'])
    }
    eth_nft_count = sum(1 for key in unique_nfts if key.startswith('ethereum'))
    polygon_nft_count = sum(1 for key in unique_nfts if key.startswith('polygon'))

    score += (score_map.get('commonEthNfts', 0) * eth_nft_count) + \
        (score_map.get('commonPolygonNfts', 0) * polygon_nft_count)

    poaps = user.get('poaps', [])
    score += score_map.get('commonPoaps', 0) * len(poaps)

    user['_score'] = score
    return user
+""" + +from airstack.constant import SocialsDappName, TransferType, ChainType +from airstack.generic import format_poaps_data, format_socials_followings_data, format_socials_followers_data, format_token_sent_data, format_token_received_data, format_eth_nft_data, format_polygon_nft_data, calculating_score +import traceback +from utility.custom_exception import AirstackException +import json + + +class ExecuteOnchainGraph(): + """Class to generate onchain graph data of a user + """ + + def __init__(self, create_execute_query_object=None) -> None: + """Init function + + Args: + create_execute_query_object (func): function to create execute query object from Airstack client class. + """ + self.create_execute_query_object = create_execute_query_object + + async def _fetch_poaps_data(self, address: str, existing_users: list = []) -> list: + """Async function to fetch all common POAP holders of a user + + Args: + address (str): user's address + existing_users (list, optional): Existing onchain graph data. Defaults to + None. + + Returns: + list: Concatenated list of existing onchain graph users with all common POAP holders + """ + try: + user_poaps_event_ids_query = """ + query MyQuery($user: Identity!) 
{ + Poaps(input: {filter: {owner: {_eq: $user}}, blockchain: ALL}) { + Poap { + eventId + poapEvent { + isVirtualEvent + } + } + } + } + """ + + poaps_by_event_ids_query = """ + query MyQuery($eventIds: [String!]) { + Poaps(input: {filter: {eventId: {_in: $eventIds}}, blockchain: ALL}) { + Poap { + eventId + poapEvent { + eventName + contentValue { + image { + extraSmall + } + } + } + attendee { + owner { + addresses + domains { + name + isPrimary + } + socials { + dappName + blockchain + profileName + profileImage + profileTokenId + profileTokenAddress + } + xmtp { + isXMTPEnabled + } + } + } + } + } + } + """ + poaps_data_response = None + recommended_users = existing_users.copy() + while True: + if poaps_data_response is None: + execute_query_client = self.create_execute_query_object( + query=user_poaps_event_ids_query, variables={'user': address}) + # Pagination #1: Fetch All POAPs + poaps_data_response = await execute_query_client.execute_paginated_query() + + if poaps_data_response.error is None: + event_ids = [ + poap.get('eventId') + for poap in poaps_data_response.data.get('Poaps', {}).get('Poap', []) + if not poap.get('poapEvent', {}).get('isVirtualEvent') + ] if poaps_data_response.data and 'Poaps' in poaps_data_response.data and 'Poap' in poaps_data_response.data['Poaps'] else [] + poap_holders_data_response = None + while True: + if poap_holders_data_response is None: + execute_query_client = self.create_execute_query_object( + query=poaps_by_event_ids_query, variables={'eventIds': event_ids}) + # Pagination 2: Fetch all POAP Holders + poap_holders_data_response = await execute_query_client.execute_paginated_query() + + if poap_holders_data_response.error is None: + recommended_users = format_poaps_data( + poap_holders_data_response.data.get( + 'Poaps', {}).get('Poap', []), + recommended_users + ) + + if not poap_holders_data_response.has_next_page: + break + else: + poap_holders_data_response = await poap_holders_data_response.get_next_page + else: + 
raise AirstackException( + f"Error message {poap_holders_data_response.error}") + + if not poaps_data_response.has_next_page: + break + else: + poaps_data_response = await poaps_data_response.get_next_page + else: + raise AirstackException( + f"Error message {poaps_data_response.error}") + + return recommended_users + except Exception as e: + error = traceback.format_exc() + raise AirstackException(f"Error message {error}") + + async def _fetch_socials_followings(self, address: str, dapp_name: SocialsDappName = SocialsDappName.LENS, existing_users: list = []) -> list: + """Async function to fetch all social followings of a user on Lens or Farcaster + + Args: + address (str): user's address + existing_users (list, optional): Existing onchain graph data. Defaults to + None. + dapp_name (SocialsDappName, optional): Social dapp name. Defaults to SocialsDappName.LENS. + + Returns: + list: Concatenated list of existing onchain graph users with all social followings of a user or Lens or Farcaster + """ + try: + social_followings_query = """ + query MyQuery($user: Identity!, $dappName: SocialDappName!) 
{ + SocialFollowings( + input: {filter: {identity: {_eq: $user}, dappName: {_eq: $dappName}}, blockchain: ALL, limit: 200} + ) { + Following { + followingAddress { + addresses + domains { + name + isPrimary + } + socials { + dappName + blockchain + profileName + profileImage + profileTokenId + profileTokenAddress + } + xmtp { + isXMTPEnabled + } + mutualFollower: socialFollowers( + input: {filter: {identity: {_eq: $user}, dappName: {_eq: $dappName}}} + ) { + Follower { + followerAddress { + socials { + profileName + } + } + } + } + } + } + } + } + """ + res = None + recommended_users = existing_users.copy() + while True: + if res is None: + execute_query_client = self.create_execute_query_object( + query=social_followings_query, variables={'user': address, 'dappName': dapp_name}) + res = await execute_query_client.execute_paginated_query() + + if res.error is None: + followings = [following['followingAddress'] for following in (res.data.get( + 'SocialFollowings', {}).get('Following', []) or []) if 'followingAddress' in following] + recommended_users = format_socials_followings_data( + followings, + dapp_name, + recommended_users + ) + + if not res.has_next_page: + break + else: + res = await res.get_next_page + else: + raise AirstackException( + f"Error message {res.error}") + + return recommended_users + except Exception as e: + error = traceback.format_exc() + raise AirstackException(f"Error message {error}") + + async def _fetch_socials_followers(self, address: str, dapp_name: SocialsDappName = SocialsDappName.LENS, existing_users: list = []) -> list: + """Async function to fetch all social followers of a user on Lens or Farcaster + + Args: + address (str): user's address + existing_users (list, optional): Existing onchain graph data. Defaults to + None. + dapp_name (SocialsDappName, optional): Social dapp name. Defaults to SocialsDappName.LENS. 
+ + Returns: + list: Concatenated list of existing onchain graph users with all social followers of a user or Lens or Farcaster + """ + try: + social_followers_query = """ + query MyQuery($user: Identity!, $dappName: SocialDappName!) { + SocialFollowers( + input: {filter: {identity: {_eq: $user}, dappName: {_eq: $dappName}}, blockchain: ALL, limit: 200} + ) { + Follower { + followerAddress { + addresses + domains { + name + isPrimary + } + socials { + dappName + blockchain + profileName + profileImage + profileTokenId + profileTokenAddress + } + xmtp { + isXMTPEnabled + } + mutualFollowing: socialFollowings( + input: {filter: {identity: {_eq: $user}, dappName: {_eq: $dappName}}} + ) { + Following { + followingAddress { + socials { + profileName + } + } + } + } + } + } + } + } + """ + res = None + recommended_users = existing_users.copy() + while True: + if res is None: + execute_query_client = self.create_execute_query_object( + query=social_followers_query, variables={'user': address, 'dappName': dapp_name}) + res = await execute_query_client.execute_paginated_query() + + if res.error is None: + followings = [following['followerAddress'] for following in (res.data.get( + 'SocialFollowers', {}).get('Follower', []) or []) if 'followerAddress' in following] + recommended_users = format_socials_followers_data( + followings, + dapp_name, + recommended_users + ) + + if not res.has_next_page: + break + else: + res = await res.get_next_page + else: + raise AirstackException( + f"Error message {res.error}") + + return recommended_users + except Exception as e: + error = traceback.format_exc() + raise AirstackException(f"Error message {error}") + + async def _fetch_token_transfers(self, address: str, transfer_type: TransferType = TransferType.SEND, existing_users: list = []) -> list: + """Async function to fetch all the users that interacted in token transfer with the given user on Ethereum, Polygon, or Base + + Args: + address (str): user's address + existing_users (list, 
optional): Existing onchain graph data. Defaults to + None. + transfer_type (TransferType, optional): Transfer type (send or received). Defaults to TransferType.SEND. + + Returns: + list: Concatenated list of existing onchain graph users with all the users that interacted in token transfer with the given user on Ethereum, Polygon, or Base + """ + try: + isSend = transfer_type == TransferType.SEND + token_sent_query = """ + query MyQuery($user: Identity!) { + Ethereum: TokenTransfers( + input: {filter: {from: {_eq: $user}}, blockchain: ethereum, limit: 200} + ) { + TokenTransfer { + account:""" + ("from" if isSend else "to") + """{ + addresses + domains { + name + isPrimary + } + socials { + dappName + blockchain + profileName + profileImage + profileTokenId + profileTokenAddress + } + xmtp { + isXMTPEnabled + } + } + } + } + Polygon: TokenTransfers( + input: {filter: {from: {_eq: $user}}, blockchain: ethereum, limit: 200} + ) { + TokenTransfer { + account:""" + ("from" if isSend else "to") + """{ + addresses + domains { + name + isPrimary + } + socials { + dappName + blockchain + profileName + profileImage + profileTokenId + profileTokenAddress + } + xmtp { + isXMTPEnabled + } + } + } + } + Base: TokenTransfers( + input: {filter: {from: {_eq: $user}}, blockchain: base, limit: 200} + ) { + TokenTransfer { + account:""" + ("from" if isSend else "to") + """{ + addresses + domains { + name + isPrimary + } + socials { + dappName + blockchain + profileName + profileImage + profileTokenId + profileTokenAddress + } + xmtp { + isXMTPEnabled + } + } + } + } + } + """ + res = None + recommended_users = existing_users.copy() + while True: + if res is None: + execute_query_client = self.create_execute_query_object( + query=token_sent_query, variables={'user': address}) + res = await execute_query_client.execute_paginated_query() + + if res.error is None: + eth_data = [transfer['account'] for transfer in (res.data.get('Ethereum', {}).get( + 'TokenTransfer', []) if 
isinstance(res.data.get('Ethereum', {}).get('TokenTransfer', []), list) else [])] + polygon_data = [transfer['account'] for transfer in (res.data.get('Polygon', {}).get( + 'TokenTransfer', []) if isinstance(res.data.get('Polygon', {}).get('TokenTransfer', []), list) else [])] + base_data = [transfer['account'] for transfer in (res.data.get('Base', {}).get( + 'TokenTransfer', []) if isinstance(res.data.get('Base', {}).get('TokenTransfer', []), list) else [])] + token_transfer = eth_data + polygon_data + base_data + recommended_users = (format_token_sent_data if isSend else format_token_received_data)( + token_transfer, + recommended_users + ) + + if not res.has_next_page: + break + else: + res = await res.get_next_page + else: + raise AirstackException( + f"Error message {res.error}") + + return recommended_users + except Exception as e: + error = traceback.format_exc() + raise AirstackException(f"Error message {error}") + + async def _fetch_nft_data(self, address: str, existing_users: list = [], chain: ChainType = ChainType.ETH) -> list: + """Async function to fetch all the common NFT holders on Ethereum, Polygon, or Base + + Args: + address (str): user's address + existing_users (list, optional): Existing onchain graph data. Defaults to + None. + chain (ChainType, optional): Chain type. Defaults to ChainType.ETH. + + Returns: + list: Concatenated list of existing onchain graph users with all the common NFT holders on Ethereum, Polygon, or Base + """ + try: + nft_addresses_query = """ + query MyQuery($user: Identity!, $chain: TokenBlockchain!) { + TokenBalances(input: {filter: {tokenType: {_in: [ERC721]}, owner: {_eq: $user}}, blockchain: $chain, limit: 200}) { + TokenBalance { + tokenAddress + } + } + } + """ + + nft_query = """ + query MyQuery($tokenAddresses: [Address!], $chain: TokenBlockchain!) 
{ + TokenBalances( + input: {filter: {tokenAddress: {_in: $tokenAddresses}, tokenType: {_in: [ERC721]}}, blockchain: $chain, limit: 200} + ) { + TokenBalance { + token { + name + address + tokenNfts { + tokenId + } + blockchain + logo { + small + } + } + owner { + addresses + domains { + name + isPrimary + } + socials { + dappName + blockchain + profileName + profileImage + profileTokenId + profileTokenAddress + } + xmtp { + isXMTPEnabled + } + } + } + } + } + """ + + nft_response = None + recommended_users = existing_users.copy() + while True: + if nft_response is None: + execute_query_client = self.create_execute_query_object( + query=nft_addresses_query, variables={'user': address, 'chain': chain}) + # Pagination #1: Fetch NFTs + nft_response = await execute_query_client.execute_paginated_query() + + if nft_response.error is None: + token_addresses = [token['tokenAddress'] for token in nft_response.data.get('TokenBalances', {}).get( + 'TokenBalance', [])] if nft_response.data and 'TokenBalances' in nft_response.data and 'TokenBalance' in nft_response.data['TokenBalances'] else [] + nft_holders_response = None + while True: + if nft_holders_response is None: + execute_query_client = self.create_execute_query_object( + query=nft_query, variables={'tokenAddresses': token_addresses, 'chain': chain}) + # Pagination #2: Fetch NFT Holders + nft_holders_response = await execute_query_client.execute_paginated_query() + + if nft_holders_response.error is None: + recommended_users = format_eth_nft_data( + nft_holders_response.data.get( + 'TokenBalances', {}).get('TokenBalance', []), + recommended_users + ) + + if not nft_holders_response.has_next_page: + break + else: + nft_holders_response = await nft_holders_response.get_next_page + else: + raise AirstackException( + f"Error message {nft_holders_response.error}") + + if not nft_response.has_next_page: + break + else: + nft_response = await nft_response.get_next_page + else: + raise AirstackException( + f"Error message 
{nft_response.error}")
+
+            return recommended_users
+        except Exception as e:
+            error = traceback.format_exc()
+            raise AirstackException(f"Error message {error}")
+
+    async def fetch_onchain_graph_data(self, address: str, score_and_sort_each_response: bool = True):
+        """Async function to fetch the full onchain graph of a given user
+
+        Args:
+            address (str): user's address
+            score_and_sort_each_response (bool, optional): whether to score and
+            sort the aggregated result. Defaults to True. NOTE(review): this
+            flag is currently not read by the body - confirm intended behavior.
+
+        Returns:
+            list: Aggregated list of onchain graph users for the given address
+        """
+        try:
+            recommended_users = []
+            fetch_functions = [
+                {'fct': self._fetch_poaps_data},
+                {'fct': self._fetch_socials_followings, 'args': {
+                    'dapp_name': 'lens'}},
+                {'fct': self._fetch_socials_followings, 'args': {
+                    'dapp_name': 'farcaster'}},
+                {'fct': self._fetch_socials_followers, 'args': {
+                    'dapp_name': 'lens'}},
+                {'fct': self._fetch_socials_followers, 'args': {
+                    'dapp_name': 'farcaster'}},
+                {'fct': self._fetch_token_transfers, 'args': {
+                    'transfer_type': TransferType.SEND}},
+                {'fct': self._fetch_token_transfers, 'args': {
+                    'transfer_type': TransferType.RECEIVED}},
+                {'fct': self._fetch_nft_data, 'args': {
+                    'chain': 'ethereum'}},
+                {'fct': self._fetch_nft_data, 'args': {
+                    'chain': 'polygon'}},
+                {'fct': self._fetch_nft_data, 'args': {
+                    'chain': 'base'}},
+            ]
+            for func in fetch_functions:
+                recommended_users = await func.get('fct')(address=address, existing_users=recommended_users, **func.get('args', {}))
+            return recommended_users
+            # return recommended_users if score_and_sort_each_response else self._sort_by_score(self._calculate_score(recommended_users))
+        except Exception:
+            error = traceback.format_exc()
+            raise AirstackException(f"Error message {error}")
+
+    def _calculate_score(self, onchain_graph):
+        """Calculate all user score in the onchain graph
+
+        Args:
+            onchain_graph (list): 
Existing onchain graph data.
+
+        Returns:
+            list: List of onchain graph users, each with a computed `_score` field
+        """
+        try:
+            onchain_graph_users_with_score = [
+                calculating_score(user) for user in onchain_graph]
+            return onchain_graph_users_with_score
+        except Exception as e:
+            error = traceback.format_exc()
+            raise AirstackException(f"Error message {error}")
+
+    def _sort_by_score(self, recommendations):
+        """Sort Onchain Graph result by the `_score` field
+
+        Args:
+            recommendations (list): Scored onchain graph data.
+
+        Returns:
+            list: Sorted list of onchain graph users
+        """
+        try:
+            return sorted(recommendations, key=lambda x: x.get('_score', 0), reverse=True)
+        except Exception as e:
+            error = traceback.format_exc()
+            raise AirstackException(f"Error message {error}")
diff --git a/src/configurations/conf.py b/src/configurations/conf.py
new file mode 100644
index 0000000..d14a0e0
--- /dev/null
+++ b/src/configurations/conf.py
@@ -0,0 +1,30 @@
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+DEFAULT_TOKEN_SENT = int(os.environ.get("DEFAULT_TOKEN_SENT", 10))
+DEFAULT_TOKEN_RECEIVED = int(os.environ.get("DEFAULT_TOKEN_RECEIVED", 0))
+DEFAULT_FOLLOWED_BY_ON_LENS = int(os.environ.get("DEFAULT_FOLLOWED_BY_ON_LENS", 5))
+DEFAULT_FOLLOWING_ON_LENS = int(os.environ.get("DEFAULT_FOLLOWING_ON_LENS", 7))
+DEFAULT_FOLLOWED_BY_ON_FARCASTER = int(os.environ.get(
+    "DEFAULT_FOLLOWED_BY_ON_FARCASTER", 5))
+DEFAULT_FOLLOWING_ON_FARCASTER = int(os.environ.get(
+    "DEFAULT_FOLLOWING_ON_FARCASTER", 5))
+DEFAULT_COMMON_POAPS = int(os.environ.get("DEFAULT_COMMON_POAPS", 7))
+DEFAULT_COMMON_ETH_NFTS = int(os.environ.get("DEFAULT_COMMON_ETH_NFTS", 5))
+DEFAULT_COMMON_POL_NFTS = int(os.environ.get("DEFAULT_COMMON_POL_NFTS", 5))
+BURNED_ADDRESSES = os.environ.get("BURNED_ADDRESSES", "0x0000000000000000000000000000000000000000,"
+                                  "0x000000000000000000000000000000000000dead").split(",")
+
+default_score_map = {
+    'tokenSent': DEFAULT_TOKEN_SENT,
+    'tokenReceived': 
DEFAULT_TOKEN_RECEIVED, + 'followedByOnLens': DEFAULT_FOLLOWED_BY_ON_LENS, + 'followingOnLens': DEFAULT_FOLLOWING_ON_LENS, + 'followedByOnFarcaster': DEFAULT_FOLLOWED_BY_ON_FARCASTER, + 'followingOnFarcaster': DEFAULT_FOLLOWING_ON_FARCASTER, + 'commonPoaps': DEFAULT_COMMON_POAPS, + 'commonEthNfts': DEFAULT_COMMON_ETH_NFTS, + 'commonPolygonNfts': DEFAULT_COMMON_POL_NFTS, +} diff --git a/src/example.py b/src/example.py index cb67685..202e0cd 100644 --- a/src/example.py +++ b/src/example.py @@ -1,9 +1,18 @@ +import os import asyncio from airstack.execute_query import AirstackClient +from dotenv import load_dotenv + +load_dotenv() async def main(): - api_client = AirstackClient(api_key='api_key') + api_key = os.environ.get("AIRSTACK_API_KEY") + if api_key is None: + print("Please set the AIRSTACK_API_KEY environment variable") + return + + api_client = AirstackClient(api_key=api_key) query1 = """ query MyQuery($name1: Address!, $name2: Address!) { @@ -35,13 +44,14 @@ async def main(): "name2": "0xf4eced2f682ce333f96f2d8966c613ded8fc95dd", } - execute_query_client = api_client.create_execute_query_object(query=query1, variables=variables1) + execute_query_client = api_client.create_execute_query_object( + query=query1, variables=variables1) query_response = await execute_query_client.execute_paginated_query() if query_response.has_next_page: next_page_response = await query_response.get_next_page if next_page_response.has_prev_page: prev_page_response = await next_page_response.get_prev_page - + query = """ query MyQuery($name1: Address!) 
{ test1: TokenBalances( @@ -70,8 +80,9 @@ async def main(): print(f"Error: {query_response.error}") else: print(f"Response: {query_response.data}") - - execute_query_client = api_client.create_execute_query_object(query=query, variables=variables) + + execute_query_client = api_client.create_execute_query_object( + query=query, variables=variables) query_response = await execute_query_client.execute_query() # Process the next page response @@ -80,90 +91,89 @@ async def main(): else: print(f"Response: {query_response.data}") - # Example to use popular queries execute_query_client = api_client.queries_object() query_response = await execute_query_client.get_token_balances(variables={ - "identity": "vitalik.eth", - "tokenType": "ERC721", - "blockchain": "ethereum", - "limit": 20 + "identity": "vitalik.eth", + "tokenType": "ERC721", + "blockchain": "ethereum", + "limit": 20 }) query_response = await execute_query_client.get_token_details(variables={ - "address": "0x9340204616750cb61e56437befc95172c6ff6606", - "blockchain": "ethereum" + "address": "0x9340204616750cb61e56437befc95172c6ff6606", + "blockchain": "ethereum" }) query_response = await execute_query_client.get_nft_details(variables={ - "address": "0x9340204616750cb61e56437befc95172c6ff6606", - "blockchain": "ethereum", - "tokenId": "2" + "address": "0x9340204616750cb61e56437befc95172c6ff6606", + "blockchain": "ethereum", + "tokenId": "2" }) query_response = await execute_query_client.get_nfts(variables={ - "address": "0x9340204616750cb61e56437befc95172c6ff6606", - "blockchain": "ethereum", - "limit": 40 + "address": "0x9340204616750cb61e56437befc95172c6ff6606", + "blockchain": "ethereum", + "limit": 40 }) query_response = await execute_query_client.get_nft_images(variables={ - "address": "0x9340204616750cb61e56437befc95172c6ff6606", - "blockchain": "ethereum", - "tokenId": "2" + "address": "0x9340204616750cb61e56437befc95172c6ff6606", + "blockchain": "ethereum", + "tokenId": "2" }) query_response = await 
execute_query_client.get_wallet_ens_and_social(variables={ - "identity": "betashop.eth", - "blockchain": "ethereum" + "identity": "betashop.eth", + "blockchain": "ethereum" }) query_response = await execute_query_client.get_wallet_ens(variables={ - "identity": "betashop.eth", - "blockchain": "ethereum" + "identity": "betashop.eth", + "blockchain": "ethereum" }) query_response = await execute_query_client.get_balance_of_token(variables={ - "owner": "", - "blockchain": "ethereum", - "tokenAddress": "0x9340204616750cb61e56437befc95172c6ff6606" + "owner": "", + "blockchain": "ethereum", + "tokenAddress": "0x9340204616750cb61e56437befc95172c6ff6606" }) query_response = await execute_query_client.get_holders_of_collection(variables={ - "tokenAddress": "0x9340204616750cb61e56437befc95172c6ff6606", - "blockchain": "ethereum", - "limit": 30 + "tokenAddress": "0x9340204616750cb61e56437befc95172c6ff6606", + "blockchain": "ethereum", + "limit": 30 }) query_response = await execute_query_client.get_holders_of_nft(variables={ - "tokenAddress": "0x9340204616750cb61e56437befc95172c6ff6606", - "blockchain": "ethereum", - "tokenId": "1" + "tokenAddress": "0x9340204616750cb61e56437befc95172c6ff6606", + "blockchain": "ethereum", + "tokenId": "1" }) query_response = await execute_query_client.get_primary_ens(variables={ - "identity": "betashop.eth", - "blockchain": "ethereum" + "identity": "betashop.eth", + "blockchain": "ethereum" }) query_response = await execute_query_client.get_ens_subdomains(variables={ - "owner": "betashop.eth", - "blockchain": "ethereum" + "owner": "betashop.eth", + "blockchain": "ethereum" }) query_response = await execute_query_client.get_token_transfers(variables={ - "tokenAddress": "0x32e14d6f3dda2b95e505b14eb4552fd3eeaa1f0d", - "blockchain": "ethereum", - "limit": 30 + "tokenAddress": "0x32e14d6f3dda2b95e505b14eb4552fd3eeaa1f0d", + "blockchain": "ethereum", + "limit": 30 }) query_response = await execute_query_client.get_nft_transfers(variables={ - 
"tokenId": "1053", - "tokenAddress": "0x32e14d6f3dda2b95e505b14eb4552fd3eeaa1f0d", - "blockchain": "ethereum", - "limit": 30 + "tokenId": "1053", + "tokenAddress": "0x32e14d6f3dda2b95e505b14eb4552fd3eeaa1f0d", + "blockchain": "ethereum", + "limit": 30 }) asyncio.run(main()) diff --git a/src/example_onchain_graph.py b/src/example_onchain_graph.py new file mode 100644 index 0000000..04523d8 --- /dev/null +++ b/src/example_onchain_graph.py @@ -0,0 +1,20 @@ +import os +import asyncio +from airstack.execute_query import AirstackClient +from dotenv import load_dotenv + +load_dotenv() + + +async def main(): + api_key = os.environ.get("AIRSTACK_API_KEY") + if api_key is None: + print("Please set the AIRSTACK_API_KEY environment variable") + return + + api_client = AirstackClient(api_key=api_key) + onchain_graph_client = api_client.onchain_graph() + res = await onchain_graph_client.fetch_onchain_graph_data("yosephks.eth") + print(len(res), res if len(res) > 0 else "No data found") + +asyncio.run(main()) diff --git a/src/utility/custom_exception.py b/src/utility/custom_exception.py new file mode 100644 index 0000000..820bc5a --- /dev/null +++ b/src/utility/custom_exception.py @@ -0,0 +1,3 @@ +class AirstackException(Exception): + def __init__(self, message="") -> None: + super().__init__(message)