diff --git a/mixnet/mixnet.py b/mixnet/mixnet.py index fbc1ece9..caa75f47 100644 --- a/mixnet/mixnet.py +++ b/mixnet/mixnet.py @@ -2,20 +2,10 @@ import random from dataclasses import dataclass -from typing import List, TypeAlias +from typing import List -from cryptography.hazmat.primitives.asymmetric.x25519 import ( - X25519PrivateKey, - X25519PublicKey, -) -from pysphinx.node import Node - -from mixnet.bls import BlsPrivateKey, BlsPublicKey from mixnet.fisheryates import FisherYates - -NodeId: TypeAlias = BlsPublicKey -# 32-byte that represents an IP address and a port of a mix node. -NodeAddress: TypeAlias = bytes +from mixnet.node import MixNode @dataclass @@ -49,22 +39,6 @@ def choose_mixnode(self) -> MixNode: return random.choice(self.mix_nodes) -@dataclass -class MixNode: - identity_private_key: BlsPrivateKey - encryption_private_key: X25519PrivateKey - addr: NodeAddress - - def identity_public_key(self) -> BlsPublicKey: - return self.identity_private_key.get_g1() - - def encryption_public_key(self) -> X25519PublicKey: - return self.encryption_private_key.public_key() - - def sphinx_node(self) -> Node: - return Node(self.encryption_private_key, self.addr) - - @dataclass class MixnetTopology: layers: List[List[MixNode]] diff --git a/mixnet/node.py b/mixnet/node.py new file mode 100644 index 00000000..defd5b36 --- /dev/null +++ b/mixnet/node.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import queue +import threading +import time +from dataclasses import dataclass +from threading import Thread +from typing import Tuple, TypeAlias + +from cryptography.hazmat.primitives.asymmetric.x25519 import ( + X25519PrivateKey, + X25519PublicKey, +) +from pysphinx.node import Node +from pysphinx.sphinx import ( + Payload, + ProcessedFinalHopPacket, + ProcessedForwardHopPacket, + SphinxPacket, + UnknownHeaderTypeError, +) + +from mixnet.bls import BlsPrivateKey, BlsPublicKey +from mixnet.poisson import poisson_interval_sec + +NodeId: TypeAlias = BlsPublicKey +# 32-byte that represents an IP address and a port of a mix node. +NodeAddress: TypeAlias = bytes + +PacketQueue: TypeAlias = "queue.Queue[Tuple[NodeAddress, SphinxPacket]]" +PacketPayloadQueue: TypeAlias = ( + "queue.Queue[Tuple[NodeAddress, SphinxPacket | Payload]]" +) + + +@dataclass +class MixNode: + identity_private_key: BlsPrivateKey + encryption_private_key: X25519PrivateKey + addr: NodeAddress + + def identity_public_key(self) -> BlsPublicKey: + return self.identity_private_key.get_g1() + + def encryption_public_key(self) -> X25519PublicKey: + return self.encryption_private_key.public_key() + + def sphinx_node(self) -> Node: + return Node(self.encryption_private_key, self.addr) + + def start( + self, + delay_rate_per_min: int, + inbound_socket: PacketQueue, + outbound_socket: PacketPayloadQueue, + ) -> MixNodeRunner: + thread = MixNodeRunner( + self.encryption_private_key, + delay_rate_per_min, + inbound_socket, + outbound_socket, + ) + thread.daemon = True + thread.start() + return thread + + +class MixNodeRunner(Thread): + """ + Read SphinxPackets from inbound socket and spawn a thread for each packet to process it. + + This thread approximates a M/M/inf queue. + """ + + def __init__( + self, + encryption_private_key: X25519PrivateKey, + delay_rate_per_min: int, # Poisson rate parameter: mu + inbound_socket: PacketQueue, + outbound_socket: PacketPayloadQueue, + ): + super().__init__() + self.encryption_private_key = encryption_private_key + self.delay_rate_per_min = delay_rate_per_min + self.inbound_socket = inbound_socket + self.outbound_socket = outbound_socket + self.num_processing = AtomicInt(0) + + def run(self) -> None: + # Here in Python, this thread is implemented in synchronous manner. + # In the real implementation, consider implementing this in asynchronous if possible, + # to approximate a M/M/inf queue + while True: + _, packet = self.inbound_socket.get() + thread = MixNodePacketProcessor( + packet, + self.encryption_private_key, + self.delay_rate_per_min, + self.outbound_socket, + self.num_processing, + ) + thread.daemon = True + self.num_processing.add(1) + thread.start() + + def num_jobs(self) -> int: + """ + Return the number of packets that are being processed or still in the inbound socket. + + If this thread works as a M/M/inf queue completely, + the number of packets that are still in the inbound socket must be always 0. + """ + return self.num_processing.get() + self.inbound_socket.qsize() + + +class MixNodePacketProcessor(Thread): + """ + Process a single packet with a delay that follows exponential distribution, + and forward it to the next mix node or the mix destination + + This thread is a single server (worker) in a M/M/inf queue that MixNodeRunner approximates. + """ + + def __init__( + self, + packet: SphinxPacket, + encryption_private_key: X25519PrivateKey, + delay_rate_per_min: int, # Poisson rate parameter: mu + outbound_socket: PacketPayloadQueue, + num_processing: AtomicInt, + ): + super().__init__() + self.packet = packet + self.encryption_private_key = encryption_private_key + self.delay_rate_per_min = delay_rate_per_min + self.outbound_socket = outbound_socket + self.num_processing = num_processing + + def run(self) -> None: + delay_sec = poisson_interval_sec(self.delay_rate_per_min) + time.sleep(delay_sec) + + processed = self.packet.process(self.encryption_private_key) + match processed: + case ProcessedForwardHopPacket(): + self.outbound_socket.put( + (processed.next_node_address, processed.next_packet) + ) + case ProcessedFinalHopPacket(): + self.outbound_socket.put( + (processed.destination_node_address, processed.payload) + ) + case _: + raise UnknownHeaderTypeError + + self.num_processing.sub(1) + + +class AtomicInt: + def __init__(self, initial: int) -> None: + self.lock = threading.Lock() + self.value = initial + + def add(self, v: int): + with self.lock: + self.value += v + + def sub(self, v: int): + with self.lock: + self.value -= v + + def get(self) -> int: + with self.lock: + return self.value diff --git a/mixnet/poisson.py b/mixnet/poisson.py new file mode 100644 index 00000000..86a4b66a --- /dev/null +++ b/mixnet/poisson.py @@ -0,0 +1,13 @@ +import numpy + + +def poisson_interval_sec(rate_per_min: int) -> float: + # If events occur in a Poisson distribution with rate_per_min, + # the interval between events follows the exponential distribution + # with the rate_per_min (i.e. with the scale 1/rate_per_min). + interval_min = numpy.random.exponential(scale=1 / rate_per_min, size=1)[0] + return interval_min * 60 + + +def poisson_mean_interval_sec(rate_per_min: int) -> float: + return 1 / rate_per_min * 60 diff --git a/mixnet/test_node.py b/mixnet/test_node.py new file mode 100644 index 00000000..8783e9d7 --- /dev/null +++ b/mixnet/test_node.py @@ -0,0 +1,114 @@ +import queue +import threading +import time +from datetime import datetime +from typing import Tuple +from unittest import TestCase + +import numpy +import timeout_decorator +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey +from pysphinx.sphinx import SphinxPacket + +from mixnet.bls import generate_bls +from mixnet.mixnet import Mixnet, MixnetTopology +from mixnet.node import MixNode, NodeAddress, PacketPayloadQueue, PacketQueue +from mixnet.packet import PacketBuilder +from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec +from mixnet.utils import random_bytes + + +class TestMixNodeRunner(TestCase): + @timeout_decorator.timeout(180) + def test_mixnode_runner_emission_rate(self): + """ + Test if MixNodeRunner works as a M/M/inf queue. + + If inputs are arrived at Poisson rate `lambda`, + and if processing is delayed according to an exponential distribution with a rate `mu`, + the rate of outputs should be `lambda`. + """ + mixnet, topology = self.init() + inbound_socket: PacketQueue = queue.Queue() + outbound_socket: PacketPayloadQueue = queue.Queue() + + packet, route = PacketBuilder.real(b"msg", mixnet, topology).next() + + delay_rate_per_min = 30 # mu (= 2s delay on average) + # Start only the first mix node for testing + runner = route[0].start(delay_rate_per_min, inbound_socket, outbound_socket) + + # Send packets to the first mix node in a Poisson distribution + packet_count = 100 + emission_rate_per_min = 120 # lambda (= 2msg/sec) + sender = threading.Thread( + target=self.send_packets, + args=( + inbound_socket, + packet, + route[0].addr, + packet_count, + emission_rate_per_min, + ), + ) + sender.daemon = True + sender.start() + + # Calculate intervals between outputs and gather num_jobs in the first mix node. + intervals = [] + num_jobs = [] + ts = datetime.now() + for _ in range(packet_count): + _ = outbound_socket.get() + now = datetime.now() + intervals.append((now - ts).total_seconds()) + num_jobs.append(runner.num_jobs()) + ts = now + # Remove the first interval that would be much larger than other intervals, + # because of the delay in mix node. + intervals = intervals[1:] + num_jobs = num_jobs[1:] + + # Check if the emission rate of the first mix node is the same as + # the emission rate of the message sender, but with a delay. + # If outputs follow the Poisson distribution with a rate `lambda`, + # a mean interval between outputs must be `1/lambda`. + self.assertAlmostEqual( + float(numpy.mean(intervals)), + poisson_mean_interval_sec(emission_rate_per_min), + delta=1.0, + ) + # If runner is a M/M/inf queue, + # a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`. + self.assertAlmostEqual( + float(numpy.mean(num_jobs)), + round(emission_rate_per_min / delay_rate_per_min), + delta=1.0, + ) + + @staticmethod + def send_packets( + inbound_socket: PacketQueue, + packet: SphinxPacket, + node_addr: NodeAddress, + cnt: int, + rate_per_min: int, + ): + for _ in range(cnt): + time.sleep(poisson_interval_sec(rate_per_min)) + inbound_socket.put((node_addr, packet)) + + @staticmethod + def init() -> Tuple[Mixnet, MixnetTopology]: + mixnet = Mixnet( + [ + MixNode( + generate_bls(), + X25519PrivateKey.generate(), + random_bytes(32), + ) + for _ in range(12) + ] + ) + topology = mixnet.build_topology(b"entropy", 3, 3) + return mixnet, topology diff --git a/mixnet/test_packet.py b/mixnet/test_packet.py index 676da7e1..e306e3a9 100644 --- a/mixnet/test_packet.py +++ b/mixnet/test_packet.py @@ -5,7 +5,8 @@ from pysphinx.sphinx import ProcessedFinalHopPacket, SphinxPacket from mixnet.bls import generate_bls -from mixnet.mixnet import Mixnet, MixnetTopology, MixNode +from mixnet.mixnet import Mixnet, MixnetTopology +from mixnet.node import MixNode from mixnet.packet import ( Fragment, MessageFlag, diff --git a/requirements.txt b/requirements.txt index 0013983a..81d737e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ pycparser==2.21 pysphinx==0.0.1 scipy==1.11.4 setuptools==69.0.3 +timeout-decorator==0.5.0 wheel==0.42.0