From 5c1f3ed5b9a4ed81b669cfeab250116a06abf8fa Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:26:11 -0400 Subject: [PATCH 1/7] hack out the measurements entirely --- services/sensor_tracker/Cargo.lock | 2 +- services/sensor_tracker/Cargo.toml | 2 +- services/sensor_tracker/README.md | 11 +++--- services/sensor_tracker/src/logic.rs | 5 ++- services/sensor_tracker/src/model.rs | 41 ++------------------- services/sensor_tracker/src/predis.rs | 52 ++++++++------------------- 6 files changed, 26 insertions(+), 87 deletions(-) diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index 2b6df651..5acc6a59 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -514,7 +514,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "sensor_tracker" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dotenv 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "envy 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 320a2035..74cf0f85 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sensor_tracker" -version = "0.3.0" +version = "0.4.0" authors = ["Terkwood "] edition = "2018" diff --git a/services/sensor_tracker/README.md b/services/sensor_tracker/README.md index 6930ce25..c22518a5 100644 --- a/services/sensor_tracker/README.md +++ b/services/sensor_tracker/README.md @@ -12,25 +12,24 @@ Additionally, it creates an entry in the Redis `/sensors/ This is useful for sensors generating temperature and/or pH data. -Such data comes into an MQTT topic looking like this: +Such data might come into an MQTT topic looking like this: ``` { "device_id": , "temp_f": 81.71, "temp_c": 23.45, "ph": 7.77, "ph_mv": 453.05 } ``` -If the sensor isn't, it will create the following type of stub record -for the temp sensor based on a UUID V5 ID conversion: +If the device hasn't ever been tracked, it will create the following type of stub record with an internal device ID. The internal device ID is a (namespaced) UUID V5: ``` -HMSET /sensors// create_time +HMSET /devices/ create_time ``` The operator is encouraged to later amend the hash to include -a helpful reference to the tank which the sensor serves, so +a helpful reference to the area which the sensing device serves, so that the LED status utility can properly format messages. ``` -HSET /sensors// tank 0 +HSET /devices/ area 0 ``` ### Sample redis records diff --git a/services/sensor_tracker/src/logic.rs b/services/sensor_tracker/src/logic.rs index 3dceb591..4ec56bc9 100644 --- a/services/sensor_tracker/src/logic.rs +++ b/services/sensor_tracker/src/logic.rs @@ -15,15 +15,14 @@ pub fn receive_updates( if let Some(sensor_message) = prawnqtt::deser_message(paho) { let ext_device_id: &str = &sensor_message.device_id; - sensor_message.measurements().iter().for_each(|measure| { - if let Ok(delta_events) = predis::update(redis_ctx, &measure, ext_device_id) + if let Ok(delta_events) = predis::update(redis_ctx, &sensor_message, ext_device_id) { // emit all changed keys & hash field names to redis // on the appropriate redis pub/sub topic. // these will be processed later by the gcloud_push utility predis::publish_updates(redis_ctx, delta_event_topic, delta_events) } - }); + ; } } Err(_) if !mqtt_cli.is_connected() => { diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index 3628ce5d..1bc50363 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -1,5 +1,8 @@ /// This message is emitted to an MQTT channel by /// some device with access to a temp sensor (DS18B20, etc) +/// `external_device_id` is usually reported as a +/// e.g. "28654597090000e4" + #[derive(Serialize, Deserialize, Debug)] pub struct SensorMessage { pub device_id: String, @@ -13,45 +16,7 @@ pub struct SensorMessage { pub heat_index_f: Option, } -/// `external_device_id` is usually reported as a -/// e.g. "28654597090000e4" -impl SensorMessage { - pub fn measurements(&self) -> Vec { - let mut v: Vec = vec![]; - if let ( - Some(humidity), - Some(status), - Some(temp_f), - Some(temp_c), - Some(heat_index_f), - Some(heat_index_c), - ) = ( - self.humidity, - &self.status, - self.temp_f, - self.temp_c, - self.heat_index_f, - self.heat_index_c, - ) { - v.push(Measurement::DHT { - status: status.to_owned(), - humidity, - temp_f, - temp_c, - heat_index_f, - heat_index_c, - }) - } else if let (Some(temp_f), Some(temp_c)) = (self.temp_f, self.temp_c) { - v.push(Measurement::Temp { temp_f, temp_c }) - } - if let (Some(ph), Some(ph_mv)) = (self.ph, self.ph_mv) { - v.push(Measurement::PH { ph, ph_mv }) - } - - v - } -} #[derive(Debug)] pub enum Measurement { diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 3d60ad78..841b6259 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -13,20 +13,20 @@ use uuid::Uuid; /// Will create a new sensor record for this device if one does not already exist. pub fn update<'a, 'b>( redis_ctx: &RedisContext, - measure: &model::Measurement, + sensor_message: &model::SensorMessage, ext_device_id: &str, ) -> Result, redis::RedisError> { let mut delta_events: Vec = vec![]; - println!("Received redis {} update: {:?}", measure.name(), measure); + println!("Received redis update: {:?}", sensor_message); - let ext_device_namespace = &redis_ctx.get_external_device_namespace(measure.name())?; + let ext_device_namespace = &redis_ctx.get_external_device_namespace()?; let device_id = internal_device_id(ext_device_id, ext_device_namespace).unwrap(); println!("\tDevice ID (internal): {}", device_id); let rn = &redis_ctx.namespace; - let sensor_set_event = update_sensor_set(redis_ctx, rn, measure, device_id); + let sensor_set_event = update_sensor_set(redis_ctx, rn, sensor_message, device_id); if let Some(e) = sensor_set_event { delta_events.push(e) } @@ -42,12 +42,8 @@ pub fn update<'a, 'b>( if let Ok(v) = tank_and_area_and_update_count { // Tank associated with this sensor? let revent = match (v.get(0).unwrap_or(&None), v.get(1).unwrap_or(&None)) { - (Some(tank_num), _) => { - update_container_hash(redis_ctx, Container::Tanks, tank_num, &measure) - } - (_, Some(area_num)) => { - update_container_hash(redis_ctx, Container::Areas, area_num, &measure) - } + (Some(tank_num), _) => update_area_hash(redis_ctx, tank_num, &measure), + (_, Some(area_num)) => update_area_hash(redis_ctx, area_num, &measure), (None, None) => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; @@ -93,39 +89,19 @@ fn update_sensor_set( } } -enum Container { - Tanks, - Areas, -} - -impl Container { - pub fn to_string(self) -> String { - match self { - Container::Tanks => "tanks".to_string(), - Container::Areas => "areas".to_string(), - } - } -} - -fn update_container_hash( +fn update_area_hash( redis_ctx: &RedisContext, - container: Container, container_num: &u64, measure: &model::Measurement, ) -> Option { // We found the area associated with this // sensor ID, so we should update that area's // current reading. - let container_key = format!( - "{}/{}/{}", - redis_ctx.namespace, - container.to_string(), - container_num - ); + let area_key = format!("{}/areas/{}", redis_ctx.namespace, container_num); - let container_measure_count: Result, _> = redis_ctx + let area_measure_count: Result, _> = redis_ctx .conn - .hget(&container_key, &format!("{}_update_count", measure.name())); + .hget(&area_key, &format!("{}_update_count", measure.name())); let uc_name = format!("{}_update_count", measure.name()); let ut_name = format!("{}_update_time", measure.name()); @@ -134,7 +110,7 @@ fn update_container_hash( data.push(( &uc_name, - container_measure_count + area_measure_count .unwrap_or(None) .map(|u| u + 1) .unwrap_or(1) @@ -143,20 +119,20 @@ fn update_container_hash( data.push((&ut_name, epoch_secs().to_string())); ( - redis_ctx.conn.hset_multiple(&container_key, &data[..]), + redis_ctx.conn.hset_multiple(&area_key, &data[..]), data.iter().map(|(a, _)| *a).collect(), ) }; match update { (Err(e), _) => { - println!("update fails for {}: {:?}", container_key, e); + println!("update fails for {}: {:?}", area_key, e); None } (Ok(_), fields) if fields.len() > 0 => { let fs = fields.iter().map(|s| s.to_string()).collect(); Some(REvent::HashUpdated { - key: container_key.to_string(), + key: area_key.to_string(), fields: fs, }) } From 44cec69876aa19b4b06cfed0cb206e8afde22fee Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:33:08 -0400 Subject: [PATCH 2/7] hack it up --- services/redis_context/Cargo.lock | 4 +++- services/redis_context/Cargo.toml | 4 ++-- services/redis_context/src/lib.rs | 13 ++++--------- services/sensor_tracker/Cargo.lock | 6 ++---- services/sensor_tracker/Cargo.toml | 2 +- services/sensor_tracker/src/predis.rs | 28 +++++++++++++-------------- 6 files changed, 26 insertions(+), 31 deletions(-) diff --git a/services/redis_context/Cargo.lock b/services/redis_context/Cargo.lock index 5f8415c7..2b8eb013 100644 --- a/services/redis_context/Cargo.lock +++ b/services/redis_context/Cargo.lock @@ -1,3 +1,5 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. [[package]] name = "ascii" version = "0.7.1" @@ -278,7 +280,7 @@ dependencies = [ [[package]] name = "redis_context" -version = "0.1.0" +version = "0.2.0" dependencies = [ "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/services/redis_context/Cargo.toml b/services/redis_context/Cargo.toml index 0a8cec4f..44849648 100644 --- a/services/redis_context/Cargo.toml +++ b/services/redis_context/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "redis_context" -version = "0.1.0" +version = "0.2.0" authors = ["Terkwood "] edition = "2018" [dependencies] redis = "0.9" -uuid = { version = "0.7", features = ["v4", "v5"] } # v4 is random, v5 is name-based +uuid = { version = "0.7", features = ["v4", "v5"] } diff --git a/services/redis_context/src/lib.rs b/services/redis_context/src/lib.rs index 238a3beb..49fac269 100644 --- a/services/redis_context/src/lib.rs +++ b/services/redis_context/src/lib.rs @@ -24,12 +24,9 @@ impl RedisContext { } /// This is the "name" field that will be used to form a V5 UUID - pub fn get_external_device_namespace( - &self, - device_type: String, - ) -> Result { + pub fn get_external_device_namespace(&self) -> Result { let key = format!("{}/external_device_namespace", self.namespace); - let r: Option = self.conn.hget(&key, device_type)?; + let r: Option = self.conn.get(&key)?; match r { None => { @@ -44,20 +41,18 @@ impl RedisContext { } } - pub enum ExternalDevice { Temp, PH, - Unknown + Unknown, } - impl From for ExternalDevice { fn from(device_type: String) -> Self { match device_type.to_lowercase().trim() { "temp" => ExternalDevice::Temp, "ph" => ExternalDevice::PH, - _ => ExternalDevice::Unknown + _ => ExternalDevice::Unknown, } } } diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index 5acc6a59..099b1317 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -422,8 +422,7 @@ dependencies = [ [[package]] name = "redis_context" -version = "0.1.0" -source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#7087164ca6a90be0389e466a216cdc4cc55ae781" +version = "0.2.0" dependencies = [ "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -520,7 +519,7 @@ dependencies = [ "envy 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "paho-mqtt 0.4.0 (git+https://github.com/Terkwood/paho.mqtt.rust.git?rev=1b72f84)", "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)", + "redis_context 0.2.0", "redis_delta 0.1.2", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", @@ -841,7 +840,6 @@ dependencies = [ "checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372" "checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db" "checksum redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c747d743d48233f9bc3ed3fb00cb84c1d98d8c7f54ed2d4cca9adf461a7ef3" -"checksum redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)" = "" "checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f" "checksum regex 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2069749032ea3ec200ca51e4a31df41759190a88edca0d2d86ee8bedf7073341" "checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957" diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 74cf0f85..0438322c 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -9,7 +9,7 @@ dotenv = "0.13" envy = "0.3" paho-mqtt = { git = "https://github.com/Terkwood/paho.mqtt.rust.git", rev = "1b72f84" } redis = "0.9" -redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "unstable" } +redis_context = { path = "../redis_context" } redis_delta = { path = "../redis_delta" } serde = "1.0" serde_derive = "1.0" diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 841b6259..9fa351a7 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -32,7 +32,7 @@ pub fn update<'a, 'b>( } // lookup associated tank - let sensor_hash_key = &format!("{}/sensors/{}/{}", rn, measure.name(), device_id).to_string(); + let sensor_hash_key = &format!("{}/devices/{}", rn, device_id).to_string(); let tank_and_area_and_update_count: Result>, _> = redis_ctx.conn.hget( sensor_hash_key, @@ -54,10 +54,10 @@ pub fn update<'a, 'b>( // record a hit on the updates that the sensor has seen // and also record the most recent measurement on the record // for this individual sensor - let sensor_updated = update_sensor_hash( + let sensor_updated = update_device_hash( redis_ctx, sensor_hash_key, - measure, + sensor_message, v.get(2).unwrap_or(&None), ); @@ -72,18 +72,18 @@ pub fn update<'a, 'b>( fn update_sensor_set( redis_ctx: &RedisContext, rn: &str, - measure: &model::Measurement, + sensor_message: &model::SensorMessage, device_id: Uuid, ) -> Option { - let set_sensor_type_key = format!("{}/sensors/{}", rn, measure.name()); + let set_device_key = format!("{}/devices", rn); // add to the member set if it doesn't already exist - let sensors_added: Result = redis_ctx + let devices_added: Result = redis_ctx .conn - .sadd(&set_sensor_type_key, &format!("{}", device_id)); + .sadd(&set_device_key, &format!("{}", device_id)); - match sensors_added { + match devices_added { Ok(n) if n > 0 => Some(REvent::SetUpdated { - key: set_sensor_type_key, + key: set_device_key, }), _ => None, } @@ -179,10 +179,10 @@ fn ensure_sensor_hash_exists( result } -fn update_sensor_hash( +fn update_device_hash( redis_ctx: &RedisContext, - sensor_hash_key: &str, - measure: &model::Measurement, + device_hash_key: &str, + measure: &model::SensorMessage, maybe_sensor_upd_count: &Option, ) -> Option { let upd_c = &format!("{}_update_count", measure.name()); @@ -197,9 +197,9 @@ fn update_sensor_hash( let ut = &format!("{}_update_time", measure.name()); data.push((ut, epoch_secs().to_string())); - let redis_result: Result<(), _> = redis_ctx.conn.hset_multiple(sensor_hash_key, &data[..]); + let redis_result: Result<(), _> = redis_ctx.conn.hset_multiple(device_hash_key, &data[..]); if let Err(e) = redis_result { - println!("couldn't update sensor record {}: {:?}", sensor_hash_key, e); + println!("couldn't update device record {}: {:?}", device_hash_key, e); None } else { let mut fields: Vec = vec![]; From 0df3c52d72ce7101d18acf7310b97caa711aeb0f Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:37:15 -0400 Subject: [PATCH 3/7] hack it up --- services/redis_context/src/lib.rs | 16 ----------- services/sensor_tracker/src/predis.rs | 41 +++++++++++++-------------- 2 files changed, 20 insertions(+), 37 deletions(-) diff --git a/services/redis_context/src/lib.rs b/services/redis_context/src/lib.rs index 49fac269..17d6fb18 100644 --- a/services/redis_context/src/lib.rs +++ b/services/redis_context/src/lib.rs @@ -40,19 +40,3 @@ impl RedisContext { } } } - -pub enum ExternalDevice { - Temp, - PH, - Unknown, -} - -impl From for ExternalDevice { - fn from(device_type: String) -> Self { - match device_type.to_lowercase().trim() { - "temp" => ExternalDevice::Temp, - "ph" => ExternalDevice::PH, - _ => ExternalDevice::Unknown, - } - } -} diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 9fa351a7..94c76dac 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -31,20 +31,19 @@ pub fn update<'a, 'b>( delta_events.push(e) } - // lookup associated tank + // lookup associated area let sensor_hash_key = &format!("{}/devices/{}", rn, device_id).to_string(); - let tank_and_area_and_update_count: Result>, _> = redis_ctx.conn.hget( + let area_and_sensors_update_count: Result>, _> = redis_ctx.conn.hget( sensor_hash_key, - vec!["tank", "area", &format!("{}_update_count", measure.name())], + vec!["area", "sensors_update_count"], ); - if let Ok(v) = tank_and_area_and_update_count { + if let Ok(v) = area_and_sensors_update_count { // Tank associated with this sensor? - let revent = match (v.get(0).unwrap_or(&None), v.get(1).unwrap_or(&None)) { - (Some(tank_num), _) => update_area_hash(redis_ctx, tank_num, &measure), - (_, Some(area_num)) => update_area_hash(redis_ctx, area_num, &measure), - (None, None) => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), + let revent = match v.get(0).unwrap_or(&None) { + Some(area_num) => update_area_hash(redis_ctx, area_num, &sensor_message), + None => ensure_device_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; if let Some(ev) = revent { @@ -92,7 +91,7 @@ fn update_sensor_set( fn update_area_hash( redis_ctx: &RedisContext, container_num: &u64, - measure: &model::Measurement, + sensor_message: &model::SensorMessage, ) -> Option { // We found the area associated with this // sensor ID, so we should update that area's @@ -101,12 +100,12 @@ fn update_area_hash( let area_measure_count: Result, _> = redis_ctx .conn - .hget(&area_key, &format!("{}_update_count", measure.name())); + .hget(&area_key, &format!("sensors_update_count")); - let uc_name = format!("{}_update_count", measure.name()); - let ut_name = format!("{}_update_time", measure.name()); + let uc_name = format!("sensors_update_count"); + let ut_name = format!("sensors_update_time"); let update: (Result, Vec<&str>) = { - let mut data: Vec<(&str, String)> = measure.to_redis(); + let mut data: Vec<(&str, String)> = sensor_message.to_redis(); data.push(( &uc_name, @@ -140,9 +139,9 @@ fn update_area_hash( } } -fn ensure_sensor_hash_exists( +fn ensure_device_hash_exists( redis_ctx: &RedisContext, - sensor_hash_key: &str, + device_hash_key: &str, ext_device_id_str: &str, ) -> Option { // We know that there's no associated "tank" @@ -154,7 +153,7 @@ fn ensure_sensor_hash_exists( redis_ctx .conn - .exists(sensor_hash_key) + .exists(device_hash_key) .iter() .for_each(|e: &bool| { if !e { @@ -166,7 +165,7 @@ fn ensure_sensor_hash_exists( ][..]; // new sensor, make note of when it is created let _: Result, _> = - redis_ctx.conn.hset_multiple(sensor_hash_key, field_vals); + redis_ctx.conn.hset_multiple(device_hash_key, field_vals); let fields = vec![cf, ed]; result = Some(REvent::HashUpdated { @@ -182,10 +181,10 @@ fn ensure_sensor_hash_exists( fn update_device_hash( redis_ctx: &RedisContext, device_hash_key: &str, - measure: &model::SensorMessage, + sensor_message: &model::SensorMessage, maybe_sensor_upd_count: &Option, ) -> Option { - let upd_c = &format!("{}_update_count", measure.name()); + let upd_c = &format!("sensors_update_count"); let mut data: Vec<(&str, String)> = vec![( upd_c, maybe_sensor_upd_count @@ -193,8 +192,8 @@ fn update_device_hash( .unwrap_or(1) .to_string(), )]; - data.extend(measure.to_redis()); - let ut = &format!("{}_update_time", measure.name()); + data.extend(sensor_message.to_redis()); + let ut = &format!("sensors_update_time"); data.push((ut, epoch_secs().to_string())); let redis_result: Result<(), _> = redis_ctx.conn.hset_multiple(device_hash_key, &data[..]); From 1e8fa6a97e33ffc71239462fa27eb3c63d395718 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:44:16 -0400 Subject: [PATCH 4/7] drop the hammer --- services/sensor_tracker/src/logic.rs | 16 +++++++------- services/sensor_tracker/src/model.rs | 31 ++++++++++++++++++++++++++- services/sensor_tracker/src/predis.rs | 13 ++++++----- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/services/sensor_tracker/src/logic.rs b/services/sensor_tracker/src/logic.rs index 4ec56bc9..09e9f756 100644 --- a/services/sensor_tracker/src/logic.rs +++ b/services/sensor_tracker/src/logic.rs @@ -15,14 +15,14 @@ pub fn receive_updates( if let Some(sensor_message) = prawnqtt::deser_message(paho) { let ext_device_id: &str = &sensor_message.device_id; - if let Ok(delta_events) = predis::update(redis_ctx, &sensor_message, ext_device_id) - { - // emit all changed keys & hash field names to redis - // on the appropriate redis pub/sub topic. - // these will be processed later by the gcloud_push utility - predis::publish_updates(redis_ctx, delta_event_topic, delta_events) - } - ; + if let Ok(delta_events) = + predis::update(redis_ctx, &sensor_message, ext_device_id) + { + // emit all changed keys & hash field names to redis + // on the appropriate redis pub/sub topic. + // these will be processed later by the gcloud_push utility + predis::publish_updates(redis_ctx, delta_event_topic, delta_events) + }; } } Err(_) if !mqtt_cli.is_connected() => { diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index 1bc50363..e24cf37c 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -16,7 +16,36 @@ pub struct SensorMessage { pub heat_index_f: Option, } - +impl SensorMessage { + pub fn to_redis(&self) -> Vec<(&str, String)> { + let mut data = vec![]; + if let Some(s) = &self.status { + data.push(("status", s.to_string())); + } + if let Some(humidity) = self.humidity { + data.push(("humidity", humidity.to_string())); + } + if let Some(tf) = self.temp_f { + data.push(("temp_f", tf.to_string())); + } + if let Some(tc) = self.temp_c { + data.push(("temp_c", tc.to_string())); + } + if let Some(hf) = self.heat_index_f { + data.push(("heat_index_f", hf.to_string())); + } + if let Some(hc) = self.heat_index_c { + data.push(("heat_index_c", hc.to_string())); + } + if let Some(ph) = self.ph { + data.push(("ph", ph.to_string())) + } + if let Some(ph_mv) = self.ph_mv { + data.push(("ph_mv", ph_mv.to_string())) + } + data + } +} #[derive(Debug)] pub enum Measurement { diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 94c76dac..ddf65e49 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -34,16 +34,15 @@ pub fn update<'a, 'b>( // lookup associated area let sensor_hash_key = &format!("{}/devices/{}", rn, device_id).to_string(); - let area_and_sensors_update_count: Result>, _> = redis_ctx.conn.hget( - sensor_hash_key, - vec!["area", "sensors_update_count"], - ); + let area_and_sensors_update_count: Result>, _> = redis_ctx + .conn + .hget(sensor_hash_key, vec!["area", "sensors_update_count"]); if let Ok(v) = area_and_sensors_update_count { // Tank associated with this sensor? let revent = match v.get(0).unwrap_or(&None) { Some(area_num) => update_area_hash(redis_ctx, area_num, &sensor_message), - None => ensure_device_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), + None => ensure_device_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; if let Some(ev) = revent { @@ -169,7 +168,7 @@ fn ensure_device_hash_exists( let fields = vec![cf, ed]; result = Some(REvent::HashUpdated { - key: sensor_hash_key.to_string(), + key: device_hash_key.to_string(), fields, }) } @@ -205,7 +204,7 @@ fn update_device_hash( data.iter().for_each(|(f, _)| fields.push(f.to_string())); Some(REvent::HashUpdated { - key: sensor_hash_key.to_string(), + key: device_hash_key.to_string(), fields, }) } From c8dfee2b2bc4f6c4b1750667308dbe92b0669cce Mon Sep 17 00:00:00 2001 From: Terkwood Date: Mon, 22 Jul 2019 17:07:50 -0400 Subject: [PATCH 5/7] trim --- services/sensor_tracker/src/model.rs | 68 --------------------------- services/sensor_tracker/src/predis.rs | 2 +- 2 files changed, 1 insertion(+), 69 deletions(-) diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index e24cf37c..f6bcb128 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -46,71 +46,3 @@ impl SensorMessage { data } } - -#[derive(Debug)] -pub enum Measurement { - Temp { - temp_f: f64, - temp_c: f64, - }, - PH { - ph: f64, - ph_mv: f64, - }, - /// Digital humidity and temp, e.g. DHT11 sensor - DHT { - status: String, - humidity: f64, - temp_f: f64, - temp_c: f64, - heat_index_f: f64, - heat_index_c: f64, - }, -} - -impl Measurement { - pub fn name(&self) -> String { - match self { - Measurement::Temp { - temp_f: _, - temp_c: _, - } => "temp".to_string(), - Measurement::PH { ph: _, ph_mv: _ } => "ph".to_string(), - Measurement::DHT { - status: _, - humidity: _, - temp_f: _, - temp_c: _, - heat_index_f: _, - heat_index_c: _, - } => "dht".to_string(), - } - } - - pub fn to_redis(&self) -> Vec<(&str, String)> { - match self { - Measurement::Temp { temp_f, temp_c } => vec![ - ("temp_f", temp_f.to_string()), - ("temp_c", temp_c.to_string()), - ], - Measurement::PH { ph, ph_mv } => { - vec![("ph", ph.to_string()), ("ph_mv", ph_mv.to_string())] - } - Measurement::DHT { - status, - humidity, - temp_f, - temp_c, - heat_index_f, - heat_index_c, - } => vec![ - ("status", status.to_string()), - ("humidity", humidity.to_string()), - ("temp_f", temp_f.to_string()), - ("temp_c", temp_c.to_string()), - ("heat_index_f", heat_index_f.to_string()), - ("heat_index_c", heat_index_c.to_string()), - ], - } - } -} diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index ddf65e49..55e39c5b 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -70,7 +70,7 @@ pub fn update<'a, 'b>( fn update_sensor_set( redis_ctx: &RedisContext, rn: &str, - sensor_message: &model::SensorMessage, + sensor_message: &model::SensorMessage, // TODO wat ? device_id: Uuid, ) -> Option { let set_device_key = format!("{}/devices", rn); From 2a3a3788c2d46d00985084cdf65c66c3ce0bf00b Mon Sep 17 00:00:00 2001 From: Terkwood Date: Thu, 25 Jul 2019 12:44:14 -0400 Subject: [PATCH 6/7] hack version --- services/sensor_tracker/Cargo.lock | 11 ++++++----- services/sensor_tracker/Cargo.toml | 3 ++- services/sensor_tracker/src/predis.rs | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index 1442ddb8..1b762e12 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -679,8 +679,8 @@ dependencies = [ [[package]] name = "redis_context" -version = "0.1.0" -source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#f92a34261f726b904498ff8b34ee5f1a0e810c09" +version = "0.2.0" +source = "git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data#c8dfee2b2bc4f6c4b1750667308dbe92b0669cce" dependencies = [ "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -689,8 +689,9 @@ dependencies = [ [[package]] name = "redis_delta" version = "0.1.2" -source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#f92a34261f726b904498ff8b34ee5f1a0e810c09" +source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#1a6bf54663c794eb540172b16c2c5a01b674d2a4" dependencies = [ + "rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", @@ -838,7 +839,7 @@ dependencies = [ "envy 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)", + "redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data)", "redis_delta 0.1.2 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)", "rumqtt 0.30.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1332,7 +1333,7 @@ dependencies = [ "checksum rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" "checksum redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c747d743d48233f9bc3ed3fb00cb84c1d98d8c7f54ed2d4cca9adf461a7ef3" -"checksum redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)" = "" +"checksum redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data)" = "" "checksum redis_delta 0.1.2 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)" = "" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" "checksum regex 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6b23da8dfd98a84bd7e08700190a5d9f7d2d38abd4369dd1dae651bc40bfd2cc" diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 20fa1a1a..9fc363c1 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -13,7 +13,8 @@ envy = "*" rand_core="0.2.2" rumqtt = "*" redis = "^0.9" -redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "unstable" } +# TODO restore branch +redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "feature/simplified-sensor-tracker-data" } redis_delta = { git = "https://github.com/Terkwood/prawnalith/", branch = "unstable" } serde = "*" serde_derive = "*" diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 55e39c5b..0dcf1daf 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -26,7 +26,7 @@ pub fn update<'a, 'b>( println!("\tDevice ID (internal): {}", device_id); let rn = &redis_ctx.namespace; - let sensor_set_event = update_sensor_set(redis_ctx, rn, sensor_message, device_id); + let sensor_set_event = update_devices_set(redis_ctx, rn, sensor_message, device_id); if let Some(e) = sensor_set_event { delta_events.push(e) } @@ -67,7 +67,7 @@ pub fn update<'a, 'b>( Ok(delta_events) } -fn update_sensor_set( +fn update_devices_set( redis_ctx: &RedisContext, rn: &str, sensor_message: &model::SensorMessage, // TODO wat ? From 7adfe29b0d85f34c0bf3073400aa7bd3d3e4ca02 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Thu, 25 Jul 2019 12:46:43 -0400 Subject: [PATCH 7/7] =?UTF-8?q?Update=20README.md=20=F0=9F=93=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/sensor_tracker/README.md | 32 +++++++++++-------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/services/sensor_tracker/README.md b/services/sensor_tracker/README.md index 5ae67d93..7a8ffffb 100644 --- a/services/sensor_tracker/README.md +++ b/services/sensor_tracker/README.md @@ -20,26 +20,16 @@ Such data might come into an MQTT topic looking like this: If the device hasn't ever been tracked, it will create the following type of stub record with an internal device ID. The internal device ID is a (namespaced) UUID V5: -<<<<<<< HEAD -``` -HMSET /devices/ create_time -======= ```text HMSET /sensors// create_time ->>>>>>> unstable ``` The operator is encouraged to later amend the hash to include a helpful reference to the area which the sensing device serves, so that the LED status utility can properly format messages. -<<<<<<< HEAD -``` -HSET /devices/ area 0 -======= ```text HSET /sensors// tank 0 ->>>>>>> unstable ``` ## Docker builds @@ -50,9 +40,9 @@ See `build.sh` and `run.sh` for entry points. #### temp sensor -`> hgetall namespace/sensors/temp/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` +`> hgetall namespace/devices/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` -``` +```text 1) "create_time" 2) "1540598539" 3) "ext_device_id" @@ -69,11 +59,11 @@ See `build.sh` and `run.sh` for entry points. 14) "1" ``` -**pH sensor** +#### pH sensor -`> hgetall namespace/sensors/ph/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` +`> hgetall namespace/devices/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` -``` +```text 1) "low_ph_ref" 2) "4.00" 3) "low_mv" @@ -96,19 +86,19 @@ See `build.sh` and `run.sh` for entry points. 20) "286cbc98090000bd" ``` -**tank counter** +#### area counter -`> get namespace/tanks` +`> get namespace/areas` -``` +```text "1" ``` -**tank hash** +#### area hash -`> hgetall namespace/tanks/1` +`> hgetall namespace/areas/1` -``` +```text hgetall namespace/tanks/1 1) "temp_f" 2) "81.16"