Skip to content

Commit

Permalink
Mixnet: Packet delay in mix node (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee authored Jan 23, 2024
1 parent 1fc319d commit d963d6c
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 29 deletions.
30 changes: 2 additions & 28 deletions mixnet/mixnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,10 @@

import random
from dataclasses import dataclass
from typing import List, TypeAlias
from typing import List

from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from pysphinx.node import Node

from mixnet.bls import BlsPrivateKey, BlsPublicKey
from mixnet.fisheryates import FisherYates

NodeId: TypeAlias = BlsPublicKey
# 32-byte that represents an IP address and a port of a mix node.
NodeAddress: TypeAlias = bytes
from mixnet.node import MixNode


@dataclass
Expand Down Expand Up @@ -49,22 +39,6 @@ def choose_mixnode(self) -> MixNode:
return random.choice(self.mix_nodes)


@dataclass
class MixNode:
identity_private_key: BlsPrivateKey
encryption_private_key: X25519PrivateKey
addr: NodeAddress

def identity_public_key(self) -> BlsPublicKey:
return self.identity_private_key.get_g1()

def encryption_public_key(self) -> X25519PublicKey:
return self.encryption_private_key.public_key()

def sphinx_node(self) -> Node:
return Node(self.encryption_private_key, self.addr)


@dataclass
class MixnetTopology:
layers: List[List[MixNode]]
Expand Down
174 changes: 174 additions & 0 deletions mixnet/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from __future__ import annotations

import queue
import threading
import time
from dataclasses import dataclass
from threading import Thread
from typing import Tuple, TypeAlias

from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from pysphinx.node import Node
from pysphinx.sphinx import (
Payload,
ProcessedFinalHopPacket,
ProcessedForwardHopPacket,
SphinxPacket,
UnknownHeaderTypeError,
)

from mixnet.bls import BlsPrivateKey, BlsPublicKey
from mixnet.poisson import poisson_interval_sec

NodeId: TypeAlias = BlsPublicKey
# 32-byte that represents an IP address and a port of a mix node.
NodeAddress: TypeAlias = bytes

PacketQueue: TypeAlias = "queue.Queue[Tuple[NodeAddress, SphinxPacket]]"
PacketPayloadQueue: TypeAlias = (
"queue.Queue[Tuple[NodeAddress, SphinxPacket | Payload]]"
)


@dataclass
class MixNode:
identity_private_key: BlsPrivateKey
encryption_private_key: X25519PrivateKey
addr: NodeAddress

def identity_public_key(self) -> BlsPublicKey:
return self.identity_private_key.get_g1()

def encryption_public_key(self) -> X25519PublicKey:
return self.encryption_private_key.public_key()

def sphinx_node(self) -> Node:
return Node(self.encryption_private_key, self.addr)

def start(
self,
delay_rate_per_min: int,
inbound_socket: PacketQueue,
outbound_socket: PacketPayloadQueue,
) -> MixNodeRunner:
thread = MixNodeRunner(
self.encryption_private_key,
delay_rate_per_min,
inbound_socket,
outbound_socket,
)
thread.daemon = True
thread.start()
return thread


class MixNodeRunner(Thread):
"""
Read SphinxPackets from inbound socket and spawn a thread for each packet to process it.
This thread approximates a M/M/inf queue.
"""

def __init__(
self,
encryption_private_key: X25519PrivateKey,
delay_rate_per_min: int, # Poisson rate parameter: mu
inbound_socket: PacketQueue,
outbound_socket: PacketPayloadQueue,
):
super().__init__()
self.encryption_private_key = encryption_private_key
self.delay_rate_per_min = delay_rate_per_min
self.inbound_socket = inbound_socket
self.outbound_socket = outbound_socket
self.num_processing = AtomicInt(0)

def run(self) -> None:
# Here in Python, this thread is implemented in synchronous manner.
# In the real implementation, consider implementing this in asynchronous if possible,
# to approximate a M/M/inf queue
while True:
_, packet = self.inbound_socket.get()
thread = MixNodePacketProcessor(
packet,
self.encryption_private_key,
self.delay_rate_per_min,
self.outbound_socket,
self.num_processing,
)
thread.daemon = True
self.num_processing.add(1)
thread.start()

def num_jobs(self) -> int:
"""
Return the number of packets that are being processed or still in the inbound socket.
If this thread works as a M/M/inf queue completely,
the number of packets that are still in the inbound socket must be always 0.
"""
return self.num_processing.get() + self.inbound_socket.qsize()


class MixNodePacketProcessor(Thread):
"""
Process a single packet with a delay that follows exponential distribution,
and forward it to the next mix node or the mix destination
This thread is a single server (worker) in a M/M/inf queue that MixNodeRunner approximates.
"""

def __init__(
self,
packet: SphinxPacket,
encryption_private_key: X25519PrivateKey,
delay_rate_per_min: int, # Poisson rate parameter: mu
outbound_socket: PacketPayloadQueue,
num_processing: AtomicInt,
):
super().__init__()
self.packet = packet
self.encryption_private_key = encryption_private_key
self.delay_rate_per_min = delay_rate_per_min
self.outbound_socket = outbound_socket
self.num_processing = num_processing

def run(self) -> None:
delay_sec = poisson_interval_sec(self.delay_rate_per_min)
time.sleep(delay_sec)

processed = self.packet.process(self.encryption_private_key)
match processed:
case ProcessedForwardHopPacket():
self.outbound_socket.put(
(processed.next_node_address, processed.next_packet)
)
case ProcessedFinalHopPacket():
self.outbound_socket.put(
(processed.destination_node_address, processed.payload)
)
case _:
raise UnknownHeaderTypeError

self.num_processing.sub(1)


class AtomicInt:
def __init__(self, initial: int) -> None:
self.lock = threading.Lock()
self.value = initial

def add(self, v: int):
with self.lock:
self.value += v

def sub(self, v: int):
with self.lock:
self.value -= v

def get(self) -> int:
with self.lock:
return self.value
13 changes: 13 additions & 0 deletions mixnet/poisson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import numpy


def poisson_interval_sec(rate_per_min: int) -> float:
# If events occur in a Poisson distribution with rate_per_min,
# the interval between events follows the exponential distribution
# with the rate_per_min (i.e. with the scale 1/rate_per_min).
interval_min = numpy.random.exponential(scale=1 / rate_per_min, size=1)[0]
return interval_min * 60


def poisson_mean_interval_sec(rate_per_min: int) -> float:
return 1 / rate_per_min * 60
114 changes: 114 additions & 0 deletions mixnet/test_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import queue
import threading
import time
from datetime import datetime
from typing import Tuple
from unittest import TestCase

import numpy
import timeout_decorator
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from pysphinx.sphinx import SphinxPacket

from mixnet.bls import generate_bls
from mixnet.mixnet import Mixnet, MixnetTopology
from mixnet.node import MixNode, NodeAddress, PacketPayloadQueue, PacketQueue
from mixnet.packet import PacketBuilder
from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec
from mixnet.utils import random_bytes


class TestMixNodeRunner(TestCase):
@timeout_decorator.timeout(180)
def test_mixnode_runner_emission_rate(self):
"""
Test if MixNodeRunner works as a M/M/inf queue.
If inputs are arrived at Poisson rate `lambda`,
and if processing is delayed according to an exponential distribution with a rate `mu`,
the rate of outputs should be `lambda`.
"""
mixnet, topology = self.init()
inbound_socket: PacketQueue = queue.Queue()
outbound_socket: PacketPayloadQueue = queue.Queue()

packet, route = PacketBuilder.real(b"msg", mixnet, topology).next()

delay_rate_per_min = 30 # mu (= 2s delay on average)
# Start only the first mix node for testing
runner = route[0].start(delay_rate_per_min, inbound_socket, outbound_socket)

# Send packets to the first mix node in a Poisson distribution
packet_count = 100
emission_rate_per_min = 120 # lambda (= 2msg/sec)
sender = threading.Thread(
target=self.send_packets,
args=(
inbound_socket,
packet,
route[0].addr,
packet_count,
emission_rate_per_min,
),
)
sender.daemon = True
sender.start()

# Calculate intervals between outputs and gather num_jobs in the first mix node.
intervals = []
num_jobs = []
ts = datetime.now()
for _ in range(packet_count):
_ = outbound_socket.get()
now = datetime.now()
intervals.append((now - ts).total_seconds())
num_jobs.append(runner.num_jobs())
ts = now
# Remove the first interval that would be much larger than other intervals,
# because of the delay in mix node.
intervals = intervals[1:]
num_jobs = num_jobs[1:]

# Check if the emission rate of the first mix node is the same as
# the emission rate of the message sender, but with a delay.
# If outputs follow the Poisson distribution with a rate `lambda`,
# a mean interval between outputs must be `1/lambda`.
self.assertAlmostEqual(
float(numpy.mean(intervals)),
poisson_mean_interval_sec(emission_rate_per_min),
delta=1.0,
)
# If runner is a M/M/inf queue,
# a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`.
self.assertAlmostEqual(
float(numpy.mean(num_jobs)),
round(emission_rate_per_min / delay_rate_per_min),
delta=1.0,
)

@staticmethod
def send_packets(
inbound_socket: PacketQueue,
packet: SphinxPacket,
node_addr: NodeAddress,
cnt: int,
rate_per_min: int,
):
for _ in range(cnt):
time.sleep(poisson_interval_sec(rate_per_min))
inbound_socket.put((node_addr, packet))

@staticmethod
def init() -> Tuple[Mixnet, MixnetTopology]:
mixnet = Mixnet(
[
MixNode(
generate_bls(),
X25519PrivateKey.generate(),
random_bytes(32),
)
for _ in range(12)
]
)
topology = mixnet.build_topology(b"entropy", 3, 3)
return mixnet, topology
3 changes: 2 additions & 1 deletion mixnet/test_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from pysphinx.sphinx import ProcessedFinalHopPacket, SphinxPacket

from mixnet.bls import generate_bls
from mixnet.mixnet import Mixnet, MixnetTopology, MixNode
from mixnet.mixnet import Mixnet, MixnetTopology
from mixnet.node import MixNode
from mixnet.packet import (
Fragment,
MessageFlag,
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pycparser==2.21
pysphinx==0.0.1
scipy==1.11.4
setuptools==69.0.3
timeout-decorator==0.5.0
wheel==0.42.0

0 comments on commit d963d6c

Please sign in to comment.