Skip to content

Commit

Permalink
✨ adds gatt example
Browse files Browse the repository at this point in the history
  • Loading branch information
chriamue committed Oct 15, 2023
1 parent af65fe1 commit e06a581
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 9 deletions.
205 changes: 205 additions & 0 deletions examples/bt_gatt_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
//! Connects to the Bluetooth GATT echo service and tests it.

use bluer::{gatt::remote::Characteristic, AdapterEvent, Device, Result};
use futures::{pin_mut, StreamExt};
use rand::Rng;
use std::time::Duration;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
time::{sleep, timeout},
};
use pretty_env_logger::env_logger;

use ornithology_pi::bluetooth::SERVICE_UUID;
use ornithology_pi::bluetooth::CHARACTERISTIC_UUID;

async fn find_our_characteristic(device: &Device) -> Result<Option<Characteristic>> {
let addr = device.address();
let uuids = device.uuids().await?.unwrap_or_default();
println!("Discovered device {} with service UUIDs {:?}", addr, &uuids);
let md = device.manufacturer_data().await?;
println!(" Manufacturer data: {:x?}", &md);

if uuids.contains(&SERVICE_UUID) {
println!(" Device provides our service!");
if !device.is_connected().await? {
println!(" Connecting...");
let mut retries = 2;
loop {
match device.connect().await {
Ok(()) => break,
Err(err) if retries > 0 => {
println!(" Connect error: {}", &err);
retries -= 1;
}
Err(err) => return Err(err),
}
}
println!(" Connected");
} else {
println!(" Already connected");
}

println!(" Enumerating services...");
for service in device.services().await? {
let uuid = service.uuid().await?;
println!(" Service UUID: {}", &uuid);
if uuid == SERVICE_UUID {
println!(" Found our service!");
for char in service.characteristics().await? {
let uuid = char.uuid().await?;
println!(" Characteristic UUID: {}", &uuid);
if uuid == CHARACTERISTIC_UUID {
println!(" Found our characteristic!");
return Ok(Some(char));
}
}
}
}

println!(" Not found!");
}

Ok(None)
}

async fn exercise_characteristic(char: &Characteristic) -> Result<()> {
let mut write_io = char.write_io().await?;
println!(" Obtained write IO with MTU {} bytes", write_io.mtu());
let mut notify_io = char.notify_io().await?;
println!(" Obtained notification IO with MTU {} bytes", notify_io.mtu());

// Flush notify buffer.
let mut buf = [0; 1024];
while let Ok(Ok(_)) = timeout(Duration::from_secs(1), notify_io.read(&mut buf)).await {}

let mut rng = rand::thread_rng();
for i in 0..1024 {
let mut len = rng.gen_range(0..20000);

// Try to trigger packet reordering over EATT.
if i % 10 == 0 {
// Big packet is split into multiple small packets.
// (by L2CAP layer, because GATT MTU is bigger than L2CAP MTU)
len = write_io.mtu(); // 512
}
if i % 10 == 1 {
// Small packet can use different L2CAP channel when EATT is enabled.
len = 20;
}
// Thus small packet can arrive before big packet.
// The solution is to disable EATT in /etc/bluetooth/main.conf.

println!(" Test iteration {i} with data size {len}");
let data: Vec<u8> = (0..len).map(|_| rng.gen()).collect();

// We must read back the data while sending, otherwise the connection
// buffer will overrun and we will lose data.
let read_task = tokio::spawn(async move {
let mut echo_buf = vec![0u8; len];
let res = match notify_io.read_exact(&mut echo_buf).await {
Ok(_) => Ok(echo_buf),
Err(err) => Err(err),
};
(notify_io, res)
});

// Note that write_all will automatically split the buffer into
// multiple writes of MTU size.
write_io.write_all(&data).await.expect("write failed");

println!(" Waiting for echo");
let (notify_io_back, res) = read_task.await.unwrap();
notify_io = notify_io_back;
let echo_buf = res.expect("read failed");

if echo_buf != data {
println!();
println!("Echo data mismatch!");
println!("Send data: {:x?}", &data);
println!("Received data: {:x?}", &echo_buf);
println!();
println!("By 512 blocks:");
for (sent, recv) in data.chunks(512).zip(echo_buf.chunks(512)) {
println!();
println!(
"Send: {:x?} ... {:x?}",
&sent[0..4.min(sent.len())],
&sent[sent.len().saturating_sub(4)..]
);
println!(
"Recv: {:x?} ... {:x?}",
&recv[0..4.min(recv.len())],
&recv[recv.len().saturating_sub(4)..]
);
}
println!();

panic!("echoed data does not match sent data");
}
println!(" Data matches");
}

println!(" Test okay");
Ok(())
}

#[tokio::main]
async fn main() -> bluer::Result<()> {
env_logger::init();
let session = bluer::Session::new().await?;
let binding = session.adapter_names().await?;
let adapter_name = binding.last().unwrap();
let adapter = session.adapter(adapter_name)?;
adapter.set_powered(true).await?;

{
println!(
"Discovering on Bluetooth adapter {} with address {}\n",
adapter.name(),
adapter.address().await?
);
let discover = adapter.discover_devices().await?;
pin_mut!(discover);
let mut done = false;
while let Some(evt) = discover.next().await {
match evt {
AdapterEvent::DeviceAdded(addr) => {
let device = adapter.device(addr)?;
match find_our_characteristic(&device).await {
Ok(Some(char)) => match exercise_characteristic(&char).await {
Ok(()) => {
println!(" Characteristic exercise completed");
done = true;
}
Err(err) => {
println!(" Characteristic exercise failed: {}", &err);
}
},
Ok(None) => (),
Err(err) => {
println!(" Device failed: {}", &err);
let _ = adapter.remove_device(device.address()).await;
}
}
match device.disconnect().await {
Ok(()) => println!(" Device disconnected"),
Err(err) => println!(" Device disconnection failed: {}", &err),
}
println!();
}
AdapterEvent::DeviceRemoved(addr) => {
println!("Device removed {addr}");
}
_ => (),
}
if done {
break;
}
}
println!("Stopping discovery");
}

sleep(Duration::from_secs(1)).await;
Ok(())
}
107 changes: 100 additions & 7 deletions src/bluetooth/gatt_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@ use crate::bluetooth::setup_session;
use crate::Sighting;
use bluer::{
adv::Advertisement,
gatt::local::{Application, Characteristic, CharacteristicRead, Service},
gatt::{
local::{
characteristic_control, Application, ApplicationHandle, Characteristic,
CharacteristicControl, CharacteristicControlEvent, CharacteristicControlHandle,
CharacteristicNotify, CharacteristicNotifyMethod, CharacteristicRead,
CharacteristicWrite, CharacteristicWriteMethod, Service,
},
CharacteristicReader, CharacteristicWriter,
},
};
use futures::FutureExt;
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
time::Duration,
};
use futures::{future, pin_mut, StreamExt};
use tokio::{
io::{AsyncBufReadExt, BufReader},
io::{AsyncBufReadExt, BufReader, AsyncReadExt, AsyncWriteExt},
time::sleep,
};

use super::MANUFACTURER_ID;
use super::{MANUFACTURER_ID, CHARACTERISTIC_UUID};

use super::{SERVICE_UUID, CHANNEL, MTU};

pub const SERVICE_UUID: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00001);
pub const LAST_SIGHTING_CHARACTERISTIC: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00003);
pub const SIGHTING_COUNT_CHARACTERISTIC: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00004);
pub const LAST_SPECIES_CHARACTERISTIC: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00005);
Expand Down Expand Up @@ -96,6 +106,23 @@ pub fn last_species_characteristic(sightings: Arc<Mutex<Vec<Sighting>>>) -> Char
}
}

pub fn stream_characteristic(char_handle: CharacteristicControlHandle, sightings: Arc<Mutex<Vec<Sighting>>>) -> Characteristic {
Characteristic { uuid: CHARACTERISTIC_UUID,
write: Some(CharacteristicWrite {
write_without_response: true,
method: CharacteristicWriteMethod::Io,
..Default::default()
}),
notify: Some(CharacteristicNotify {
notify: true,
method: CharacteristicNotifyMethod::Io,
..Default::default()
}),
control_handle: char_handle,
..Default::default()
}
}

pub async fn run_advertise(
adapter: &bluer::Adapter,
) -> bluer::Result<bluer::adv::AdvertisementHandle> {
Expand All @@ -115,12 +142,17 @@ pub async fn run_advertise(
pub async fn run_app(
adapter: &bluer::Adapter,
sightings: Arc<Mutex<Vec<Sighting>>>,
) -> bluer::Result<bluer::gatt::local::ApplicationHandle> {
) -> bluer::Result<(
ApplicationHandle,
CharacteristicControl
)> {
let (char_control, char_handle) = characteristic_control();
let app = Application {
services: vec![Service {
uuid: SERVICE_UUID,
primary: true,
characteristics: vec![
stream_characteristic(char_handle, sightings.clone()),
last_sighting_characteristic(sightings.clone()),
sighting_count_characteristic(sightings.clone()),
last_species_characteristic(sightings.clone()),
Expand All @@ -130,7 +162,62 @@ pub async fn run_app(
..Default::default()
};
let app_handle = adapter.serve_gatt_application(app).await?;
Ok(app_handle)
Ok((app_handle, char_control))
}

pub async fn listen(char_control: CharacteristicControl, sightings: Arc<Mutex<Vec<Sighting>>>) -> bluer::Result<()> {
let mut read_buf = Vec::new();
let mut reader_opt: Option<CharacteristicReader> = None;
let mut writer_opt: Option<CharacteristicWriter> = None;
pin_mut!(char_control);

loop {
tokio::select! {
evt = char_control.next() => {
match evt {
Some(CharacteristicControlEvent::Write(req)) => {
println!("Accepting write request event with MTU {}", req.mtu());
read_buf = vec![0; req.mtu()];
reader_opt = Some(req.accept()?);
},
Some(CharacteristicControlEvent::Notify(notifier)) => {
println!("Accepting notify request event with MTU {}", notifier.mtu());
writer_opt = Some(notifier);
},
None => break,
}
},
read_res = async {
match &mut reader_opt {
Some(reader) if writer_opt.is_some() => reader.read(&mut read_buf).await,
_ => future::pending().await,
}
} => {
match read_res {
Ok(0) => {
println!("Read stream ended");
reader_opt = None;
}
Ok(n) => {
let value = read_buf[..n].to_vec();
println!("Echoing {} bytes: {:x?} ... {:x?}", value.len(), &value[0..4.min(value.len())], &value[value.len().saturating_sub(4) ..]);
if value.len() < 512 {
println!();
}
if let Err(err) = writer_opt.as_mut().unwrap().write_all(&value).await {
println!("Write failed: {}", &err);
writer_opt = None;
}
}
Err(err) => {
println!("Read stream error: {}", &err);
reader_opt = None;
}
}
}
}
}
Ok(())
}

pub async fn run(sightings: Arc<Mutex<Vec<Sighting>>>) -> bluer::Result<()> {
Expand All @@ -139,7 +226,11 @@ pub async fn run(sightings: Arc<Mutex<Vec<Sighting>>>) -> bluer::Result<()> {
let adapter = session.default_adapter().await?;

let adv_handle = run_advertise(&adapter).await.unwrap();
let app_handle = run_app(&adapter, sightings.clone()).await.unwrap();
let (app_handle, char_control) =
run_app(&adapter, sightings.clone()).await.unwrap();


let listen_handle = tokio::spawn(listen(char_control, sightings.clone()));

log::info!("Service ready. Press enter to quit.");
let stdin = BufReader::new(tokio::io::stdin());
Expand All @@ -149,6 +240,8 @@ pub async fn run(sightings: Arc<Mutex<Vec<Sighting>>>) -> bluer::Result<()> {
log::info!("Removing service and advertisement");
drop(adv_handle);
drop(app_handle);
//drop(char_control);
drop(listen_handle);
sleep(Duration::from_secs(1)).await;
Ok(())
}
4 changes: 2 additions & 2 deletions src/bluetooth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::sync::{Arc, Mutex};

use crate::Sighting;

mod handle_message;
pub mod handle_message;
pub use handle_message::handle_message;

mod message;
pub mod message;
pub use message::Message;

pub mod gatt_srv;
Expand Down

0 comments on commit e06a581

Please sign in to comment.