-
Notifications
You must be signed in to change notification settings - Fork 0
/
top_tokens.py
363 lines (287 loc) · 12.1 KB
/
top_tokens.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
#!/usr/bin/env python3
"""
File name: top_tokens.py
Author: Jonathan Snow
Date created: 09/15/2022
Python Version: 3.9.x
File Details:
Purpose: A series of functions to pull information about Top-500 ERC20 Tokens.
NOTE: This is a WIP script. Not validated, tested, etc.
"""
# Imports
from time import time, sleep
from modules import data, eth, db, dexguru, apyvision, holders
import requests
import sys
DEBUG = False  # When True, print extra exchange-listing detail for a hardcoded symbol in main()
BLOCK = eth.get_latest_block()  # Ethereum block height snapshotted once at script start; used for holder scraping
##################################################
def main():
    """
    Aggregate information on the top 500 tokens on Ethereum: Chainlink
    Data Feed availability, market cap, 24-hour volume, CEX availability,
    DEX liquidity, pool counts, and LP counts, then dump the result to CSV.
    """
    print("\nStarting Token Analysis.")
    output = []
    # Grab list of tokens from CMC and keep the top 500 Ethereum tokens
    raw_tokens = fetch_tokens()
    eth_tokens = process_tokens(raw_tokens)[:500]
    # Fetch Chainlink data feeds keyed by token symbol
    chainlink_feeds = process_chainlink_feeds()
    # Fetch current exchange pairs keyed by token symbol
    exchange_tokens = get_exchange_tokens()
    # Collect checksummed addresses and symbols for the batch lookups below
    t_addresses = [eth.get_checksum(t["platform"]["token_address"]) for t in eth_tokens]
    t_symbols = [t["symbol"] for t in eth_tokens]
    # Process current liquidity data
    liquidity_data = dexguru.scrape_async(t_addresses)
    # Fetch apyvision token pair pools for provided T500 tokens
    token_pools = apyvision.process(t_symbols)
    # Get list of pool addresses
    token_pool_addresses = data.flatten(list(token_pools.values()))
    # Get holder data on pools (slow-mode)
    pool_owners = holders.scrape_slow(token_pool_addresses, BLOCK)
    # Determine token unique LP count
    token_lp_count = get_lp_count(token_pools, pool_owners)
    # Grab protocol data from JSON file ONCE (previously re-read every iteration).
    # If not available, run /utilities/depositable_assets.py
    protocol_tokens = data.load_json("files/input/depositable_assets.json")
    # Iterate over token list and prepare for data dump
    for i, token in enumerate(eth_tokens, start=1):
        print("\rProcessing contract " + str(i) + " / " + str(len(eth_tokens)) + (20 * " "), end="", flush=True)
        # Set up loop variables
        address = eth.get_checksum(token["platform"]["token_address"])
        token_symbol = token["symbol"].upper()
        has_chainlink = False
        usd_proxy = ""
        eth_proxy = ""
        # Fetch contract data from DB else process
        contract_data = process_contract(address)
        # Check if contract is verified on Etherscan (DB stores 1/0)
        etherscan_verified = contract_data[1] == 1
        # Check if audited based on data from CMC
        is_audited = token["isAudited"]
        # Grab Chainlink oracle proxies if a feed exists for this symbol
        if token_symbol in chainlink_feeds:
            has_chainlink = True
            usd_proxy = chainlink_feeds[token_symbol]["USD"]
            eth_proxy = chainlink_feeds[token_symbol]["ETH"]
        # Protocol availability flags; default to "" for unknown tokens
        protocol_info = protocol_tokens.get(token_symbol, {})
        on_aave = protocol_info.get("on_aave", "")
        on_compound = protocol_info.get("on_compound", "")
        on_maker = protocol_info.get("on_maker", "")
        on_yearn = protocol_info.get("on_yearn", "")
        # Get count of exchanges token is listed on else 0
        exchange_count = len(exchange_tokens[token_symbol]["exchanges"]) if token_symbol in exchange_tokens else 0
        # Check if eligible for a Chainlink Data Feed (3+ CEX listings and >$3M 24h volume)
        volume_24h = token["quotes"][0]["volume24h"]
        chainlink_eligible = exchange_count >= 3 and volume_24h > 3000000
        # DEBUG code used to validate the exchange listings for a specific hardcoded token symbol
        if DEBUG and token_symbol == "USDT":
            print(exchange_tokens[token_symbol]["exchanges"])
        # Grab token liquidity (USD) from DexGuru data, "N/A" when absent
        tld = liquidity_data[address]["data"]
        token_liquidity = tld[0]["liquidityUSD"] if len(tld) > 0 else "N/A"
        # Determine the number of pools for token
        pool_count = len(token_pools[token_symbol]) if token_symbol in token_pools else 0
        # Determine the number of LP providers for pools for token
        lp_count = token_lp_count.get(token_symbol, 0)
        # Build token data row (order must match the CSV header passed to data.save)
        token_data = [
            token["name"], token["symbol"], address, etherscan_verified, is_audited, chainlink_eligible, has_chainlink,
            usd_proxy, eth_proxy, token["quotes"][0]["marketCap"], volume_24h, exchange_count,
            on_aave, on_compound, on_maker, on_yearn, token_liquidity, pool_count, lp_count
        ]
        # Add to output data
        output.append(token_data)
    print("\nProcessed " + str(len(eth_tokens)) + " tokens.")
    # Dump token data to CSV
    output_path = "files/output"
    output_name = "top_tokens_" + str(int(time())) + ".csv"
    data.save(output, output_path, output_name,
              ["name", "symbol", "address", "etherscan_verified", "is_audited", "chainlink_eligible", "has_chainlink_feed",
               "usd_proxy", "eth_proxy", "market_cap_usd", "24h_volume", "cex_count", "aave", "compound", "maker", "yearn",
               "liquidity_dexguru", "pool_count", "lp_count"
               ])
##################################################
def fetch_tokens(limit=1000):
    """
    Fetch a raw token listing from the CoinMarketCap data API,
    sorted by market cap descending. Default page size is 1000.
    """
    base = "https://api.coinmarketcap.com/data-api/v3/cryptocurrency/listing"
    query = ("?start=1&limit=" + str(limit)
             + "&sortBy=market_cap&sortType=desc"
             + "&convert=USD&cryptoType=tokens&tagType=all&audited=false")
    # Unwrap the API envelope and return just the token list
    return get_data(base + query)["data"]["cryptoCurrencyList"]
##################################################
def process_tokens(tokens):
    """
    Filter a raw token list down to valid Ethereum tokens
    (CoinMarketCap platform id 1), preserving input order.
    """
    # A token qualifies when it carries platform metadata for Ethereum
    return [t for t in tokens if "platform" in t and t["platform"]["id"] == 1]
##################################################
def process_chainlink_feeds():
    """
    Download the published Chainlink feed addresses and build a mapping
    from token symbol to its USD and ETH proxy addresses on Mainnet.
    """
    url = "https://cl-docs-addresses.web.app/addresses.json"
    networks = get_data(url)["ethereum"]["networks"]
    # Grab the Ethereum Mainnet proxy list only
    mainnet_proxies = []
    for network in networks:
        if network["name"] == "Ethereum Mainnet":
            mainnet_proxies.extend(network["proxies"])
            break
    # Keep only verified Crypto feeds and attach token/base split from "pair"
    crypto_feeds = []
    for feed in mainnet_proxies:
        if feed["feedCategory"] == "verified" and feed["feedType"] == "Crypto":
            parts = feed["pair"].split(" / ")
            feed["token"] = parts[0]
            feed["base"] = parts[1]
            crypto_feeds.append(feed)
    """
    There is a possibility of token symbol key conflict. However, this
    doesn't appear to be an issue in the top 500 tokens at this time.
    """
    # Collapse the feed list into {SYMBOL: {"USD": proxy, "ETH": proxy}}
    output = {}
    for feed in crypto_feeds:
        symbol = feed["token"].upper()
        entry = output.setdefault(symbol, {"USD": "", "ETH": ""})
        # Record USD-quoted proxy when present
        if feed["base"] == "USD":
            entry["USD"] = feed["proxy"]
        # Record ETH-quoted proxy when present
        if feed["base"] == "ETH":
            entry["ETH"] = feed["proxy"]
    return output
##################################################
def get_data(url):
    """
    Fetch data from a URL and return the decoded JSON body.

    Args:
        url: Fully-qualified URL to GET.

    Returns:
        The parsed JSON response.
    """
    with requests.Session() as s:
        # A timeout prevents the whole run from hanging forever on a stalled
        # endpoint; previously no timeout was set (requests waits indefinitely).
        response = s.get(url, timeout=30)
        return response.json()
##################################################
def process_contract(address):
    """
    Look up contract details in the DB, inserting a new record
    (Etherscan verification status and deploy date) on a miss.
    """
    details = db.get_contract(address)
    # Fast path: contract already cached in the DB
    if details is not None:
        return details
    # Cache miss: gather data from Etherscan and persist it
    verified = eth.is_contract_verified(address)
    deployed = eth.get_contract_deploy_date(address)
    db.add_contract(address, verified, deployed)
    # Re-read so the caller always receives the DB's standardized shape
    return db.get_contract(address)
##################################################
def get_exchange_tokens():
    """
    Fetch all spot pairs from a fixed set of top exchange slugs and
    index them by token symbol.

    Returns:
        dict: {SYMBOL: {"data": [pair, ...], "exchanges": [slug, ...]}}
        where each pair is listed under both its base and quote symbols.
    """
    output = {}
    exchange_data = []
    exchanges = [
        "binance", "kraken", "ftx", "kucoin", "bitfinex", "gemini", "bybit", "coinbase-exchange",
        "gate-io", "huobi-global", "bitstamp", "okx"
    ]
    # Fetch raw data for token pairs on provided exchange slugs.
    # NOTE: local renamed from "data" — the old name shadowed the imported
    # "data" module from modules.
    for exchange in exchanges:
        pairs = get_exchange_pairs(exchange)
        exchange_data.extend(pairs)
    # Clean up data to make it easier to query by token symbol
    for pair in exchange_data:
        # Index each pair under both of its symbols (small loop to be DRY)
        for base in (pair["baseSymbol"], pair["quoteSymbol"]):
            # Initialize the symbol's entry on first sight
            entry = output.setdefault(base, {"data": [], "exchanges": []})
            # Add data to pair dict
            entry["data"].append(pair)
            # Track each exchange slug only once per symbol
            if pair["exchangeSlug"] not in entry["exchanges"]:
                entry["exchanges"].append(pair["exchangeSlug"])
    return output
##################################################
def get_exchange_pairs(exchange):
    """
    Fetch all spot market pairs for a provided exchange slug from the
    CoinMarketCap data API, paging 500 at a time.

    Args:
        exchange: CMC exchange slug, e.g. "binance".

    Returns:
        list: All market-pair dicts returned by the API.
    """
    output = []
    start = 1
    while True:
        # Build URL and fetch the next page of pairs from CMC
        url = "https://api.coinmarketcap.com/data-api/v3/exchange/market-pairs/latest?slug=" \
            + exchange + "&category=spot&start=" + str(start) + "&limit=500"
        # NOTE: local renamed from "data" — the old name shadowed the
        # imported "data" module from modules.
        response = get_data(url)
        pairs = response["data"]["marketPairs"]
        # Break loop if no pairs are returned (end of listing)
        if len(pairs) == 0:
            break
        # Add pairs to output list
        output.extend(pairs)
        # Increment start value to the next page
        start += 500
    return output
##################################################
def get_lp_count(token_pools, pool_owners):
    """
    Total the unique LP (pool-token holder) counts across every pool
    associated with each token symbol.
    """
    # For each symbol, sum the owner counts of its pools;
    # pools missing from pool_owners contribute zero.
    return {
        symbol: sum(pool_owners.get(pool, 0) for pool in pools)
        for symbol, pools in token_pools.items()
    }
##################################################
# Runtime entry point: run the full analysis only when executed as a script,
# not when imported as a module.
if __name__ == "__main__":
    main()