diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cb02a549da1..92f7b449fde 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -63,6 +63,7 @@ jobs: crate: ${{ env.CRATE }} - name: Enforce no dependency on meta crate + if: env.CRATE != 'libp2p-server' run: | cargo metadata --format-version=1 --no-deps | \ jq -e -r '.packages[] | select(.name == "'"$CRATE"'") | .dependencies | all(.name != "libp2p")' diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml new file mode 100644 index 00000000000..45a4db14385 --- /dev/null +++ b/.github/workflows/docker-image.yml @@ -0,0 +1,35 @@ +name: Publish docker images + +on: + push: + branches: + - '**' + tags: + - 'libp2p-server-**' + +jobs: + server: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ github.token }} + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Get branch or tag name + id: ref-name + run: echo ::set-output name=ref::${GITHUB_REF#refs/*/} + + - name: Build and push + uses: docker/build-push-action@v3 + with: + context: . + file: ./misc/server/Dockerfile + push: true + tags: ghcr.io/${{ github.repository }}-server:${{ steps.ref-name.outputs.ref }} diff --git a/Cargo.lock b/Cargo.lock index 21c81836922..8ae14bea796 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3174,6 +3174,26 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-server" +version = "0.12.1" +dependencies = [ + "base64 0.21.2", + "clap", + "env_logger 0.10.0", + "futures", + "futures-timer", + "hyper", + "libp2p", + "log", + "prometheus-client", + "serde", + "serde_derive", + "serde_json", + "tokio", + "zeroize", +] + [[package]] name = "libp2p-swarm" version = "0.43.3" diff --git a/Cargo.toml b/Cargo.toml index 03f80065f2d..979365d1131 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "misc/quick-protobuf-codec", "misc/quickcheck-ext", "misc/rw-stream-sink", + "misc/server", "muxers/mplex", "muxers/test-harness", "muxers/yamux", @@ -89,6 +90,7 @@ libp2p-quic = { version = "0.9.2", path = "transports/quic" } libp2p-relay = { version = "0.16.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" } +libp2p-server = { version = "0.12.1", path = "misc/server" } libp2p-swarm = { version = "0.43.3", path = "swarm" } libp2p-swarm-derive = { version = "0.33.0", path = "swarm-derive" } libp2p-swarm-test = { version = "0.2.0", path = "swarm-test" } diff --git a/misc/server/CHANGELOG.md b/misc/server/CHANGELOG.md new file mode 100644 index 00000000000..f419bb7cebd --- /dev/null +++ b/misc/server/CHANGELOG.md @@ -0,0 +1,53 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.12.1] - unreleased + +### Changed +- Move to tokio and hyper. + See [PR 4311]. +- Move to distroless Docker base image. + See [PR 4311]. + +[PR 4311]: https://github.com/libp2p/rust-libp2p/pull/4311 + +## [0.8.0] +### Changed +- Remove mplex support. + +## [0.7.0] +### Changed +- Update to libp2p v0.47.0. + +## [0.6.0] - [2022-05-05] +### Changed +- Update to libp2p v0.44.0. + +## [0.5.4] - [2022-01-11] +### Changed +- Pull latest autonat changes. + +## [0.5.3] - [2021-12-25] +### Changed +- Update dependencies. +- Pull in autonat fixes. + +## [0.5.2] - [2021-12-20] +### Added +- Add support for libp2p autonat protocol via `--enable-autonat`. + +## [0.5.1] - [2021-12-20] +### Fixed +- Update dependencies. +- Fix typo in command line flag `--enable-kademlia`. + +## [0.5.0] - 2021-11-18 +### Changed +- Disable Kademlia protocol by default. + +## [0.4.0] - 2021-11-18 +### Fixed +- Update dependencies. diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml new file mode 100644 index 00000000000..baf5fbab13c --- /dev/null +++ b/misc/server/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "libp2p-server" +version = "0.12.1" +authors = ["Max Inden "] +edition = "2021" +rust-version = { workspace = true } +description = "A rust-libp2p server binary." +license = "MIT" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +base64 = "0.21" +clap = { version = "4.3.12", features = ["derive"] } +env_logger = "0.10.0" +futures = "0.3" +futures-timer = "3" +hyper = { version = "0.14", features = ["server", "tcp", "http1"] } +libp2p = { path = "../../libp2p", features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic"] } +log = "0.4" +prometheus-client = "0.21.2" +serde = "1.0.183" +serde_derive = "1.0.125" +serde_json = "1.0" +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +zeroize = "1" diff --git a/misc/server/Dockerfile b/misc/server/Dockerfile new file mode 100644 index 00000000000..34f0c7d0bf9 --- /dev/null +++ b/misc/server/Dockerfile @@ -0,0 +1,16 @@ +FROM rust:1.66-bullseye as builder +WORKDIR /usr/src/rust-libp2p-server + +# Run with access to the target cache to speed up builds +WORKDIR /workspace +ADD . . +RUN --mount=type=cache,target=./target \ + --mount=type=cache,target=/usr/local/cargo/registry \ + cargo build --release --package libp2p-server + +RUN --mount=type=cache,target=./target \ + mv ./target/release/libp2p-server /usr/local/bin/libp2p-server + +FROM gcr.io/distroless/cc +COPY --from=builder /usr/local/bin/libp2p-server /usr/local/bin/libp2p-server +CMD ["libp2p-server"] diff --git a/misc/server/README.md b/misc/server/README.md new file mode 100644 index 00000000000..ec4012562a2 --- /dev/null +++ b/misc/server/README.md @@ -0,0 +1,35 @@ +# Rust libp2p Server + +A rust-libp2p based server implementation running: + +- the [Kademlia protocol](https://github.com/libp2p/specs/tree/master/kad-dht) + +- the [Circuit Relay v2 protocol](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) + +- the [AutoNAT protocol](https://github.com/libp2p/specs/blob/master/autonat/README.md) + +## Usage + +``` +cargo run -- --help + +A rust-libp2p server binary. + +Usage: libp2p-server [OPTIONS] --config + +Options: + --config Path to IPFS config file + --metrics-path Metric endpoint path [default: /metrics] + --enable-kademlia Whether to run the libp2p Kademlia protocol and join the IPFS DHT + --enable-autonat Whether to run the libp2p Autonat protocol + -h, --help Print help +``` + + +``` +cargo run -- --config ~/.ipfs/config + +Local peer id: PeerId("12D3KooWSa1YEeQVSwvoqAMhwjKQ6kqZQckhWPb3RWEGV3sZGU6Z") +Listening on "/ip4/127.0.0.1/udp/4001/quic" +[...] +``` diff --git a/misc/server/src/behaviour.rs b/misc/server/src/behaviour.rs new file mode 100644 index 00000000000..b21d58862d7 --- /dev/null +++ b/misc/server/src/behaviour.rs @@ -0,0 +1,78 @@ +use libp2p::autonat; +use libp2p::identify; +use libp2p::kad::{record::store::MemoryStore, Kademlia, KademliaConfig}; +use libp2p::ping; +use libp2p::relay; +use libp2p::swarm::behaviour::toggle::Toggle; +use libp2p::{identity, swarm::NetworkBehaviour, Multiaddr, PeerId}; +use std::str::FromStr; +use std::time::Duration; + +const BOOTNODES: [&str; 4] = [ + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", + "QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", + "QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb", + "QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", +]; + +#[derive(NetworkBehaviour)] +pub(crate) struct Behaviour { + relay: relay::Behaviour, + ping: ping::Behaviour, + identify: identify::Behaviour, + pub(crate) kademlia: Toggle>, + autonat: Toggle, +} + +impl Behaviour { + pub(crate) fn new( + pub_key: identity::PublicKey, + enable_kademlia: bool, + enable_autonat: bool, + ) -> Self { + let kademlia = if enable_kademlia { + let mut kademlia_config = KademliaConfig::default(); + // Instantly remove records and provider records. + // + // TODO: Replace hack with option to disable both. + kademlia_config.set_record_ttl(Some(Duration::from_secs(0))); + kademlia_config.set_provider_record_ttl(Some(Duration::from_secs(0))); + let mut kademlia = Kademlia::with_config( + pub_key.to_peer_id(), + MemoryStore::new(pub_key.to_peer_id()), + kademlia_config, + ); + let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io").unwrap(); + for peer in &BOOTNODES { + kademlia.add_address(&PeerId::from_str(peer).unwrap(), bootaddr.clone()); + } + kademlia.bootstrap().unwrap(); + Some(kademlia) + } else { + None + } + .into(); + + let autonat = if enable_autonat { + Some(autonat::Behaviour::new( + PeerId::from(pub_key.clone()), + Default::default(), + )) + } else { + None + } + .into(); + + Self { + relay: relay::Behaviour::new(PeerId::from(pub_key.clone()), Default::default()), + ping: ping::Behaviour::new(ping::Config::new()), + identify: identify::Behaviour::new( + identify::Config::new("ipfs/0.1.0".to_string(), pub_key).with_agent_version( + format!("rust-libp2p-server/{}", env!("CARGO_PKG_VERSION")), + ), + ), + kademlia, + autonat, + } + } +} diff --git a/misc/server/src/config.rs b/misc/server/src/config.rs new file mode 100644 index 00000000000..c3e3ec529c1 --- /dev/null +++ b/misc/server/src/config.rs @@ -0,0 +1,39 @@ +use libp2p::Multiaddr; +use serde_derive::Deserialize; +use std::error::Error; +use std::path::Path; + +#[derive(Clone, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct Config { + pub(crate) identity: Identity, + pub(crate) addresses: Addresses, +} + +impl Config { + pub(crate) fn from_file(path: &Path) -> Result> { + Ok(serde_json::from_str(&std::fs::read_to_string(path)?)?) + } +} + +#[derive(Clone, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct Identity { + #[serde(rename = "PeerID")] + pub(crate) peer_id: String, + pub(crate) priv_key: String, +} + +#[derive(Clone, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct Addresses { + pub(crate) swarm: Vec, + pub(crate) append_announce: Vec, +} + +impl zeroize::Zeroize for Config { + fn zeroize(&mut self) { + self.identity.peer_id.zeroize(); + self.identity.priv_key.zeroize(); + } +} diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs new file mode 100644 index 00000000000..46cb7aacb84 --- /dev/null +++ b/misc/server/src/http_service.rs @@ -0,0 +1,131 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use hyper::http::StatusCode; +use hyper::service::Service; +use hyper::{Body, Method, Request, Response, Server}; +use log::{error, info}; +use prometheus_client::encoding::text::encode; +use prometheus_client::registry::Registry; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; + +pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> { + // Serve on localhost. + let addr = ([127, 0, 0, 1], 8080).into(); + + // Use the tokio runtime to run the hyper server. + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async { + let server = Server::bind(&addr).serve(MakeMetricService::new(registry)); + info!("Metrics server on http://{}/metrics", server.local_addr()); + if let Err(e) = server.await { + error!("server error: {}", e); + } + Ok(()) + }) +} + +pub(crate) struct MetricService { + reg: Arc>, +} + +type SharedRegistry = Arc>; + +impl MetricService { + fn get_reg(&mut self) -> SharedRegistry { + Arc::clone(&self.reg) + } + fn respond_with_metrics(&mut self) -> Response { + let mut response: Response = Response::default(); + + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + METRICS_CONTENT_TYPE.try_into().unwrap(), + ); + + let reg = self.get_reg(); + encode(&mut response.body_mut(), ®.lock().unwrap()).unwrap(); + + *response.status_mut() = StatusCode::OK; + + response + } + fn respond_with_404_not_found(&mut self) -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body("Not found try localhost:[port]/metrics".to_string()) + .unwrap() + } +} + +impl Service> for MetricService { + type Response = Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + let req_path = req.uri().path(); + let req_method = req.method(); + let resp = if (req_method == Method::GET) && (req_path == "/metrics") { + // Encode and serve metrics from registry. + self.respond_with_metrics() + } else { + self.respond_with_404_not_found() + }; + Box::pin(async { Ok(resp) }) + } +} + +pub(crate) struct MakeMetricService { + reg: SharedRegistry, +} + +impl MakeMetricService { + pub(crate) fn new(registry: Registry) -> MakeMetricService { + MakeMetricService { + reg: Arc::new(Mutex::new(registry)), + } + } +} + +impl Service for MakeMetricService { + type Response = MetricService; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: T) -> Self::Future { + let reg = self.reg.clone(); + let fut = async move { Ok(MetricService { reg }) }; + Box::pin(fut) + } +} diff --git a/misc/server/src/main.rs b/misc/server/src/main.rs new file mode 100644 index 00000000000..c2dff1f9228 --- /dev/null +++ b/misc/server/src/main.rs @@ -0,0 +1,213 @@ +use base64::Engine; +use clap::Parser; +use futures::executor::block_on; +use futures::future::Either; +use futures::stream::StreamExt; +use futures_timer::Delay; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::upgrade; +use libp2p::dns; +use libp2p::identify; +use libp2p::identity; +use libp2p::identity::PeerId; +use libp2p::kad; +use libp2p::metrics::{Metrics, Recorder}; +use libp2p::noise; +use libp2p::quic; +use libp2p::swarm::{SwarmBuilder, SwarmEvent}; +use libp2p::tcp; +use libp2p::yamux; +use libp2p::Transport; +use log::{debug, info}; +use prometheus_client::metrics::info::Info; +use prometheus_client::registry::Registry; +use std::error::Error; +use std::io; +use std::path::PathBuf; +use std::str::FromStr; +use std::task::Poll; +use std::thread; +use std::time::Duration; +use zeroize::Zeroizing; + +mod behaviour; +mod config; +mod http_service; + +const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60); + +#[derive(Debug, Parser)] +#[clap(name = "libp2p server", about = "A rust-libp2p server binary.")] +struct Opts { + /// Path to IPFS config file. + #[clap(long)] + config: PathBuf, + + /// Metric endpoint path. + #[clap(long, default_value = "/metrics")] + metrics_path: String, + + /// Whether to run the libp2p Kademlia protocol and join the IPFS DHT. + #[clap(long)] + enable_kademlia: bool, + + /// Whether to run the libp2p Autonat protocol. + #[clap(long)] + enable_autonat: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let opt = Opts::parse(); + + let config = Zeroizing::new(config::Config::from_file(opt.config.as_path())?); + + let (local_peer_id, local_keypair) = { + let keypair = identity::Keypair::from_protobuf_encoding(&Zeroizing::new( + base64::engine::general_purpose::STANDARD + .decode(config.identity.priv_key.as_bytes())?, + ))?; + + let peer_id = keypair.public().into(); + assert_eq!( + PeerId::from_str(&config.identity.peer_id)?, + peer_id, + "Expect peer id derived from private key and peer id retrieved from config to match." + ); + + (peer_id, keypair) + }; + println!("Local peer id: {local_peer_id}"); + + let transport = { + let tcp_transport = + tcp::tokio::Transport::new(tcp::Config::new().port_reuse(true).nodelay(true)) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&local_keypair)?) + .multiplex(yamux::Config::default()) + .timeout(Duration::from_secs(20)); + + let quic_transport = { + let mut config = quic::Config::new(&local_keypair); + config.support_draft_29 = true; + quic::tokio::Transport::new(config) + }; + + dns::TokioDnsConfig::system(libp2p::core::transport::OrTransport::new( + quic_transport, + tcp_transport, + ))? + .map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .boxed() + }; + + let behaviour = behaviour::Behaviour::new( + local_keypair.public(), + opt.enable_kademlia, + opt.enable_autonat, + ); + let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build(); + + if config.addresses.swarm.is_empty() { + log::warn!("No listen addresses configured."); + } + for address in &config.addresses.swarm { + match swarm.listen_on(address.clone()) { + Ok(_) => {} + Err(e @ libp2p::TransportError::MultiaddrNotSupported(_)) => { + log::warn!("Failed to listen on {address}, continuing anyways, {e}") + } + Err(e) => return Err(e.into()), + } + } + if config.addresses.append_announce.is_empty() { + log::warn!("No external addresses configured."); + } + for address in &config.addresses.append_announce { + swarm.add_external_address(address.clone()) + } + log::info!( + "External addresses: {:?}", + swarm.external_addresses().collect::>() + ); + + let mut metric_registry = Registry::default(); + let metrics = Metrics::new(&mut metric_registry); + let build_info = Info::new(vec![("version".to_string(), env!("CARGO_PKG_VERSION"))]); + metric_registry.register( + "build", + "A metric with a constant '1' value labeled by version", + build_info, + ); + thread::spawn(move || block_on(http_service::metrics_server(metric_registry))); + + let mut bootstrap_timer = Delay::new(BOOTSTRAP_INTERVAL); + + loop { + if let Poll::Ready(()) = futures::poll!(&mut bootstrap_timer) { + bootstrap_timer.reset(BOOTSTRAP_INTERVAL); + let _ = swarm + .behaviour_mut() + .kademlia + .as_mut() + .map(|k| k.bootstrap()); + } + + let event = swarm.next().await.expect("Swarm not to terminate."); + metrics.record(&event); + match event { + SwarmEvent::Behaviour(behaviour::BehaviourEvent::Identify(e)) => { + info!("{:?}", e); + metrics.record(&e); + + if let identify::Event::Received { + peer_id, + info: + identify::Info { + listen_addrs, + protocols, + .. + }, + } = e + { + if protocols.iter().any(|p| *p == kad::PROTOCOL_NAME) { + for addr in listen_addrs { + swarm + .behaviour_mut() + .kademlia + .as_mut() + .map(|k| k.add_address(&peer_id, addr)); + } + } + } + } + SwarmEvent::Behaviour(behaviour::BehaviourEvent::Ping(e)) => { + debug!("{:?}", e); + metrics.record(&e); + } + SwarmEvent::Behaviour(behaviour::BehaviourEvent::Kademlia(e)) => { + debug!("{:?}", e); + metrics.record(&e); + } + SwarmEvent::Behaviour(behaviour::BehaviourEvent::Relay(e)) => { + info!("{:?}", e); + metrics.record(&e) + } + SwarmEvent::Behaviour(behaviour::BehaviourEvent::Autonat(e)) => { + info!("{:?}", e); + // TODO: Add metric recording for `NatStatus`. + // metrics.record(&e) + } + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening on {address:?}"); + } + _ => {} + } + } +}