Skip to content

Commit

Permalink
feat: libp2p relay and gossipsub server (#1459)
Browse files Browse the repository at this point in the history
* feat: start writing libp2p server

* feat: working libp2p example

* feat: relay server

* feat: basic chat room server

* feat(server): relay & gossipsub for sending messages in topics (rooms)

* feat: set up server in torii-server and satrt working on client

* feat(client): implemented ping & identify and sending messages to topic

* ferat: stasrt integrating into torii-client & message channel

* feat: integration into torii-client & error handling

* feat: tests for libp2p server & client and finish integration

* chore: tests

* chore: webrtc base

* fix: server webrtc correctly compiling & use tokio

* chore: tests

* feat: client compiling for wasm

* chore: fmt & clippy

* chore: update cargo.lock

* chore: rebase main branch

* fix: deps

* feat: wasm tests for client connectivity

* feat: specify port in torii cli and cert/local key for libp2p

* chore: clippy & fmt

* feat(client): event loop

* feat(client): add quic support

* fix: issue with ping timeout

* chore: clippy and fmt

* refactor: review changes

* chore: change to torii-relay

* chore: fmt

* refactor: use unbounded channel for messages & commands

* chore: remove unused imports

* chore: clippy torii client

* feat: expose full message metadata

* chore: prefix cli args with relay

* feat: oneshot channel for commands result and expose mesages on torii client

* refactor(server): better error handling & type scopes
  • Loading branch information
Larkooo authored Jan 25, 2024
1 parent 5d61184 commit 09783a4
Show file tree
Hide file tree
Showing 17 changed files with 3,047 additions and 170 deletions.
2,303 changes: 2,137 additions & 166 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ torii-core = { path = "crates/torii/core" }
torii-graphql = { path = "crates/torii/graphql" }
torii-grpc = { path = "crates/torii/grpc" }
torii-server = { path = "crates/torii/server" }
torii-relay = { path = "crates/torii/libp2p" }

# sozo
sozo-signers = { path = "crates/sozo/signers" }
Expand All @@ -88,7 +89,6 @@ assert_matches = "1.5.0"
async-trait = "0.1.68"
base64 = "0.21.2"
blockifier = { git = "https://github.com/starkware-libs/blockifier", tag = "v0.4.0-rc9.2" }
cairo-lang-casm = "2.4.0"
cairo-lang-compiler = "2.4.0"
cairo-lang-debug = "2.4.0"
cairo-lang-defs = "2.4.0"
Expand All @@ -115,12 +115,10 @@ camino = { version = "1.1.2", features = [ "serde1" ] }
chrono = { version = "0.4.24", features = [ "serde" ] }
clap = { version = "4.2", features = [ "derive" ] }
clap_complete = "4.3"
colored = "2"
console = "0.15.7"
convert_case = "0.6.0"
crypto-bigint = { version = "0.5.3", features = [ "serde" ] }
derive_more = "0.99.17"
env_logger = "0.10.0"
flate2 = "1.0.24"
futures = "0.3.28"
hex = "0.4.3"
Expand All @@ -129,7 +127,6 @@ itertools = "0.10.3"
jsonrpsee = { version = "0.16.2", default-features = false }
lazy_static = "1.4.0"
metrics-process = "1.0.9"
num-bigint = "0.4"
once_cell = "1.0"
parking_lot = "0.12.1"
pretty_assertions = "1.2.1"
Expand Down
1 change: 1 addition & 0 deletions bin/torii/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tower-http = "0.4.4"
tracing-subscriber.workspace = true
tracing.workspace = true
url.workspace = true
torii-relay.workspace = true

[dev-dependencies]
camino.workspace = true
Expand Down
26 changes: 26 additions & 0 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ struct Args {
#[arg(long, value_name = "SOCKET", default_value = ":8080", value_parser = parse_socket_address)]
addr: SocketAddr,

/// Port to serve Libp2p TCP & UDP Quic transports
#[arg(long, value_name = "PORT", default_value = "9090")]
relay_port: u16,

/// Port to serve Libp2p WebRTC transport
#[arg(long, value_name = "PORT", default_value = "9091")]
relay_webrtc_port: u16,

/// Path to a local identity key file. If not specified, a new identity will be generated
#[arg(long, value_name = "PATH")]
relay_local_key_path: Option<String>,

/// Path to a local certificate file. If not specified, a new certificate will be generated
/// for WebRTC connections
#[arg(long, value_name = "PATH")]
relay_cert_path: Option<String>,

/// Specify allowed origins for api endpoints (comma-separated list of allowed origins, or "*"
/// for all)
#[arg(long, default_value = "*")]
Expand Down Expand Up @@ -163,6 +180,14 @@ async fn main() -> anyhow::Result<()> {
proxy_server.clone(),
);

let mut libp2p_relay_server = torii_relay::server::Relay::new(
args.relay_port,
args.relay_webrtc_port,
args.relay_local_key_path,
args.relay_cert_path,
)
.expect("Failed to start libp2p relay server");

info!(target: "torii::cli", "Starting torii endpoint: {}", format!("http://{}", args.addr));
info!(target: "torii::cli", "Serving Graphql playground: {}\n", format!("http://{}/graphql", args.addr));

Expand All @@ -183,6 +208,7 @@ async fn main() -> anyhow::Result<()> {
_ = proxy_server.start(shutdown_tx.subscribe()) => {},
_ = graphql_server => {},
_ = grpc_server => {},
_ = libp2p_relay_server.run() => {},
};

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ starknet.workspace = true
thiserror.workspace = true
tokio = { version = "1.32.0", features = [ "sync" ], default-features = false }
torii-grpc = { path = "../grpc", features = [ "client" ] }
torii-relay = { path = "../libp2p" }
url.workspace = true
libp2p-gossipsub = "0.46.1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
prost.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/client/src/client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub enum Error {
#[error(transparent)]
GrpcClient(#[from] torii_grpc::client::Error),
#[error(transparent)]
RelayClient(#[from] torii_relay::errors::Error),
#[error(transparent)]
Model(#[from] ModelError),
#[error("Unsupported query")]
UnsupportedQuery,
Expand Down
43 changes: 43 additions & 0 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use dojo_types::packing::unpack;
use dojo_types::schema::Ty;
use dojo_types::WorldMetadata;
use dojo_world::contracts::WorldContractReader;
use futures::channel::mpsc::UnboundedReceiver;
use futures_util::lock::Mutex;
use parking_lot::{RwLock, RwLockReadGuard};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::providers::jsonrpc::HttpTransport;
Expand All @@ -20,6 +22,7 @@ use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::RetrieveEntitiesResponse;
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{KeysClause, Query};
use torii_relay::client::Message;

use crate::client::error::{Error, ParseError};
use crate::client::storage::ModelStorage;
Expand All @@ -34,6 +37,8 @@ pub struct Client {
metadata: Arc<RwLock<WorldMetadata>>,
/// The grpc client.
inner: AsyncRwLock<torii_grpc::client::WorldClient>,
/// Libp2p client.
relay_client: torii_relay::client::RelayClient,
/// Model storage
storage: Arc<ModelStorage>,
/// Models the client are subscribed to.
Expand All @@ -49,11 +54,14 @@ impl Client {
pub async fn new(
torii_url: String,
rpc_url: String,
libp2p_relay_url: String,
world: FieldElement,
models_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

let libp2p_client = torii_relay::client::RelayClient::new(libp2p_relay_url)?;

let metadata = grpc_client.metadata().await?;

let shared_metadata: Arc<_> = RwLock::new(metadata).into();
Expand Down Expand Up @@ -88,10 +96,45 @@ impl Client {
metadata: shared_metadata,
sub_client_handle: OnceCell::new(),
inner: AsyncRwLock::new(grpc_client),
relay_client: libp2p_client,
subscribed_models: subbed_models,
})
}

/// Subscribes to a topic.
/// Returns true if the topic was subscribed to.
/// Returns false if the topic was already subscribed to.
pub async fn subscribe_topic(&mut self, topic: String) -> Result<bool, Error> {
self.relay_client.command_sender.subscribe(topic).await.map_err(Error::RelayClient)
}

/// Unsubscribes from a topic.
/// Returns true if the topic was subscribed to.
pub async fn unsubscribe_topic(&mut self, topic: String) -> Result<bool, Error> {
self.relay_client.command_sender.unsubscribe(topic).await.map_err(Error::RelayClient)
}

/// Publishes a message to a topic.
/// Returns the message id.
pub async fn publish_message(&mut self, topic: &str, message: &[u8]) -> Result<Vec<u8>, Error> {
self.relay_client
.command_sender
.publish(topic.to_string(), message.to_vec())
.await
.map_err(Error::RelayClient)
.map(|m| m.0)
}

/// Runs the libp2p event loop which processes incoming messages and commands.
/// And sends events in the channel
pub async fn run_libp2p(&mut self) {
self.relay_client.event_loop.run().await;
}

pub fn libp2p_message_stream(&self) -> Arc<Mutex<UnboundedReceiver<Message>>> {
self.relay_client.message_receiver.clone()
}

/// Returns a read lock on the World metadata that the client is connected to.
pub fn metadata(&self) -> RwLockReadGuard<'_, WorldMetadata> {
self.metadata.read()
Expand Down
36 changes: 36 additions & 0 deletions crates/torii/libp2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
edition.workspace = true
license-file.workspace = true
name = "torii-relay"
repository.workspace = true
version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures.workspace = true
rand = "0.8.5"
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing.workspace = true
async-trait = "0.1.77"
regex = "1.10.3"
anyhow.workspace = true

[dev-dependencies]
tempfile = "3.9.0"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio.workspace = true
libp2p = { git = "https://github.com/libp2p/rust-libp2p", features = [ "ed25519", "gossipsub", "identify", "macros", "noise", "ping", "quic", "relay", "tcp", "tokio", "yamux" ] }
libp2p-webrtc = { git = "https://github.com/libp2p/rust-libp2p", features = [ "tokio", "pem" ] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
libp2p = { git = "https://github.com/libp2p/rust-libp2p", features = [ "ed25519", "gossipsub", "identify", "macros", "ping", "tcp", "wasm-bindgen" ] }
libp2p-webrtc-websys = { git = "https://github.com/libp2p/rust-libp2p" }
tracing-wasm = "0.2.1"
wasm-bindgen-test = "0.3.40"
wasm-bindgen-futures = "0.4.40"
wasm-timer = "0.2.5"
27 changes: 27 additions & 0 deletions crates/torii/libp2p/src/client/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use gossipsub::Event as GossipsubEvent;
use libp2p::{gossipsub, identify, ping};

#[derive(Debug)]
pub(crate) enum ClientEvent {
Gossipsub(GossipsubEvent),
Identify(identify::Event),
Ping(ping::Event),
}

impl From<GossipsubEvent> for ClientEvent {
fn from(event: GossipsubEvent) -> Self {
Self::Gossipsub(event)
}
}

impl From<identify::Event> for ClientEvent {
fn from(event: identify::Event) -> Self {
Self::Identify(event)
}
}

impl From<ping::Event> for ClientEvent {
fn from(event: ping::Event) -> Self {
Self::Ping(event)
}
}
Loading

0 comments on commit 09783a4

Please sign in to comment.