Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mixnet: Packet delay in mix node #49

Merged
merged 4 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 2 additions & 28 deletions mixnet/mixnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,10 @@

import random
from dataclasses import dataclass
from typing import List, TypeAlias
from typing import List

from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from pysphinx.node import Node

from mixnet.bls import BlsPrivateKey, BlsPublicKey
from mixnet.fisheryates import FisherYates

NodeId: TypeAlias = BlsPublicKey
# 32-byte that represents an IP address and a port of a mix node.
NodeAddress: TypeAlias = bytes
from mixnet.node import MixNode


@dataclass
Expand Down Expand Up @@ -49,22 +39,6 @@ def choose_mixnode(self) -> MixNode:
return random.choice(self.mix_nodes)


@dataclass
class MixNode:
identity_private_key: BlsPrivateKey
encryption_private_key: X25519PrivateKey
addr: NodeAddress

def identity_public_key(self) -> BlsPublicKey:
return self.identity_private_key.get_g1()

def encryption_public_key(self) -> X25519PublicKey:
return self.encryption_private_key.public_key()

def sphinx_node(self) -> Node:
return Node(self.encryption_private_key, self.addr)


@dataclass
class MixnetTopology:
layers: List[List[MixNode]]
Expand Down
174 changes: 174 additions & 0 deletions mixnet/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from __future__ import annotations

import queue
import threading
import time
from dataclasses import dataclass
from threading import Thread
from typing import Tuple, TypeAlias

from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from pysphinx.node import Node
from pysphinx.sphinx import (
Payload,
ProcessedFinalHopPacket,
ProcessedForwardHopPacket,
SphinxPacket,
UnknownHeaderTypeError,
)

from mixnet.bls import BlsPrivateKey, BlsPublicKey
from mixnet.poisson import poisson_interval_sec

NodeId: TypeAlias = BlsPublicKey
# 32-byte that represents an IP address and a port of a mix node.
NodeAddress: TypeAlias = bytes

PacketQueue: TypeAlias = "queue.Queue[Tuple[NodeAddress, SphinxPacket]]"
PacketPayloadQueue: TypeAlias = (
"queue.Queue[Tuple[NodeAddress, SphinxPacket | Payload]]"
)


@dataclass
class MixNode:
identity_private_key: BlsPrivateKey
encryption_private_key: X25519PrivateKey
addr: NodeAddress

def identity_public_key(self) -> BlsPublicKey:
return self.identity_private_key.get_g1()

def encryption_public_key(self) -> X25519PublicKey:
return self.encryption_private_key.public_key()

def sphinx_node(self) -> Node:
return Node(self.encryption_private_key, self.addr)

def start(
self,
delay_rate_per_min: int,
inbound_socket: PacketQueue,
outbound_socket: PacketPayloadQueue,
) -> MixNodeRunner:
thread = MixNodeRunner(
self.encryption_private_key,
delay_rate_per_min,
inbound_socket,
outbound_socket,
)
thread.daemon = True
thread.start()
return thread
Comment on lines +51 to +65
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: could this be async instead?

Copy link
Contributor Author

@youngjoon-lee youngjoon-lee Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can use async, but I'm thinking if it would be not readable for someone who is not familar with async programming.

Also in previous PRs I remember to see some async code, we should choose one style

I haven't used async code in this repo. But, let me translate this into async. If it doesn't look complicated, we can go with async because it's much easier to approximate M/M/∞ queue using async rather than sync.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't used async code in this repo. But, hm, let me translate this into async. If it doesn't look complicated, we can go with async because it's much easier to approximate M/M/∞ queue using async rather than sync.

It is not necessary!, we can go ahead with this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but after opening subsequent PRs #50 and #51, now I have a feeling that async code would be even simpler. I'll try it and open a separate PR soon. So, I hope this and subsequent PRs are reviewed and merged independently.



class MixNodeRunner(Thread):
"""
Read SphinxPackets from inbound socket and spawn a thread for each packet to process it.

This thread approximates a M/M/inf queue.
"""

def __init__(
self,
encryption_private_key: X25519PrivateKey,
delay_rate_per_min: int, # Poisson rate parameter: mu
inbound_socket: PacketQueue,
outbound_socket: PacketPayloadQueue,
):
super().__init__()
self.encryption_private_key = encryption_private_key
self.delay_rate_per_min = delay_rate_per_min
self.inbound_socket = inbound_socket
self.outbound_socket = outbound_socket
self.num_processing = AtomicInt(0)

def run(self) -> None:
# Here in Python, this thread is implemented in synchronous manner.
# In the real implementation, consider implementing this in asynchronous if possible,
# to approximate a M/M/inf queue
while True:
_, packet = self.inbound_socket.get()
thread = MixNodePacketProcessor(
packet,
self.encryption_private_key,
self.delay_rate_per_min,
self.outbound_socket,
self.num_processing,
)
thread.daemon = True
self.num_processing.add(1)
thread.start()

def num_jobs(self) -> int:
"""
Return the number of packets that are being processed or still in the inbound socket.

If this thread works as a M/M/inf queue completely,
the number of packets that are still in the inbound socket must be always 0.
"""
return self.num_processing.get() + self.inbound_socket.qsize()


class MixNodePacketProcessor(Thread):
"""
Process a single packet with a delay that follows exponential distribution,
and forward it to the next mix node or the mix destination

This thread is a single server (worker) in a M/M/inf queue that MixNodeRunner approximates.
"""

def __init__(
self,
packet: SphinxPacket,
encryption_private_key: X25519PrivateKey,
delay_rate_per_min: int, # Poisson rate parameter: mu
outbound_socket: PacketPayloadQueue,
num_processing: AtomicInt,
):
super().__init__()
self.packet = packet
self.encryption_private_key = encryption_private_key
self.delay_rate_per_min = delay_rate_per_min
self.outbound_socket = outbound_socket
self.num_processing = num_processing

def run(self) -> None:
delay_sec = poisson_interval_sec(self.delay_rate_per_min)
time.sleep(delay_sec)

processed = self.packet.process(self.encryption_private_key)
match processed:
case ProcessedForwardHopPacket():
self.outbound_socket.put(
(processed.next_node_address, processed.next_packet)
)
case ProcessedFinalHopPacket():
self.outbound_socket.put(
(processed.destination_node_address, processed.payload)
)
case _:
raise UnknownHeaderTypeError

self.num_processing.sub(1)


class AtomicInt:
def __init__(self, initial: int) -> None:
self.lock = threading.Lock()
self.value = initial

def add(self, v: int):
with self.lock:
self.value += v

def sub(self, v: int):
with self.lock:
self.value -= v

def get(self) -> int:
with self.lock:
return self.value
13 changes: 13 additions & 0 deletions mixnet/poisson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import numpy


def poisson_interval_sec(rate_per_min: int) -> float:
# If events occur in a Poisson distribution with rate_per_min,
# the interval between events follows the exponential distribution
# with the rate_per_min (i.e. with the scale 1/rate_per_min).
interval_min = numpy.random.exponential(scale=1 / rate_per_min, size=1)[0]
return interval_min * 60


def poisson_mean_interval_sec(rate_per_min: int) -> float:
return 1 / rate_per_min * 60
114 changes: 114 additions & 0 deletions mixnet/test_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import queue
import threading
import time
from datetime import datetime
from typing import Tuple
from unittest import TestCase

import numpy
import timeout_decorator
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from pysphinx.sphinx import SphinxPacket

from mixnet.bls import generate_bls
from mixnet.mixnet import Mixnet, MixnetTopology
from mixnet.node import MixNode, NodeAddress, PacketPayloadQueue, PacketQueue
from mixnet.packet import PacketBuilder
from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec
from mixnet.utils import random_bytes


class TestMixNodeRunner(TestCase):
@timeout_decorator.timeout(180)
def test_mixnode_runner_emission_rate(self):
"""
Test if MixNodeRunner works as a M/M/inf queue.

If inputs are arrived at Poisson rate `lambda`,
and if processing is delayed according to an exponential distribution with a rate `mu`,
the rate of outputs should be `lambda`.
"""
mixnet, topology = self.init()
inbound_socket: PacketQueue = queue.Queue()
outbound_socket: PacketPayloadQueue = queue.Queue()

packet, route = PacketBuilder.real(b"msg", mixnet, topology).next()

delay_rate_per_min = 30 # mu (= 2s delay on average)
# Start only the first mix node for testing
runner = route[0].start(delay_rate_per_min, inbound_socket, outbound_socket)

# Send packets to the first mix node in a Poisson distribution
packet_count = 100
emission_rate_per_min = 120 # lambda (= 2msg/sec)
sender = threading.Thread(
target=self.send_packets,
args=(
inbound_socket,
packet,
route[0].addr,
packet_count,
emission_rate_per_min,
),
)
sender.daemon = True
sender.start()

# Calculate intervals between outputs and gather num_jobs in the first mix node.
intervals = []
num_jobs = []
ts = datetime.now()
for _ in range(packet_count):
_ = outbound_socket.get()
now = datetime.now()
intervals.append((now - ts).total_seconds())
num_jobs.append(runner.num_jobs())
ts = now
# Remove the first interval that would be much larger than other intervals,
# because of the delay in mix node.
intervals = intervals[1:]
num_jobs = num_jobs[1:]

# Check if the emission rate of the first mix node is the same as
# the emission rate of the message sender, but with a delay.
# If outputs follow the Poisson distribution with a rate `lambda`,
# a mean interval between outputs must be `1/lambda`.
self.assertAlmostEqual(
float(numpy.mean(intervals)),
poisson_mean_interval_sec(emission_rate_per_min),
delta=1.0,
)
# If runner is a M/M/inf queue,
# a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`.
self.assertAlmostEqual(
float(numpy.mean(num_jobs)),
round(emission_rate_per_min / delay_rate_per_min),
delta=1.0,
)

@staticmethod
def send_packets(
inbound_socket: PacketQueue,
packet: SphinxPacket,
node_addr: NodeAddress,
cnt: int,
rate_per_min: int,
):
for _ in range(cnt):
time.sleep(poisson_interval_sec(rate_per_min))
inbound_socket.put((node_addr, packet))

@staticmethod
def init() -> Tuple[Mixnet, MixnetTopology]:
mixnet = Mixnet(
[
MixNode(
generate_bls(),
X25519PrivateKey.generate(),
random_bytes(32),
)
for _ in range(12)
]
)
topology = mixnet.build_topology(b"entropy", 3, 3)
return mixnet, topology
3 changes: 2 additions & 1 deletion mixnet/test_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from pysphinx.sphinx import ProcessedFinalHopPacket, SphinxPacket

from mixnet.bls import generate_bls
from mixnet.mixnet import Mixnet, MixnetTopology, MixNode
from mixnet.mixnet import Mixnet, MixnetTopology
from mixnet.node import MixNode
from mixnet.packet import (
Fragment,
MessageFlag,
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pycparser==2.21
pysphinx==0.0.1
scipy==1.11.4
setuptools==69.0.3
timeout-decorator==0.5.0
wheel==0.42.0