Skip to content

Commit

Permalink
Mixnet: integrate all the pieces together (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee authored Feb 5, 2024
1 parent b1ffb4d commit fe7d47c
Show file tree
Hide file tree
Showing 14 changed files with 560 additions and 413 deletions.
21 changes: 21 additions & 0 deletions mixnet/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Mixnet Specification

This is the executable specification of Mixnet, which can be used as a networking layer of the Nomos network.

![](structure.png)

## Public Components

- [`mixnet.py`](mixnet.py): A public interface of the Mixnet layer, which can be used by upper layers
- [`robustness.py`](robustness.py): A public interface of the Robustness layer, which can be on top of the Mixnet layer and used by upper layers

## Private Components

There are two primary components in the Mixnet layer.

- [`client.py`](client.py): A mix client interface, which splits a message into Sphinx packets, sends packets to mix nodes, and receives messages via gossip. Also, this emits cover packets periodically.
- [`node.py`](node.py): A mix node interface, which receives Sphinx packets from other mix nodes, processes packets, and forwards packets to other mix nodes. This works only when selected by the topology construction.

Each component receives a new topology from the Robustness layer.

There is no interaction between mix client and mix node components.
180 changes: 114 additions & 66 deletions mixnet/client.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,124 @@
from __future__ import annotations

import asyncio
from contextlib import suppress
from typing import Self

from mixnet.mixnet import Mixnet
from mixnet.config import MixnetConfig
from mixnet.node import PacketQueue
from mixnet.packet import PacketBuilder
from mixnet.poisson import poisson_interval_sec


async def mixclient_emitter(
mixnet: Mixnet,
emission_rate_per_min: int, # Poisson rate parameter: lambda in the spec
redundancy: int, # b in the spec
real_packet_queue: PacketQueue,
outbound_socket: PacketQueue,
):
"""
Emit packets at the Poisson emission_rate_per_min.
If a real packet is scheduled to be sent, this thread sends the real packet to the mixnet,
and schedules redundant real packets to be emitted in the next turns.
If no real packet is not scheduled, this thread emits a cover packet according to the emission_rate_per_min.
"""

redundant_real_packet_queue: PacketQueue = asyncio.Queue()

emission_notifier_queue = asyncio.Queue()
_ = asyncio.create_task(
emission_notifier(emission_rate_per_min, emission_notifier_queue)
)

while True:
# Wait until the next emission time
_ = await emission_notifier_queue.get()
try:
await emit(
mixnet,
redundancy,
real_packet_queue,
redundant_real_packet_queue,
outbound_socket,
class MixClient:
__config: MixnetConfig

__real_packet_queue: PacketQueue
__outbound_socket: PacketQueue
__task: asyncio.Task # A reference just to prevent task from being garbage collected

@classmethod
async def new(
cls,
config: MixnetConfig,
) -> Self:
self = cls()
self.__config = config
self.__real_packet_queue = asyncio.Queue()
self.__outbound_socket = asyncio.Queue()
self.__task = asyncio.create_task(self.__run())
return self

def set_config(self, config: MixnetConfig) -> None:
"""
Replace the old config with the new config received
In real implementations, this method may be integrated in a long-running task.
Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment.
"""
self.__config = config

def get_config(self) -> MixnetConfig:
return self.__config

async def send_message(self, msg: bytes) -> None:
packets_and_routes = PacketBuilder.build_real_packets(
msg, self.__config.topology
)
for packet, route in packets_and_routes:
await self.__real_packet_queue.put((route[0].addr, packet))

def subscribe_messages(self) -> "asyncio.Queue[bytes]":
"""
Subscribe messages, which went through mix nodes and were broadcasted via gossip
"""
return asyncio.Queue()

@property
def outbound_socket(self) -> PacketQueue:
return self.__outbound_socket

async def __run(self):
"""
Emit packets at the Poisson emission_rate_per_min.
If a real packet is scheduled to be sent, this thread sends the real packet to the mixnet,
and schedules redundant real packets to be emitted in the next turns.
If no real packet is not scheduled, this thread emits a cover packet according to the emission_rate_per_min.
"""

redundant_real_packet_queue: PacketQueue = asyncio.Queue()

emission_notifier_queue = asyncio.Queue()
_ = asyncio.create_task(
self.__emission_notifier(
self.__config.emission_rate_per_min, emission_notifier_queue
)
finally:
# Python convention: indicate that the previously enqueued task has been processed
emission_notifier_queue.task_done()


async def emit(
mixnet: Mixnet,
redundancy: int, # b in the spec
real_packet_queue: PacketQueue,
redundant_real_packet_queue: PacketQueue,
outbound_socket: PacketQueue,
):
if not redundant_real_packet_queue.empty():
addr, packet = redundant_real_packet_queue.get_nowait()
await outbound_socket.put((addr, packet))
return

if not real_packet_queue.empty():
addr, packet = real_packet_queue.get_nowait()
# Schedule redundant real packets
for _ in range(redundancy - 1):
redundant_real_packet_queue.put_nowait((addr, packet))
await outbound_socket.put((addr, packet))

packet, route = PacketBuilder.drop_cover(b"drop cover", mixnet).next()
await outbound_socket.put((route[0].addr, packet))


async def emission_notifier(emission_rate_per_min: int, queue: asyncio.Queue):
while True:
await asyncio.sleep(poisson_interval_sec(emission_rate_per_min))
queue.put_nowait(None)
)

while True:
# Wait until the next emission time
_ = await emission_notifier_queue.get()
try:
await self.__emit(self.__config.redundancy, redundant_real_packet_queue)
finally:
# Python convention: indicate that the previously enqueued task has been processed
emission_notifier_queue.task_done()

async def __emit(
self,
redundancy: int, # b in the spec
redundant_real_packet_queue: PacketQueue,
):
if not redundant_real_packet_queue.empty():
addr, packet = redundant_real_packet_queue.get_nowait()
await self.__outbound_socket.put((addr, packet))
return

if not self.__real_packet_queue.empty():
addr, packet = self.__real_packet_queue.get_nowait()
# Schedule redundant real packets
for _ in range(redundancy - 1):
redundant_real_packet_queue.put_nowait((addr, packet))
await self.__outbound_socket.put((addr, packet))

packets_and_routes = PacketBuilder.build_drop_cover_packets(
b"drop cover", self.__config.topology
)
# We have a for loop here, but we expect that the total num of packets is 1
# because the dummy message is short.
for packet, route in packets_and_routes:
await self.__outbound_socket.put((route[0].addr, packet))

async def __emission_notifier(
self, emission_rate_per_min: int, queue: asyncio.Queue
):
while True:
await asyncio.sleep(poisson_interval_sec(emission_rate_per_min))
queue.put_nowait(None)

async def cancel(self) -> None:
self.__task.cancel()
with suppress(asyncio.CancelledError):
await self.__task
65 changes: 65 additions & 0 deletions mixnet/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import random
from dataclasses import dataclass
from typing import List, TypeAlias

from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from pysphinx.node import Node

from mixnet.bls import BlsPrivateKey, BlsPublicKey


@dataclass
class MixnetConfig:
emission_rate_per_min: int # Poisson rate parameter: lambda
redundancy: int
delay_rate_per_min: int # Poisson rate parameter: mu
topology: MixnetTopology


@dataclass
class MixnetTopology:
# In production, this can be a 1-D array, which is accessible by indexes.
# Here, we use a 2-D array for readability.
layers: List[List[MixNodeInfo]]

def generate_route(self, mix_destination: MixNodeInfo) -> list[MixNodeInfo]:
"""
Generate a mix route for a Sphinx packet.
The pre-selected mix_destination is used as a last mix node in the route,
so that associated packets can be merged together into a original message.
"""
route = [random.choice(layer) for layer in self.layers[:-1]]
route.append(mix_destination)
return route

def choose_mix_destination(self) -> MixNodeInfo:
"""
Choose a mix node from the last mix layer as a mix destination
that will reconstruct a message from Sphinx packets.
"""
return random.choice(self.layers[-1])


# 32-byte that represents an IP address and a port of a mix node.
NodeAddress: TypeAlias = bytes


@dataclass
class MixNodeInfo:
identity_private_key: BlsPrivateKey
encryption_private_key: X25519PrivateKey
addr: NodeAddress

def identity_public_key(self) -> BlsPublicKey:
return self.identity_private_key.get_g1()

def encryption_public_key(self) -> X25519PublicKey:
return self.encryption_private_key.public_key()

def sphinx_node(self) -> Node:
return Node(self.encryption_private_key, self.addr)
89 changes: 35 additions & 54 deletions mixnet/mixnet.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,51 @@
from __future__ import annotations

import random
from dataclasses import dataclass
from typing import List
import asyncio
from typing import Self

from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
)

from mixnet.client import MixClient
from mixnet.config import MixnetConfig
from mixnet.node import MixNode


class Mixnet:
__topology: MixnetTopology | None = None

def get_topology(self) -> MixnetTopology:
if self.__topology is None:
raise RuntimeError("topology is not set yet")
return self.__topology

def set_topology(self, topology: MixnetTopology) -> None:
"""
Replace the old topology with the new topology received, and start establishing new network connections in background.
__mixclient: MixClient
__mixnode: MixNode

In real implementations, this method should be a long-running task, accepting topologies periodically.
Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment.
"""
self.__topology = topology
self.__establish_connections()
@classmethod
async def new(
cls,
encryption_private_key: X25519PrivateKey,
config: MixnetConfig,
) -> Self:
self = cls()
self.__mixclient = await MixClient.new(config)
self.__mixnode = await MixNode.new(encryption_private_key, config)
return self

def __establish_connections(self) -> None:
"""
Establish network connections in advance based on the topology received.
async def publish_message(self, msg: bytes) -> None:
await self.__mixclient.send_message(msg)

This is just a preparation to forward subsequent packets as quickly as possible,
but this is not a strict requirement.
def subscribe_messages(self) -> "asyncio.Queue[bytes]":
return self.__mixclient.subscribe_messages()

In real implementations, this should be a background task.
def set_config(self, config: MixnetConfig) -> None:
"""
pass

Replace the old config with the new config received.
@dataclass
class MixnetTopology:
# In production, this can be a 1-D array, which is accessible by indexes.
# Here, we use a 2-D array for readability.
layers: List[List[MixNode]]

def generate_route(self, mix_destination: MixNode) -> list[MixNode]:
"""
Generate a mix route for a Sphinx packet.
The pre-selected mix_destination is used as a last mix node in the route,
so that associated packets can be merged together into a original message.
"""
route = [random.choice(layer) for layer in self.layers[:-1]]
route.append(mix_destination)
return route

def choose_mix_destination(self) -> MixNode:
"""
Choose a mix node from the last mix layer as a mix destination
that will reconstruct a message from Sphinx packets.
In real implementations, this method should be a long-running task, accepting configs periodically.
Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment.
"""
return random.choice(self.layers[-1])

self.__mixclient.set_config(config)
self.__mixnode.set_config(config)

@dataclass
class MixnetTopologySize:
num_layers: int
num_mixnodes_per_layer: int
def get_config(self) -> MixnetConfig:
return self.__mixclient.get_config()

def num_total_mixnodes(self) -> int:
return self.num_layers * self.num_mixnodes_per_layer
async def cancel(self) -> None:
await self.__mixclient.cancel()
await self.__mixnode.cancel()
Loading

0 comments on commit fe7d47c

Please sign in to comment.