diff --git a/.github/workflows/rust-build-and-test.yml b/.github/workflows/rust-build-and-test.yml index 3e345712..4e0aa753 100644 --- a/.github/workflows/rust-build-and-test.yml +++ b/.github/workflows/rust-build-and-test.yml @@ -121,6 +121,12 @@ jobs: - name: Run local dtntrigger test run: ./tests/local_trigger_test.sh + - name: Run spray and wait routing test + run: ./tests/routing_saw.sh + + - name: Run static routing test + run: ./tests/routing_static.sh + - name: Run external peer management (dtn) test run: ./tests/ext_peer_management.sh diff --git a/Cargo.lock b/Cargo.lock index 6b84f7b0..22db8d98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -630,6 +630,7 @@ dependencies = [ "enum_dispatch", "futures", "futures-util", + "glob-match", "http", "humansize", "humantime 2.1.0", @@ -989,6 +990,12 @@ version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0a93d233ebf96623465aad4046a8d3aa4da22d4f4beba5388838c8a434bbb4" +[[package]] +name = "glob-match" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985c9503b412198aa4197559e9a318524ebc4519c229bfa05a535828c950b9d" + [[package]] name = "h2" version = "0.3.16" diff --git a/README.md b/README.md index 92bf7a48..1ab8f595 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Plus: * [Minimal TCP Convergence Layer](https://tools.ietf.org/html/draft-ietf-dtn-mtcpcl-01) * A simple [HTTP Convergence Layer](doc/http-cl.md) * A [HTTP pull-based Convergence Layer](doc/http-pull-cl.md) -* An IP neighorhood discovery service +* An IP neighborhood discovery service * Convenient command line tools to interact with the daemon * A simple web interface for status information about `dtnd` * A [web-socket interface](doc/http-client-api.md) for application agents @@ -25,7 +25,7 @@ The actual BP7 implementation (encoding/decoding) is available as a separate [pr Additional dtn extensions and a client library are also [available](https://crates.io/crates/dtn7-plus). -Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/sink-routing and restful/websocket command interfaces are implemented. +Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/static/sink-routing and restful/websocket command interfaces are implemented. Both addressing schemes, *dtn* as well as *ipn* are supported. Furthermore, some CLI tools are provided to easily integrate *dtn7* into shell scripts. diff --git a/core/dtn7/Cargo.toml b/core/dtn7/Cargo.toml index dd2a0e39..682bbd8d 100644 --- a/core/dtn7/Cargo.toml +++ b/core/dtn7/Cargo.toml @@ -77,6 +77,7 @@ dtn7-codegen = { path = "../codegen", version = "0.1.0" } byteorder = "1.4.3" reqwest = { version = "0.11.13", default-features = false, features = ["json"] } sha1 = "0.10.5" +glob-match = "0.2.1" [lib] name = "dtn7" diff --git a/core/dtn7/src/dtnd/httpd.rs b/core/dtn7/src/dtnd/httpd.rs index 108f7f34..b11bb643 100644 --- a/core/dtn7/src/dtnd/httpd.rs +++ b/core/dtn7/src/dtnd/httpd.rs @@ -9,6 +9,8 @@ use crate::core::peer::PeerType; use crate::core::store::BundleStore; use crate::peers_add; use crate::peers_remove; +use crate::routing_cmd; +use crate::routing_get_data; use crate::store_remove; use crate::CONFIG; use crate::DTNCORE; @@ -329,6 +331,41 @@ async fn debug_rnd_peer() -> String { res } +async fn http_routing_cmd( + Query(params): Query>, +) -> Result { + if let Some(cmd) = params.get("c") { + if routing_cmd(cmd.to_string()).await.is_ok() { + Ok("Sent command to routing agent.".into()) + } else { + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "error sending cmd to routing agent", + )) + } + } else { + //anyhow::bail!("missing filter criteria"); + Err(( + StatusCode::BAD_REQUEST, + "missing routing command parameter cmd", + )) + } +} + +async fn http_routing_getdata( + Query(params): Query>, +) -> Result { + let param = params.get("p").map_or("".to_string(), |f| f.to_string()); + if let Ok(res) = routing_get_data(param).await { + Ok(res) + } else { + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "error getting data from routing agent", + )) + } +} + async fn http_peers_add( Query(params): Query>, ) -> Result { @@ -699,6 +736,8 @@ pub async fn spawn_httpd() -> Result<()> { let mut app_local_only = Router::new() .route("/peers/add", get(http_peers_add)) .route("/peers/del", get(http_peers_delete)) + .route("/routing/cmd", get(http_routing_cmd).post(http_routing_cmd)) + .route("/routing/getdata", get(http_routing_getdata)) .route("/send", post(send_post)) .route("/delete", get(delete).delete(delete)) .route("/register", get(register)) diff --git a/core/dtn7/src/lib.rs b/core/dtn7/src/lib.rs index 774acae4..9e91a45f 100644 --- a/core/dtn7/src/lib.rs +++ b/core/dtn7/src/lib.rs @@ -218,6 +218,26 @@ pub fn store_delete_expired() { } } } +pub async fn routing_cmd(cmd: String) -> Result<()> { + let chan = DTNCORE.lock().routing_agent.channel(); + if let Err(err) = chan.send(RoutingCmd::Command(cmd)).await { + bail!("Error while sending notification: {}", err); + } + Ok(()) +} + +pub async fn routing_get_data(param: String) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + + let cmd_channel = DTNCORE.lock().routing_agent.channel(); + if let Err(err) = cmd_channel.send(RoutingCmd::GetData(param, reply_tx)).await { + bail!("Error while sending command to routing agent: {}", err); + } + // wait for reply or timeout + let res = tokio::time::timeout(std::time::Duration::from_secs(1), reply_rx).await??; + + Ok(res) +} pub async fn routing_notify(notification: RoutingNotifcation) -> Result<()> { let chan = DTNCORE.lock().routing_agent.channel(); diff --git a/core/dtn7/src/routing/epidemic.rs b/core/dtn7/src/routing/epidemic.rs index 86f67f51..08455d6e 100644 --- a/core/dtn7/src/routing/epidemic.rs +++ b/core/dtn7/src/routing/epidemic.rs @@ -112,6 +112,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver) { super::RoutingCmd::Shutdown => { break; } + super::RoutingCmd::Command(_cmd) => {} + super::RoutingCmd::GetData(_, tx) => { + tx.send(format!("{:?}", core.history)).unwrap(); + } super::RoutingCmd::Notify(notification) => match notification { RoutingNotifcation::SendingFailed(bid, cla_sender) => { core.sending_failed(bid.as_str(), cla_sender.as_str()); diff --git a/core/dtn7/src/routing/external.rs b/core/dtn7/src/routing/external.rs index c5c5b6fd..d7ffa2c8 100644 --- a/core/dtn7/src/routing/external.rs +++ b/core/dtn7/src/routing/external.rs @@ -30,6 +30,10 @@ impl ExternalRoutingAgent { super::RoutingCmd::Shutdown => { break; } + super::RoutingCmd::Command(_cmd) => {} + super::RoutingCmd::GetData(_, tx) => { + tx.send("unimplemented!".to_string()).unwrap(); + } super::RoutingCmd::Notify(notification) => { notify(notification); } diff --git a/core/dtn7/src/routing/flooding.rs b/core/dtn7/src/routing/flooding.rs index ad862e9f..d433c35b 100644 --- a/core/dtn7/src/routing/flooding.rs +++ b/core/dtn7/src/routing/flooding.rs @@ -39,6 +39,10 @@ impl FloodingRoutingAgent { super::RoutingCmd::Shutdown => { break; } + super::RoutingCmd::Command(_cmd) => {} + super::RoutingCmd::GetData(_, tx) => { + tx.send("unimplemented!".to_string()).unwrap(); + } super::RoutingCmd::Notify(_) => {} } } diff --git a/core/dtn7/src/routing/mod.rs b/core/dtn7/src/routing/mod.rs index e39144a2..d0b8cb9e 100644 --- a/core/dtn7/src/routing/mod.rs +++ b/core/dtn7/src/routing/mod.rs @@ -4,6 +4,7 @@ pub mod external; pub mod flooding; pub mod sink; pub mod sprayandwait; +pub mod static_routing; use crate::cla::ClaSenderTask; use crate::core::bundlepack::BundlePack; @@ -17,6 +18,7 @@ use external::ExternalRoutingAgent; use flooding::FloodingRoutingAgent; use sink::SinkRoutingAgent; use sprayandwait::SprayAndWaitRoutingAgent; +use static_routing::StaticRoutingAgent; use std::fmt::Debug; use std::fmt::Display; use tokio::sync::{mpsc, oneshot}; @@ -37,11 +39,14 @@ pub enum RoutingAgentsEnum { SinkRoutingAgent, ExternalRoutingAgent, SprayAndWaitRoutingAgent, + StaticRoutingAgent, } pub enum RoutingCmd { SenderForBundle(BundlePack, oneshot::Sender<(Vec, bool)>), Notify(RoutingNotifcation), + Command(String), + GetData(String, oneshot::Sender), Shutdown, } @@ -60,11 +65,18 @@ pub trait RoutingAgent: Debug + Display { } pub fn routing_algorithms() -> Vec<&'static str> { - vec!["epidemic", "flooding", "sink", "external", "sprayandwait"] + vec![ + "epidemic", + "flooding", + "sink", + "external", + "sprayandwait", + "static", + ] } pub fn routing_options() -> Vec<&'static str> { - vec!["sprayandwait.num_copies="] + vec!["sprayandwait.num_copies=", "static.routes="] } pub fn new(routingagent: &str) -> RoutingAgentsEnum { @@ -72,6 +84,7 @@ pub fn new(routingagent: &str) -> RoutingAgentsEnum { "flooding" => FloodingRoutingAgent::new().into(), "epidemic" => EpidemicRoutingAgent::new().into(), "sink" => SinkRoutingAgent::new().into(), + "static" => StaticRoutingAgent::new().into(), "external" => ExternalRoutingAgent::new().into(), "sprayandwait" => SprayAndWaitRoutingAgent::new().into(), _ => panic!("Unknown routing agent {}", routingagent), diff --git a/core/dtn7/src/routing/sink.rs b/core/dtn7/src/routing/sink.rs index ef87ed5b..b371a74e 100644 --- a/core/dtn7/src/routing/sink.rs +++ b/core/dtn7/src/routing/sink.rs @@ -26,6 +26,10 @@ impl SinkRoutingAgent { super::RoutingCmd::Shutdown => { break; } + super::RoutingCmd::Command(_cmd) => {} + super::RoutingCmd::GetData(_, tx) => { + tx.send("unimplemented!".to_string()).unwrap(); + } super::RoutingCmd::Notify(_) => {} } } diff --git a/core/dtn7/src/routing/sprayandwait.rs b/core/dtn7/src/routing/sprayandwait.rs index cdd04de0..256f612b 100644 --- a/core/dtn7/src/routing/sprayandwait.rs +++ b/core/dtn7/src/routing/sprayandwait.rs @@ -18,6 +18,7 @@ pub struct SprayAndWaitRoutingAgent { tx: mpsc::Sender, } +#[derive(Debug)] pub struct SaWBundleData { /// the number of copies we have left to spread remaining_copies: usize, @@ -203,6 +204,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver) { super::RoutingCmd::Shutdown => { break; } + super::RoutingCmd::Command(_cmd) => {} + super::RoutingCmd::GetData(_, tx) => { + tx.send(format!("{:?}", core.history)).unwrap(); + } super::RoutingCmd::Notify(notification) => { handle_notification(&mut core, notification); } diff --git a/core/dtn7/src/routing/static_routing.rs b/core/dtn7/src/routing/static_routing.rs new file mode 100644 index 00000000..e0f3ef65 --- /dev/null +++ b/core/dtn7/src/routing/static_routing.rs @@ -0,0 +1,178 @@ +use std::fmt::Display; + +use crate::{CONFIG, PEERS}; + +use super::{RoutingAgent, RoutingCmd}; +use async_trait::async_trait; +use glob_match::glob_match; +use log::{debug, info}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; + +#[derive(Debug)] +pub struct StaticRouteEntry { + /// index in the routing table + pub idx: u16, + /// source eid, wildcards are allowed + pub src: String, + /// destination eid, wildcards are allowed + pub dst: String, + /// next hop eid + pub via: String, +} + +impl Display for StaticRouteEntry { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "#{}: route from {} to {} via {}", + self.idx, self.src, self.dst, self.via + ) + } +} + +#[derive(Debug)] +pub struct StaticRoutingAgent { + tx: mpsc::Sender, +} + +#[derive(Debug)] +pub struct StaticRoutingAgentCore { + routes: Vec, +} + +impl Default for StaticRoutingAgent { + fn default() -> Self { + StaticRoutingAgent::new() + } +} + +impl StaticRoutingAgent { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + handle_routing_cmd(rx).await; + }); + StaticRoutingAgent { tx } + } +} + +#[async_trait] +impl RoutingAgent for StaticRoutingAgent { + fn channel(&self) -> Sender { + self.tx.clone() + } +} + +impl std::fmt::Display for StaticRoutingAgent { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "StaticRoutingAgent") + } +} + +fn parse_route_from_str(s: &str) -> Option { + let s = s.trim(); + if s.starts_with('#') || s.is_empty() { + return None; + } + let mut parts = s.split_whitespace(); + let idx = parts.next().unwrap().parse::().unwrap(); + let src = parts.next().unwrap(); + let dst = parts.next().unwrap(); + let via = parts.next().unwrap(); + Some(StaticRouteEntry { + idx, + src: src.to_string(), + dst: dst.to_string(), + via: via.to_string(), + }) +} + +async fn handle_routing_cmd(mut rx: mpsc::Receiver) { + let mut route_entries = vec![]; + let settings = CONFIG.lock().routing_settings.clone(); + + if let Some(static_settings) = settings.get("static") { + if let Some(routes_file) = static_settings.get("routes") { + // open file and read routes line by line + let routes = std::fs::read_to_string(routes_file).unwrap(); + for line in routes.lines() { + if let Some(entry) = parse_route_from_str(line) { + debug!("Adding static route: {}", entry); + route_entries.push(entry); + } + } + } + } + + let mut core: StaticRoutingAgentCore = StaticRoutingAgentCore { + routes: route_entries, + }; + + while let Some(cmd) = rx.recv().await { + match cmd { + super::RoutingCmd::SenderForBundle(bp, reply) => { + let mut clas = vec![]; + let mut delete_afterwards = false; + 'route_loop: for route in &core.routes { + if glob_match(&route.src, &bp.source.to_string()) + && glob_match(&route.dst, &bp.destination.to_string()) + { + debug!( + "Found route: {}, looking for valid peer ({})", + route, route.via + ); + for (_, p) in (*PEERS.lock()).iter() { + if p.eid.to_string() == route.via { + if let Some(cla) = p.first_cla() { + clas.push(cla); + delete_afterwards = + p.node_name() == bp.destination.node().unwrap(); + break 'route_loop; + } + } + } + debug!("No valid peer found for route {}", route) + } + } + if clas.is_empty() { + debug!("No route found for bundle {}", bp); + } + reply.send((clas, delete_afterwards)).unwrap(); + } + super::RoutingCmd::Shutdown => { + break; + } + super::RoutingCmd::Command(cmd) => { + if cmd == "reload" { + let settings = CONFIG.lock().routing_settings.clone(); + if let Some(static_settings) = settings.get("static") { + if let Some(routes_file) = static_settings.get("routes") { + info!("Reloading static routes from {}", routes_file); + // open file and read routes line by line + let routes = std::fs::read_to_string(routes_file).unwrap(); + let mut route_entries = vec![]; + for line in routes.lines() { + if let Some(entry) = parse_route_from_str(line) { + debug!("Adding static route: {}", entry); + route_entries.push(entry); + } + } + core.routes = route_entries; + } + } + } else { + debug!("Unknown command: {}", cmd); + } + } + super::RoutingCmd::GetData(_, tx) => { + let routes_as_str = core + .routes + .iter() + .fold(String::new(), |acc, r| acc + &format!("{}\n", r)); + tx.send(routes_as_str).unwrap(); + } + super::RoutingCmd::Notify(_) => {} + } + } +} diff --git a/doc/http-client-api.md b/doc/http-client-api.md index 83e9ab10..61031df4 100644 --- a/doc/http-client-api.md +++ b/doc/http-client-api.md @@ -88,6 +88,23 @@ $ curl "http://127.0.0.1:3000/peers/del?p=tcp://127.0.0.1:4223/node2" Removed peer ``` +### **GET**, **POST** `/routing/cmd?c=` + +Send a command to the routing daemon, e.g., *static* accepts to `reload` to reload the routing information from the configured file. +``` +$ curl http://127.0.0.1:3000/routing/cmd?cmd='reload' +Sent command to routing agent. +``` +### **GET** `/routing/getdata?p=` + +Get internal data from routing agent. +Not all routing agents respond to this. + +``` +$ curl http://127.0.0.1:3000/routing/getdata +#1: route from * to ipn:[2-3].* via ipn:2.0 +``` + ### **GET**, **POST** `/insert` Insert is used to send a newly constructed bundle from this node instance. diff --git a/run_all_tests.sh b/run_all_tests.sh index 7cda16c9..d3f395a1 100755 --- a/run_all_tests.sh +++ b/run_all_tests.sh @@ -52,6 +52,7 @@ cargo test $TARGET_OPT && filter_output ./tests/erouting_epidemic.sh && filter_output ./tests/ecla_erouting_test_mtcp.sh && filter_output ./tests/routing_saw.sh && + filter_output ./tests/routing_static.sh && filter_output ./tests/ext_peer_management.sh && filter_output ./tests/ext_peer_management_ipn.sh && echo "SUCCESS" diff --git a/tests/routes_1.csv b/tests/routes_1.csv new file mode 100644 index 00000000..1b81c2fe --- /dev/null +++ b/tests/routes_1.csv @@ -0,0 +1,2 @@ +# rule #1: match any source and route for destination ipn:2.* and ipn:3.* via ipn:2.0 node +1 * ipn:[2-3].* ipn:2.0 diff --git a/tests/routes_2.csv b/tests/routes_2.csv new file mode 100644 index 00000000..20fb8836 --- /dev/null +++ b/tests/routes_2.csv @@ -0,0 +1,2 @@ +1 ipn:1.* ipn:3.* ipn:3.0 +2 ipn:3.* ipn:1.* ipn:1.0 diff --git a/tests/routes_3.csv b/tests/routes_3.csv new file mode 100644 index 00000000..7cfb44bf --- /dev/null +++ b/tests/routes_3.csv @@ -0,0 +1 @@ +1 * * ipn:2.0 \ No newline at end of file diff --git a/tests/routing_static.sh b/tests/routing_static.sh new file mode 100755 index 00000000..177f8bea --- /dev/null +++ b/tests/routing_static.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +. $(dirname $(perl -MCwd -e 'print Cwd::abs_path shift' "$0"))/libshelltests.sh + +prepare_test + +STATUS_REPORTS="-g" + +PORT_NODE1=$(get_current_port) +start_dtnd -d -j0 -i0 -C mtcp:port=2342 -e 42 -r static -n 1 -s mtcp://127.0.0.1:4223/2 $STATUS_REPORTS -R static.routes=tests/routes_1.csv + +PORT_NODE2=$(get_current_port) +#DB2="-W /tmp/node2 -D sled" +#DB2="-W /tmp/node2 -D sneakers" +start_dtnd -d -j0 -i0 -C mtcp:port=4223 -e 42 -r static \ + -n 2 \ + -s mtcp://127.0.0.1:2342/1 \ + -s mtcp://127.0.0.1:2432/3 \ + $STATUS_REPORTS \ + -R static.routes=tests/routes_2.csv + +PORT_NODE3=$(get_current_port) + +start_dtnd -d -j0 -i0 -C mtcp:port=2432 -e 42 -r static -n 3 -s mtcp://127.0.0.1:4223/2 $STATUS_REPORTS -R static.routes=tests/routes_3.csv + +sleep 1 + +echo + +echo "Sending 'test' to ipn:3.42" +echo test | $BINS/dtnsend -r ipn:3.42 -p $PORT_NODE1 + +sleep 1 + +echo -n "Receiving on node 3: " +$BINS/dtnrecv -v -e 42 -p $PORT_NODE3 +RC=$? +echo "RET: $RC" +echo + +wait_for_key $1 + +cleanup + +exit $RC \ No newline at end of file