Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Hello Shard! Intershard communication example app #429

Draft
wants to merge 16 commits into
base: dev
Choose a base branch
from
Draft
2 changes: 1 addition & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ coverage:
threshold: 10%
patch:
default:
threshold: 50%
threshold: 95%

ignore:
- "docs/*"
Expand Down
107 changes: 88 additions & 19 deletions apps/asynchromix/asynchromix.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from web3 import HTTPProvider, Web3
from web3.contract import ConciseContract
from web3.exceptions import TransactionNotFound

from apps.asynchromix.butterfly_network import iterated_butterfly_network

Expand All @@ -36,7 +37,10 @@

async def wait_for_receipt(w3, tx_hash):
while True:
tx_receipt = w3.eth.getTransactionReceipt(tx_hash)
try:
tx_receipt = w3.eth.getTransactionReceipt(tx_hash)
except TransactionNotFound:
tx_receipt = None
if tx_receipt is not None:
break
await asyncio.sleep(5)
Expand All @@ -48,8 +52,30 @@ async def wait_for_receipt(w3, tx_hash):
########


class AsynchromixClient(object):
class AsynchromixClient:
"""An Asynchromix client sends "masked" messages to an Ethereum contract.
...
"""

def __init__(self, sid, myid, send, recv, w3, contract, req_mask):
"""
Parameters
----------
sid: int
Session id.
myid: int
Client id.
send:
Function used to send messages. Not used?
recv:
Function used to receive messages. Not used?
w3:
Connection instance to an Ethereum node.
contract:
Contract instance on the Ethereum blockchain.
req_mask:
Function used to request an input mask from a server.
"""
self.sid = sid
self.myid = myid
self.contract = contract
Expand All @@ -62,7 +88,8 @@ async def _run(self):
contract_concise = ConciseContract(self.contract)
await asyncio.sleep(60) # give the servers a head start
# Client sends several batches of messages then quits
for epoch in range(1000):
# for epoch in range(1000):
for epoch in range(10):
logging.info(f"[Client] Starting Epoch {epoch}")
receipts = []
for i in range(32):
Expand Down Expand Up @@ -121,13 +148,13 @@ async def send_message(self, m):
# Step 3. Fetch the input mask from the servers
inputmask = await self._get_inputmask(inputmask_idx)
message = int.from_bytes(m.encode(), "big")
maskedinput = message + inputmask
maskedinput_bytes = self.w3.toBytes(hexstr=hex(maskedinput.value))
maskedinput_bytes = maskedinput_bytes.rjust(32, b"\x00")
masked_message = message + inputmask
masked_message_bytes = self.w3.toBytes(hexstr=hex(masked_message.value))
masked_message_bytes = masked_message_bytes.rjust(32, b"\x00")

# Step 4. Publish the masked input
tx_hash = self.contract.functions.submit_message(
inputmask_idx, maskedinput_bytes
inputmask_idx, masked_message_bytes
).transact({"from": self.w3.eth.accounts[0]})
tx_receipt = await wait_for_receipt(self.w3, tx_hash)

Expand All @@ -138,19 +165,42 @@ async def send_message(self, m):


class AsynchromixServer(object):
"""Asynchromix server class to ..."""

def __init__(self, sid, myid, send, recv, w3, contract):
"""
Parameters
----------
sid: int
Session id.
myid: int
Client id.
send:
Function used to send messages.
recv:
Function used to receive messages.
w3:
Connection instance to an Ethereum node.
contract:
Contract instance on the Ethereum blockchain.
"""
self.sid = sid
self.myid = myid
self.contract = contract
self.w3 = w3

self._task1a = asyncio.ensure_future(self._offline_inputmasks_loop())
self._task1a.add_done_callback(print_exception_callback)

self._task1b = asyncio.ensure_future(self._offline_mixes_loop())
self._task1b.add_done_callback(print_exception_callback)

self._task2 = asyncio.ensure_future(self._client_request_loop())
self._task2.add_done_callback(print_exception_callback)

self._task3 = asyncio.ensure_future(self._mixing_loop())
self._task3.add_done_callback(print_exception_callback)

self._task4 = asyncio.ensure_future(self._mixing_initiate_loop())
self._task4.add_done_callback(print_exception_callback)

Expand Down Expand Up @@ -182,7 +232,7 @@ async def join(self):
The bits and triples are consumed by each mixing epoch.

The input masks may be claimed at a different rate than
than the mixing epochs so they are replenished in a separate
the mixing epochs so they are replenished in a separate
task
"""

Expand Down Expand Up @@ -322,13 +372,13 @@ async def _mixing_loop(self):
# 3.b. Collect the inputs
inputs = []
for idx in range(epoch * K, (epoch + 1) * K):
# Get the public input
masked_input, inputmask_idx = contract_concise.input_queue(idx)
masked_input = field(int.from_bytes(masked_input, "big"))
# Get the input masks
# Get the public input (masked message)
masked_message_bytes, inputmask_idx = contract_concise.input_queue(idx)
masked_message = field(int.from_bytes(masked_message_bytes, "big"))
# Get the input mask
inputmask = self._inputmasks[inputmask_idx]

m_share = masked_input - inputmask
m_share = masked_message - inputmask
inputs.append(m_share)

# 3.c. Collect the preprocessing
Expand All @@ -349,18 +399,28 @@ async def prog(ctx):
pp_elements._init_data_dir()

# Overwrite triples and one_minus_ones
for kind, elems in zip(("triples", "one_minus_one"), (triples, bits)):
logging.info("overwriting triples and one_minus_ones")
for kind, elems in zip(("triples", "one_minus_ones"), (triples, bits)):
if kind == "triples":
elems = flatten_lists(elems)
elems = [e.value for e in elems]

mixin = pp_elements.mixins[kind]
# mixin = pp_elements.mixins[kind]
mixin = getattr(pp_elements, f"_{kind}")
mixin_filename = mixin.build_filename(ctx.N, ctx.t, ctx.myid)
logging.info(
f"writing preprocessed {kind} to file {mixin_filename}"
)
logging.info(f"number of elements is: {len(elems)}")
mixin._write_preprocessing_file(
mixin_filename, ctx.t, ctx.myid, elems, append=False
)

pp_elements._init_mixins()
# FIXME Not sure what this is supposed to be ...
# the method does not exist.
# pp_elements._init_mixins()
pp_elements._triples._refresh_cache()
pp_elements._one_minus_ones._refresh_cache()

logging.info(f"[{ctx.myid}] Running permutation network")
inps = list(map(ctx.Share, inputs))
Expand Down Expand Up @@ -419,9 +479,17 @@ async def _mixing_initiate_loop(self):
await asyncio.sleep(5)

# Step 4.b. Call initiate mix
tx_hash = self.contract.functions.initiate_mix().transact(
{"from": self.w3.eth.accounts[0]}
)
try:
tx_hash = self.contract.functions.initiate_mix().transact(
{"from": self.w3.eth.accounts[0]}
)
except ValueError as err:
logging.info("\n")
logging.info(79 * "*")
logging.info(err)
logging.info(79 * "*")
logging.info("\n")
continue
tx_receipt = await wait_for_receipt(self.w3, tx_hash)
rich_logs = self.contract.events.MixingEpochInitiated().processReceipt(
tx_receipt
Expand Down Expand Up @@ -484,6 +552,7 @@ async def main_loop(w3):
]

# Step 3. Create the client
# TODO communicate with server instead of fetching from list of servers
async def req_mask(i, idx):
# client requests input mask {idx} from server {i}
return servers[i]._inputmasks[idx]
Expand Down
Loading