From ebd04e7905a5ca0cf4908b2f44589dee2261ee90 Mon Sep 17 00:00:00 2001 From: Christian M Date: Sat, 14 Oct 2023 09:29:46 +0200 Subject: [PATCH] :recycle: refactored bluetooth --- Cargo.toml | 7 +- examples/bt_rfcomm_client.rs | 5 +- src/bluetooth/mod.rs | 10 +- src/bluetooth/rfcomm_srv.rs | 326 ++++++++++++++++------------------- 4 files changed, 162 insertions(+), 186 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e066254..402b828 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,10 @@ build = "build.rs" [features] default = ["server", "camera", "yolo"] -full = ["bluetooth", "camera", "detect", "hotspot", "server"] -pi = ["bluetooth", "camera", "detect", "server"] -bluetooth = ["base64", "bluer"] +full = ["bt-server", "camera", "detect", "hotspot", "server"] +pi = ["bt-server", "camera", "detect", "server"] +bluetooth = ["base64"] +bt-server = ["bluetooth", "bluer"] camera = ["nokhwa/input-native"] hotspot = ["wifi-rs"] server = ["axum", "base64", "tower", "tower-http"] diff --git a/examples/bt_rfcomm_client.rs b/examples/bt_rfcomm_client.rs index 2246bf7..2894c24 100644 --- a/examples/bt_rfcomm_client.rs +++ b/examples/bt_rfcomm_client.rs @@ -3,10 +3,7 @@ use bluer::{ Address, }; -use ornithology_pi::bluetooth::{ - rfcomm_srv::{CHANNEL, MTU}, - Message, -}; +use ornithology_pi::bluetooth::{Message, CHANNEL, MTU}; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::time::sleep; diff --git a/src/bluetooth/mod.rs b/src/bluetooth/mod.rs index 478f6db..4d65543 100644 --- a/src/bluetooth/mod.rs +++ b/src/bluetooth/mod.rs @@ -9,6 +9,10 @@ pub mod gatt_srv; pub mod rfcomm_srv; pub const MANUFACTURER_ID: u16 = 0xf00d; +pub const SERVICE_UUID: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00001); +pub const CHARACTERISTIC_UUID: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00002); +pub const CHANNEL: u8 = 7; +pub const MTU: u16 = 8192; pub async fn setup_session(session: &bluer::Session) -> bluer::Result<()> { let adapter_names = session.adapter_names().await?; @@ -35,11 +39,7 @@ pub async fn run_bluetooth(sightings: Arc>>) -> bluer::Resul .unwrap(); */ - rfcomm_srv::run_session(&session, sightings.clone()) - .await - .unwrap(); - - //drop(gatt_handle); + rfcomm_srv::run_session(&session, sightings.clone()).await?; Ok(()) } diff --git a/src/bluetooth/rfcomm_srv.rs b/src/bluetooth/rfcomm_srv.rs index 0e9b0c7..1126db7 100644 --- a/src/bluetooth/rfcomm_srv.rs +++ b/src/bluetooth/rfcomm_srv.rs @@ -1,4 +1,5 @@ use super::Message; +use crate::bluetooth::setup_session; use crate::sighting::save_to_file; use crate::Sighting; use base64; @@ -21,11 +22,136 @@ use tokio::{ time::sleep, }; +use super::CHANNEL; +use super::CHARACTERISTIC_UUID; use super::MANUFACTURER_ID; -pub const SERVICE_UUID: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00001); -pub const CHARACTERISTIC_UUID: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00002); -pub const CHANNEL: u8 = 7; -pub const MTU: u16 = 8192; +use super::MTU; +use super::SERVICE_UUID; + +async fn handle_message( + message: Message, + sightings: &Arc>>, + stream: &mut Stream, +) -> Result<(), Box> { + match message { + Message::Ping => { + log::debug!("{:?}", Message::Ping); + let response = serde_json::to_vec(&Message::Pong).unwrap(); + + stream.write_all(&response).await?; + } + Message::Pong => { + log::debug!("{:?}", Message::Pong); + } + Message::CountRequest => { + let count = { + let len = sightings.lock().unwrap().len(); + len as u64 + }; + let response = serde_json::to_vec(&Message::CountResponse { count }).unwrap(); + stream.write_all(&response).await?; + } + Message::LastRequest => { + let sighting = { + let mutex = sightings.lock().unwrap(); + let last = mutex.last(); + + last.unwrap().clone() + }; + log::debug!("{:?}", sighting); + let response = serde_json::to_vec(&Message::LastResponse { + last: sighting.clone(), + })?; + + stream.write_all(&response).await?; + } + Message::SightingIdsRequest => { + let sightings = { + let mutex: Vec = sightings.lock().unwrap().to_vec(); + let sightings: Vec = mutex.into_iter().map(|i| i.uuid).collect(); + sightings + }; + let response = serde_json::to_vec(&Message::SightingIdsResponse { + ids: sightings.clone(), + }) + .unwrap(); + + stream.write_all(&response).await?; + } + Message::SightingRequest { uuid } => { + log::debug!("sighting {}", uuid); + let sighting = { + let sightings = sightings.lock().unwrap(); + let sighting = sightings + .iter() + .filter(|sighting| sighting.uuid == uuid) + .last() + .cloned(); + sighting.unwrap_or_default() + }; + let response = serde_json::to_vec(&Message::SightingResponse { sighting }).unwrap(); + + stream.write_all(&response).await?; + } + Message::RemoveSightingRequest { uuid } => { + log::debug!("remove sighting {}", uuid); + let sightings = { + let mut sightings = sightings.lock().unwrap(); + let index = sightings.iter().position(|x| x.uuid == uuid).unwrap(); + sightings.remove(index); + sightings.to_vec() + }; + save_to_file(sightings.clone(), "sightings/sightings.db").unwrap(); + let sightings = { + let sightings: Vec = sightings.into_iter().map(|i| i.uuid).collect(); + sightings + }; + let response = serde_json::to_vec(&Message::SightingIdsResponse { + ids: sightings.clone(), + })?; + + stream.write_all(&response).await?; + } + Message::ImageRequest { uuid } => { + log::debug!("{}", uuid); + let filename = { + let sightings = sightings.lock().unwrap(); + let sighting = sightings + .iter() + .filter(|sighting| sighting.uuid == uuid) + .last() + .cloned(); + let sighting = sighting.unwrap_or_default(); + format!("{}_{}.jpg", sighting.species, sighting.uuid) + }; + let buf = match image::open(format!("sightings/{}", filename)) { + Ok(base_img) => { + let base_img = base_img.resize(640, 480, FilterType::Gaussian); + let mut buf = Cursor::new(Vec::new()); + base_img + .write_to(&mut buf, image::ImageOutputFormat::Jpeg(60)) + .unwrap(); + buf.into_inner() + } + Err(err) => { + log::debug!("{:?}", err); + vec![] + } + }; + let base64_img = format!("data:image/jpeg;{}", base64::encode(&buf)); + let response = serde_json::to_vec(&Message::ImageResponse { + uuid, + base64: base64_img.clone(), + })?; + + stream.write_all(&response).await? + } + _ => { + log::debug!("Read {:?} bytes", message); + } + } + Ok(()) +} async fn handle_connection( sightings: Arc>>, @@ -34,16 +160,17 @@ async fn handle_connection( ) -> Result<(), Box> { let recv_mtu = MTU; - println!( + log::debug!( "Accepted connection from {:?} with receive MTU {} bytes", - &addr, &recv_mtu + &addr, + &recv_mtu ); if let Err(err) = stream .write_all(format!("{:?}", sightings.lock().unwrap().len()).as_bytes()) .await { - println!("Write failed: {}", &err); + log::debug!("Write failed: {}", &err); } loop { @@ -52,12 +179,12 @@ async fn handle_connection( let n = match stream.read(&mut buf).await { Ok(0) => { - println!("Stream ended"); + log::debug!("Stream ended"); break; } Ok(n) => n, Err(err) => { - println!("Read failed: {}", &err); + log::debug!("Read failed: {}", &err); break; } }; @@ -67,157 +194,18 @@ async fn handle_connection( for message in message_stream { match message { - Ok(Message::Ping) => { - println!("{:?}", Message::Ping); - let response = serde_json::to_vec(&Message::Pong).unwrap(); - - if let Err(err) = stream.write_all(&response).await { - println!("Write failed: {}", &err); - continue; - } - } - Ok(Message::Pong) => { - println!("{:?}", Message::Pong); - } - Ok(Message::CountRequest) => { - let count = { - let len = sightings.lock().unwrap().len(); - len as u64 - }; - let response = serde_json::to_vec(&Message::CountResponse { count }).unwrap(); - if let Err(err) = stream.write_all(&response).await { - println!("Write failed: {}", &err); - continue; - } - } - Ok(Message::LastRequest) => { - let sighting = { - let mutex = sightings.lock().unwrap(); - let last = mutex.last(); - - last.unwrap().clone() - }; - println!("{:?}", sighting); - let response = serde_json::to_vec(&Message::LastResponse { - last: sighting.clone(), - }) - .unwrap(); - - if let Err(err) = stream.write_all(&response).await { - println!("Write failed: {}", &err); - continue; - } - } - Ok(Message::SightingIdsRequest) => { - let sightings = { - let mutex: Vec = sightings.lock().unwrap().to_vec(); - let sightings: Vec = mutex.into_iter().map(|i| i.uuid).collect(); - sightings - }; - let response = serde_json::to_vec(&Message::SightingIdsResponse { - ids: sightings.clone(), - }) - .unwrap(); - - if let Err(err) = stream.write_all(&response).await { - println!("Write failed: {}", &err); - continue; - } - } - Ok(Message::SightingRequest { uuid }) => { - println!("sighting {}", uuid); - let sighting = { - let sightings = sightings.lock().unwrap(); - let sighting = sightings - .iter() - .filter(|sighting| sighting.uuid == uuid) - .last() - .cloned(); - sighting.unwrap_or_default() - }; - let response = - serde_json::to_vec(&Message::SightingResponse { sighting }).unwrap(); - - if let Err(err) = stream.write_all(&response).await { - println!("Write failed: {}", &err); - continue; - } - } - Ok(Message::RemoveSightingRequest { uuid }) => { - println!("remove sighting {}", uuid); - let sightings = { - let mut sightings = sightings.lock().unwrap(); - let index = sightings.iter().position(|x| x.uuid == uuid).unwrap(); - sightings.remove(index); - sightings.to_vec() - }; - save_to_file(sightings.clone(), "sightings/sightings.db").unwrap(); - let sightings = { - let sightings: Vec = - sightings.into_iter().map(|i| i.uuid).collect(); - sightings - }; - let response = serde_json::to_vec(&Message::SightingIdsResponse { - ids: sightings.clone(), - }) - .unwrap(); - - if let Err(err) = stream.write_all(&response).await { - println!("Write failed: {}", &err); - continue; - } - } - Ok(Message::ImageRequest { uuid }) => { - println!("{}", uuid); - let filename = { - let sightings = sightings.lock().unwrap(); - let sighting = sightings - .iter() - .filter(|sighting| sighting.uuid == uuid) - .last() - .cloned(); - let sighting = sighting.unwrap_or_default(); - format!("{}_{}.jpg", sighting.species, sighting.uuid) - }; - let buf = match image::open(format!("sightings/{}", filename)) { - Ok(base_img) => { - let base_img = base_img.resize(640, 480, FilterType::Gaussian); - let mut buf = Cursor::new(Vec::new()); - base_img - .write_to(&mut buf, image::ImageOutputFormat::Jpeg(60)) - .unwrap(); - buf.into_inner() - } - Err(err) => { - println!("{:?}", err); - vec![] - } - }; - let base64_img = format!("data:image/jpeg;{}", base64::encode(&buf)); - let response = serde_json::to_vec(&Message::ImageResponse { - uuid, - base64: base64_img.clone(), - }) - .unwrap(); - - if let Err(err) = stream.write_all(&response).await { - println!("Write failed: {}", &err); - continue; - } - } - _ => { - let text = std::str::from_utf8(buf).unwrap(); - println!("Echoing {} bytes: {}", buf.len(), text); - if let Err(err) = stream.write_all(buf).await { - println!("Write failed: {}", &err); - continue; + Ok(valid_message) => { + match handle_message(valid_message, &sightings, stream).await { + Ok(_) => (), + Err(e) => log::debug!("Error handling message: {:?}", e), } } + Err(e) => log::debug!("Error parsing message: {:?}", e), } } } - println!("{} disconnected", &addr); + log::debug!("{} disconnected", &addr); Ok(()) } @@ -239,55 +227,45 @@ pub async fn run_session( ..Default::default() }; - println!("Registered profile {}", profile.uuid); + log::debug!("Registered profile {}", profile.uuid); let mut hndl = session.register_profile(profile).await?; - println!("Listening on channel {}", CHANNEL); + log::debug!("Listening on channel {}", CHANNEL); loop { - println!("\nWaiting for connection..."); + log::debug!("\nWaiting for connection..."); let req = hndl.next().await.expect("received no connect request"); let sa = req.device(); let mut stream = match req.accept() { Ok(v) => v, Err(err) => { - println!("Accepting connection failed: {}", &err); + log::debug!("Accepting connection failed: {}", &err); continue; } }; let recv_mtu = MTU; - println!( + log::debug!( "Accepted connection from {:?} with receive MTU {} bytes", - &sa, &recv_mtu + &sa, + &recv_mtu ); match handle_connection(sightings.clone(), &mut stream, sa).await { - Err(err) => println!("{:?}", err), + Err(err) => log::debug!("{:?}", err), _ => (), } } - println!("Removing advertisement"); + log::debug!("Removing advertisement"); drop(hndl); drop(_agent_hndl); } pub async fn run(sightings: Arc>>) -> bluer::Result<()> { let session = bluer::Session::new().await?; - let adapter_names = session.adapter_names().await?; - let adapter_name = adapter_names.first().expect("No Bluetooth adapter present"); - let adapter = session.adapter(adapter_name)?; - adapter.set_powered(true).await?; - adapter.set_discoverable(true).await?; - adapter.set_discoverable_timeout(0).await?; - adapter.set_pairable(false).await?; - println!( - "Advertising on Bluetooth adapter {} with address {}", - &adapter_name, - adapter.address().await? - ); + setup_session(&session).await?; run_session(&session, sightings).await.unwrap(); sleep(Duration::from_secs(1)).await;