Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #93 from CounterpartyXCP/develop
Browse files Browse the repository at this point in the history
Version 1.0.1
  • Loading branch information
Ouziel Slama committed Jan 23, 2015
2 parents e37e0a3 + b7a917f commit 0580c8a
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 43 deletions.
7 changes: 5 additions & 2 deletions docs/Modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ want to run a process for every new block (but not when counterblockd is catchin
.. code-block:: python
@BlockProcessor.subscribe()
def my_custom_block_event():
if not (config.state['cpd_latest_block']['block_index'] - config.state['my_latest_block']['block_index']) == 1:
if not (config.state['cpd_latest_block_index'] - config.state['my_latest_block']['block_index']) == 1:
return
#Do stuff here
Expand All @@ -116,7 +116,10 @@ want to run a process for every new block (but not when counterblockd is catchin
print('counterblockd is now caught up to Counterpartyd!')
``RollbackProcessor`` runs whenever the ``counterblockd`` database is rolled back (either due to a blockchain
reorg, or an explicit rollback command being specified to ``counterblockd`` via the command line).
reorg, or an explicit rollback command being specified to ``counterblockd`` via the command line).

Note that if this processor runs and ``None`` is passed as ``max_block_index``, it means that there was a reparse of
all block data.

.. code-block:: python
@RollbackProcessor.subscribe()
Expand Down
45 changes: 25 additions & 20 deletions lib/blockfeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def fuzzy_is_caught_up():
"""We don't want to give users 525 errors or login errors if counterblockd/counterpartyd is in the process of
getting caught up, but we DO if counterblockd is either clearly out of date with the blockchain, or reinitializing its database"""
return config.state['caught_up'] \
or ( config.state['cpd_backend_block_height']
and config.state['my_latest_block']['block_index'] >= config.state['cpd_backend_block_height'] - 1
or ( config.state['cpd_backend_block_index']
and config.state['my_latest_block']['block_index'] >= config.state['cpd_backend_block_index'] - 1
)

def process_cpd_blockfeed(zmq_publisher_eventfeed):
Expand All @@ -35,10 +35,10 @@ def process_cpd_blockfeed(zmq_publisher_eventfeed):

#initialize state
config.state['cur_block'] = {'block_index': 0, } #block being currently processed
config.state['cpd_latest_block'] = {'block_index': 0, } #last block that was successfully processed by counterparty
config.state['my_latest_block'] = {'block_index': 0 } #last block that was successfully processed by counterblockd
config.state['last_message_index'] = -1 #initialize (last processed message index)
config.state['cpd_backend_block_height'] = 0 #the latest block height as reported by the cpd blockchain backend
config.state['cpd_latest_block_index'] = 0 #last block that was successfully processed by counterparty
config.state['cpd_backend_block_index'] = 0 #the latest block height as reported by the cpd blockchain backend
config.state['caught_up_started_events'] = False
#^ set after we are caught up and start up the recurring events that depend on us being caught up with the blockchain

Expand Down Expand Up @@ -128,11 +128,11 @@ def parse_block(block_data):

logger.info("Block: %i of %i [message height=%s]" % (
config.state['my_latest_block']['block_index'],
config.state['cpd_backend_block_height'] \
if config.state['cpd_backend_block_height'] else '???',
config.state['cpd_backend_block_index'] \
if config.state['cpd_backend_block_index'] else '???',
config.state['last_message_index'] if config.state['last_message_index'] != -1 else '???'))

if config.state['cpd_latest_block']['block_index'] - cur_block_index < config.MAX_REORG_NUM_BLOCKS: #only when we are near the tip
if config.state['cpd_latest_block_index'] - cur_block_index < config.MAX_REORG_NUM_BLOCKS: #only when we are near the tip
clean_mempool_tx()

#grab our stored preferences, and rebuild the database if necessary
Expand Down Expand Up @@ -165,8 +165,13 @@ def parse_block(block_data):
#start polling counterpartyd for new blocks
while True:
if not autopilot or autopilot_runner == 0:
cpd_running_info = util.jsonrpc_api("get_running_info", abort_on_error=True)['result']

try:
cpd_running_info = util.jsonrpc_api("get_running_info", abort_on_error=True)['result']
except Exception, e:
logger.warn("Cannot contact counterpartyd get_running_info: %s" % e)
time.sleep(3)
continue

#wipe our state data if necessary, if counterpartyd has moved on to a new DB version
wipeState = False
updatePrefs = False
Expand Down Expand Up @@ -206,21 +211,21 @@ def parse_block(block_data):
config.state['caught_up'] = False #You've Come a Long Way, Baby

#work up to what block counterpartyd is at
config.state['cpd_latest_block'] = cpd_running_info['last_block']
config.state['cpd_backend_block_height'] = cpd_running_info['bitcoin_block_count']

if not config.state['cpd_latest_block']['block_index']:
config.state['cpd_latest_block_index'] = cpd_running_info['last_block']['block_index'] \
if isinstance(cpd_running_info['last_block'], dict) else cpd_running_info['last_block']
config.state['cpd_backend_block_index'] = cpd_running_info['bitcoin_block_count']
if not config.state['cpd_latest_block_index']:
logger.warn("counterpartyd has no last processed block (probably is reparsing or was just restarted)."
+ " Waiting 3 seconds before trying again...")
time.sleep(3)
continue
assert config.state['cpd_latest_block']['block_index']
if config.state['my_latest_block']['block_index'] < config.state['cpd_latest_block']['block_index']:
assert config.state['cpd_latest_block_index']
if config.state['my_latest_block']['block_index'] < config.state['cpd_latest_block_index']:
#need to catch up
config.state['caught_up'] = False

#Autopilot and autopilot runner are redundant
if config.state['cpd_latest_block']['block_index'] - config.state['my_latest_block']['block_index'] > 500: #we are safely far from the tip, switch to bulk-everything
if config.state['cpd_latest_block_index'] - config.state['my_latest_block']['block_index'] > 500: #we are safely far from the tip, switch to bulk-everything
autopilot = True
if autopilot_runner == 0:
autopilot_runner = 500
Expand All @@ -231,25 +236,25 @@ def parse_block(block_data):
cur_block_index = config.state['my_latest_block']['block_index'] + 1
try:
block_data = cache.get_block_info(cur_block_index,
min(100, (config.state['cpd_latest_block']['block_index'] - config.state['my_latest_block']['block_index'])))
min(100, (config.state['cpd_latest_block_index'] - config.state['my_latest_block']['block_index'])))
except Exception, e:
logger.warn(str(e) + " Waiting 3 seconds before trying again...")
time.sleep(3)
continue

# clean api cache
if config.state['cpd_latest_block']['block_index'] - cur_block_index <= config.MAX_REORG_NUM_BLOCKS: #only when we are near the tip
if config.state['cpd_latest_block_index'] - cur_block_index <= config.MAX_REORG_NUM_BLOCKS: #only when we are near the tip
cache.clean_block_cache(cur_block_index)

parse_block(block_data)

elif config.state['my_latest_block']['block_index'] > config.state['cpd_latest_block']['block_index']:
elif config.state['my_latest_block']['block_index'] > config.state['cpd_latest_block_index']:
# should get a reorg message. Just to be on the safe side, prune back MAX_REORG_NUM_BLOCKS blocks
# before what counterpartyd is saying if we see this
logger.error("Very odd: Ahead of counterpartyd with block indexes! Pruning back %s blocks to be safe."
% config.MAX_REORG_NUM_BLOCKS)
config.state['my_latest_block'] = database.rollback(
config.state['cpd_latest_block']['block_index'] - config.MAX_REORG_NUM_BLOCKS)
config.state['cpd_latest_block_index'] - config.MAX_REORG_NUM_BLOCKS)
else:
#...we may be caught up (to counterpartyd), but counterpartyd may not be (to the blockchain). And if it isn't, we aren't
config.state['caught_up'] = cpd_running_info['db_caught_up']
Expand Down
13 changes: 10 additions & 3 deletions lib/components/dex.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ def calculate_price(base_quantity, quote_quantity, base_divisibility, quote_divi
return price

except Exception, e:
logging.exception(e)
decimal.setcontext(decimal.Context(prec=8, rounding=decimal.ROUND_HALF_EVEN))
return '0'
raise(e)

def get_pairs_with_orders(addresses=[], max_pairs=12):

Expand Down Expand Up @@ -269,7 +270,10 @@ def get_market_orders(asset1, asset2, addresses=[], supplies=None, min_fee_provi

if not exclude:
if order['give_asset'] == base_asset:
price = calculate_price(order['give_quantity'], order['get_quantity'], supplies[order['give_asset']][1], supplies[order['get_asset']][1], 'SELL')
try:
price = calculate_price(order['give_quantity'], order['get_quantity'], supplies[order['give_asset']][1], supplies[order['get_asset']][1], 'SELL')
except:
continue
market_order['type'] = 'SELL'
market_order['amount'] = order['give_remaining']
market_order['total'] = D(order['give_remaining']) * D(price)
Expand All @@ -280,7 +284,10 @@ def get_market_orders(asset1, asset2, addresses=[], supplies=None, min_fee_provi
else:
market_order['total'] = int(market_order['total'])
else:
price = calculate_price(order['get_quantity'], order['give_quantity'], supplies[order['get_asset']][1], supplies[order['give_asset']][1], 'BUY')
try:
price = calculate_price(order['get_quantity'], order['give_quantity'], supplies[order['get_asset']][1], supplies[order['give_asset']][1], 'BUY')
except:
continue
market_order['type'] = 'BUY'
market_order['total'] = order['give_remaining']
market_order['amount'] = D(order['give_remaining']) / D(price)
Expand Down
4 changes: 2 additions & 2 deletions lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
##
## CONSTANTS
##
VERSION = "1.5.0" #should keep up with the counterwallet version it works with (for now at least)
VERSION = "1.0.0" #should keep up with counterblockd repo's release tag

DB_VERSION = 22 #a db version increment will cause counterblockd to rebuild its database off of counterpartyd

Expand Down Expand Up @@ -466,4 +466,4 @@ def load_schemas():
def init(args):
init_data_dir(args)
load(args)
load_schemas()
load_schemas()
3 changes: 3 additions & 0 deletions lib/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ def reset_db_state():
config.state['my_latest_block'] = {'block_index': 0}
config.state['last_message_index'] = -1

#call any rollback processors for any extension modules
RollbackProcessor.run_active_functions(None)

return app_config

def rollback(max_block_index):
Expand Down
2 changes: 1 addition & 1 deletion lib/modules/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def reparse_timer_start():

@processor.CaughtUpProcessor.subscribe(priority=90, enabled=False)
def reparse_timer_stop():
msg = "Caught up To Blockchain" if config.state['caught_up'] else "Timer stopped at %i, Counterpartyd is at %i" %(config.state['my_latest_block']['block_index'], config.state['cpd_latest_block']['block_index'])
msg = "Caught up To Blockchain" if config.state['caught_up'] else "Timer stopped at %i, Counterpartyd is at %i" %(config.state['my_latest_block']['block_index'], config.state['cpd_latest_block_index'])
logger.warn("%s, time elapsed %s" %(msg, time.time() - config.state['timer']))

@processor.BlockProcessor.subscribe()
Expand Down
14 changes: 7 additions & 7 deletions lib/processor/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def is_ready():
return {
'caught_up': blockfeed.fuzzy_is_caught_up(),
'last_message_index': config.state['last_message_index'],
'block_height': config.state['cpd_backend_block_height'],
'block_height': config.state['cpd_backend_block_index'],
'testnet': config.TESTNET,
'ip': ip,
'country': country,
Expand Down Expand Up @@ -87,7 +87,7 @@ def get_messagefeed_messages_by_index(message_indexes):
@API.add_method
def get_chain_block_height():
#DEPRECIATED 1.5
return config.state['cpd_backend_block_height']
return config.state['cpd_backend_block_index']

@API.add_method
def get_insight_block_info(block_hash):
Expand All @@ -107,7 +107,7 @@ def get_chain_address_info(addresses, with_uxtos=True, with_last_txn_hashes=4):
result = {}
result['addr'] = address
result['info'] = info
result['block_height'] = config.state['cpd_backend_block_height']
result['block_height'] = config.state['cpd_backend_block_index']
#^ yeah, hacky...it will be the same block height for each address (we do this to avoid an extra API call to get_block_height)
if with_uxtos:
result['uxtos'] = blockchain.listunspent(address)
Expand Down Expand Up @@ -345,11 +345,11 @@ def get_last_n_messages(count=100):
if count > 1000:
raise Exception("The count is too damn high")
message_indexes = range(max(config.state['last_message_index'] - count, 0) + 1, config.state['last_message_index'] + 1)
messages = util.call_jsonrpc_api("get_messages_by_index",
msgs = util.call_jsonrpc_api("get_messages_by_index",
{ 'message_indexes': message_indexes }, abort_on_error=True)['result']
for i in xrange(len(messages)):
messages[i] = messages.decorate_message_for_feed(messages[i])
return messages
for i in xrange(len(msgs)):
msgs[i] = messages.decorate_message_for_feed(msgs[i])
return msgs

@API.add_method
def get_raw_transactions(address, start_ts=None, end_ts=None, limit=500):
Expand Down
6 changes: 3 additions & 3 deletions lib/processor/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def handle_invalid(msg, msg_data):
#don't process invalid messages, but do forward them along to clients
status = msg_data.get('status', 'valid').lower()
if status.startswith('invalid'):
if config.state['cpd_latest_block']['block_index'] - config.state['my_latest_block']['block_index'] < config.MAX_REORG_NUM_BLOCKS:
if config.state['cpd_latest_block_index'] - config.state['my_latest_block']['block_index'] < config.MAX_REORG_NUM_BLOCKS:
#forward along via message feed, except while we're catching up
event = messages.decorate_message_for_feed(msg, msg_data=msg_data)
config.ZMQ_PUBLISHER_EVENTFEED.send_json(event)
Expand Down Expand Up @@ -65,7 +65,7 @@ def handle_reorg(msg, msg_data):
config.state['last_message_index'] = running_info['last_message_index']

#send out the message to listening clients (but don't forward along while we're catching up)
if config.state['cpd_latest_block']['block_index'] - config.state['my_latest_block']['block_index'] < config.MAX_REORG_NUM_BLOCKS:
if config.state['cpd_latest_block_index'] - config.state['my_latest_block']['block_index'] < config.MAX_REORG_NUM_BLOCKS:
msg_data['_last_message_index'] = config.state['last_message_index']
event = messages.decorate_message_for_feed(msg, msg_data=msg_data)
config.ZMQ_PUBLISHER_EVENTFEED.send_json(event)
Expand Down Expand Up @@ -192,7 +192,7 @@ def parse_for_socketio(msg, msg_data):
#if we're catching up beyond MAX_REORG_NUM_BLOCKS blocks out, make sure not to send out any socket.io
# events, as to not flood on a resync (as we may give a 525 to kick the logged in clients out, but we
# can't guarantee that the socket.io connection will always be severed as well??)
if config.state['cpd_latest_block']['block_index'] - config.state['my_latest_block']['block_index'] < config.MAX_REORG_NUM_BLOCKS:
if config.state['cpd_latest_block_index'] - config.state['my_latest_block']['block_index'] < config.MAX_REORG_NUM_BLOCKS:
#send out the message to listening clients
event = messages.decorate_message_for_feed(msg, msg_data=msg_data)
config.ZMQ_PUBLISHER_EVENTFEED.send_json(event)
Expand Down
14 changes: 10 additions & 4 deletions lib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,14 @@ def call_jsonrpc_api(method, params=None, endpoint=None, auth=None, abort_on_err
except Exception, e:
raise Exception("Got call_jsonrpc_api request error: %s" % e)
else:
if r.status_code != 200 and abort_on_error:
raise Exception("Bad status code returned: '%s'. result body: '%s'." % (r.status_code, r.read()))
result = json.loads(r.read())
if r.status_code != 200:
if abort_on_error:
raise Exception("Bad status code returned: '%s'. result body: '%s'." % (r.status_code, r.read()))
else:
logging.warning("Bad status code returned: '%s'. result body: '%s'." % (r.status_code, r.read()))
result = None
else:
result = json.loads(r.read())
finally:
client.close()

Expand Down Expand Up @@ -155,7 +160,8 @@ def get_url(url, abort_on_error=False, is_json=True, fetch_timeout=5, auth=None,
else:
if r.status_code != 200 and abort_on_error:
raise Exception("Bad status code returned: '%s'. result body: '%s'." % (r.status_code, r.read()))
result = json.loads(r.read()) if is_json else r.read()
result = r.read()
result = json.loads(result) if result and is_json else result
finally:
client.close()
return result
Expand Down
2 changes: 1 addition & 1 deletion pip-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ appdirs==1.4.0

prettytable==0.7.2

python-dateutil==2.3
python-dateutil==2.4.0

flask==0.10.1

Expand Down

0 comments on commit 0580c8a

Please sign in to comment.