From b35e35b3291bc64508d4f2bfaa84dc595608367d Mon Sep 17 00:00:00 2001 From: "kevin.russ" Date: Fri, 4 Aug 2023 10:05:32 +0200 Subject: [PATCH] Add someip statistics api to unbound session --- application/apps/indexer/Cargo.lock | 3 + application/apps/indexer/parsers/Cargo.toml | 3 + .../apps/indexer/parsers/src/someip.rs | 383 +++++++++++++++++- .../session/src/unbound/commands/someip.rs | 71 ++-- .../apps/rustcore/rs-bindings/Cargo.lock | 3 + .../rustcore/ts-bindings/spec/defaults.json | 2 +- 6 files changed, 425 insertions(+), 40 deletions(-) diff --git a/application/apps/indexer/Cargo.lock b/application/apps/indexer/Cargo.lock index 08d84b6e20..b921e5c476 100644 --- a/application/apps/indexer/Cargo.lock +++ b/application/apps/indexer/Cargo.lock @@ -1919,11 +1919,14 @@ dependencies = [ "crossbeam-channel", "dlt-core", "env_logger", + "etherparse", "humantime", "lazy_static", "log", "memchr", + "pcap-parser", "rand 0.8.5", + "rustc-hash", "serde", "someip-messages", "someip-payload", diff --git a/application/apps/indexer/parsers/Cargo.toml b/application/apps/indexer/parsers/Cargo.toml index a166b65950..bc9a5a43e9 100644 --- a/application/apps/indexer/parsers/Cargo.toml +++ b/application/apps/indexer/parsers/Cargo.toml @@ -10,6 +10,7 @@ crossbeam-channel = "0.5" chrono = "0.4" chrono-tz = "0.8" dlt-core = "0.14" +etherparse = "0.13" humantime = "2.1" lazy_static = "1.4" log = "0.4.17" @@ -17,7 +18,9 @@ memchr = "2.4" serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" tokio-util = "0.7" +pcap-parser = "0.14" rand = "0.8.5" +rustc-hash = "1.1" # someip-messages = { path = "../../../../../someip"} someip-messages = { git = "https://github.com/esrlabs/someip" } # someip-payload = { path = "../../../../../someip-payload" } diff --git a/application/apps/indexer/parsers/src/someip.rs b/application/apps/indexer/parsers/src/someip.rs index 1601017d78..b94e64cd02 100644 --- a/application/apps/indexer/parsers/src/someip.rs +++ b/application/apps/indexer/parsers/src/someip.rs @@ -1,5 +1,13 @@ use crate::{Error, LogMessage, ParseYield, Parser}; -use std::{borrow::Cow, fmt, fmt::Display, io::Write, path::PathBuf}; +use std::{ + borrow::Cow, + fmt, + fmt::Display, + fs::File, + io::Write, + path::{Path, PathBuf}, +}; +use tokio_util::sync::CancellationToken; use someip_messages::*; use someip_payload::{ @@ -11,6 +19,9 @@ use someip_payload::{ use log::{debug, error}; use serde::Serialize; +use pcap_parser::{traits::PcapReaderIterator, PcapBlockOwned, PcapError, PcapNGReader}; +use rustc_hash::FxHashMap; + /// A parser for SOME/IP log messages. pub struct SomeipParser { model: Option, @@ -337,6 +348,250 @@ impl Display for SomeipLogMessage { } } +/// Represents the statistic of a SOME/IP trace. +#[derive(Serialize, Debug, PartialEq, Clone)] +pub struct SomeipStatistic { + /** Statistic on service-ids and related method-ids */ + pub services: Vec, + /** Statistic on message-types and related return-codes */ + pub messages: Vec, +} + +#[derive(Serialize, Debug, PartialEq, Clone)] +pub struct SomeipStatisticItem { + pub item: SomeipStatisticDetail, + pub details: Vec, +} + +#[derive(Serialize, Debug, PartialEq, Clone)] +pub struct SomeipStatisticDetail { + pub id: usize, + pub num: usize, +} + +impl SomeipStatistic { + pub fn new() -> Self { + Self { + services: vec![], + messages: vec![], + } + } + + pub fn merge(&mut self, incomes: Self) { + Self::merge_items(&mut self.services, &incomes.services); + Self::merge_items(&mut self.messages, &incomes.messages); + } + + fn merge_items(owner: &mut Vec, incomes: &[SomeipStatisticItem]) { + incomes.iter().for_each(|income_item| { + if let Some(existed_item) = owner + .iter_mut() + .find(|owner_item| owner_item.item.id == income_item.item.id) + { + existed_item.item.num += income_item.item.num; + Self::merge_details(&mut existed_item.details, &income_item.details); + } else { + owner.push(SomeipStatisticItem { + item: SomeipStatisticDetail { + id: income_item.item.id, + num: income_item.item.num, + }, + details: income_item.details.clone(), + }); + } + }); + } + + fn merge_details(owner: &mut Vec, incomes: &[SomeipStatisticDetail]) { + incomes.iter().for_each(|income_detail| { + if let Some(existed_detail) = owner + .iter_mut() + .find(|owner_detail| owner_detail.id == income_detail.id) + { + existed_detail.num += income_detail.num; + } else { + owner.push(SomeipStatisticDetail { + id: income_detail.id, + num: income_detail.num, + }); + } + }); + } +} + +impl Default for SomeipStatistic { + fn default() -> Self { + Self::new() + } +} + +pub fn read_someip_statistic_from_pcapng( + path: &Path, + cancel: &CancellationToken, +) -> Result { + let mut services: StatisticItemMap = FxHashMap::default(); + let mut messages: StatisticItemMap = FxHashMap::default(); + let mut error: Option = None; + + match File::open(path) { + Ok(file) => { + let mut reader = + PcapNGReader::new(65536, file).map_err(|e| Error::Parse(format!("{e}")))?; + + loop { + if cancel.is_cancelled() { + break; + } + match reader.next() { + Ok((offset, block)) => { + match block { + PcapBlockOwned::NG(pcap_parser::Block::EnhancedPacket(ref epb)) => { + if let Err(e) = read_someip_statistic_from_pcapng_block( + epb.data, + &mut services, + &mut messages, + ) { + error = Some(e.to_string()); + break; + } + } + PcapBlockOwned::NG(pcap_parser::Block::SimplePacket(ref spb)) => { + if let Err(e) = read_someip_statistic_from_pcapng_block( + spb.data, + &mut services, + &mut messages, + ) { + error = Some(e.to_string()); + break; + } + } + _ => { + // skipped + } + } + reader.consume(offset); + } + Err(PcapError::Eof) => { + break; + } + Err(PcapError::Incomplete) => { + reader.refill().expect("pcapng refill failed"); + // continue; + } + Err(e) => { + let msg = e.to_string(); + error!("pcapng at offset {} : {}", reader.consumed(), msg); + error = Some(msg); + break; + } + } + } + } + Err(e) => { + error = Some(e.to_string()); + } + } + + if let Some(err) = error { + return Err(Error::Parse(err)); + } + + let result = SomeipStatistic { + services: map_statistic(&services), + messages: map_statistic(&messages), + }; + + Ok(result) +} + +type StatisticItemMap = FxHashMap; +type StatisticDetailMap = FxHashMap; + +fn add_statistic(item_map: &mut StatisticItemMap, item_id: usize, detail_id: usize) { + if let Some((item_value, detail_map)) = item_map.get_mut(&item_id) { + *item_value += 1; + if let Some(detail_value) = detail_map.get_mut(&detail_id) { + *detail_value += 1; + } else { + detail_map.insert(detail_id, 1); + } + } else { + let mut detail_map: StatisticDetailMap = FxHashMap::default(); + detail_map.insert(detail_id, 1); + item_map.insert(item_id, (1, detail_map)); + } +} + +fn map_statistic(item_map: &StatisticItemMap) -> Vec { + let mut item_vec: Vec = Vec::new(); + + for (item_id, (item_value, detail_map)) in item_map.iter() { + let mut item_statistic = SomeipStatisticItem { + item: SomeipStatisticDetail { + id: *item_id, + num: *item_value, + }, + details: Vec::new(), + }; + + for (detail_id, detail_value) in detail_map.iter() { + item_statistic.details.push(SomeipStatisticDetail { + id: *detail_id, + num: *detail_value, + }); + } + item_vec.push(item_statistic); + } + + item_vec +} + +fn read_someip_statistic_from_pcapng_block( + data: &[u8], + services: &mut StatisticItemMap, + messages: &mut StatisticItemMap, +) -> Result<(), Error> { + match etherparse::SlicedPacket::from_ethernet(data) { + Ok(value) => { + let payload = value.payload; + let total_len = payload.len(); + let mut offset: usize = 0; + while total_len - offset >= Header::LENGTH { + match Header::from_slice(payload) { + Ok(header) => { + let message_len = header.message_len(); + if total_len - offset >= message_len { + debug!("read someip statistic: {:?}", header.message_id()); + add_statistic( + services, + header.message_id.service_id as usize, + header.message_id.method_id as usize, + ); + add_statistic( + messages, + u8::from(header.message_type()) as usize, + u8::from(header.return_code()) as usize, + ); + offset += message_len; + } else { + return Err(Error::Parse(format!( + "incomplete message ({} / {} bytes)", + message_len, + total_len - offset + ))); + } + } + Err(e) => { + return Err(Error::Parse(e.to_string())); + } + } + } + Ok(()) + } + Err(e) => Err(Error::Parse(e.to_string())), + } +} + #[cfg(test)] mod test { use super::*; @@ -604,4 +859,130 @@ mod test { panic!("unexpected parse yield"); } } + + #[test] + fn test_merge_statistics() { + let mut s1 = SomeipStatistic::new(); + s1.services.push(SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10001, num: 1 }, + details: vec![ + SomeipStatisticDetail { id: 1, num: 1 }, + SomeipStatisticDetail { id: 2, num: 2 }, + ], + }); + s1.messages.push(SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10, num: 5 }, + details: vec![ + SomeipStatisticDetail { id: 0, num: 3 }, + SomeipStatisticDetail { id: 1, num: 2 }, + ], + }); + + let mut s2 = SomeipStatistic::new(); + s2.services.push(SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10001, num: 1 }, + details: vec![ + SomeipStatisticDetail { id: 1, num: 3 }, + SomeipStatisticDetail { id: 3, num: 3 }, + ], + }); + s2.services.push(SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10002, num: 3 }, + details: vec![SomeipStatisticDetail { id: 5, num: 7 }], + }); + s2.messages.push(SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10, num: 1 }, + details: vec![SomeipStatisticDetail { id: 9, num: 1 }], + }); + s2.messages.push(SomeipStatisticItem { + item: SomeipStatisticDetail { id: 11, num: 3 }, + details: vec![ + SomeipStatisticDetail { id: 0, num: 1 }, + SomeipStatisticDetail { id: 9, num: 2 }, + ], + }); + + s1.merge(s2); + + assert_eq!( + s1, + SomeipStatistic { + services: [ + SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10001, num: 2 }, + details: [ + SomeipStatisticDetail { id: 1, num: 4 }, + SomeipStatisticDetail { id: 2, num: 2 }, + SomeipStatisticDetail { id: 3, num: 3 } + ] + .to_vec() + }, + SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10002, num: 3 }, + details: [SomeipStatisticDetail { id: 5, num: 7 }].to_vec() + } + ] + .to_vec(), + messages: [ + SomeipStatisticItem { + item: SomeipStatisticDetail { id: 10, num: 6 }, + details: [ + SomeipStatisticDetail { id: 0, num: 3 }, + SomeipStatisticDetail { id: 1, num: 2 }, + SomeipStatisticDetail { id: 9, num: 1 } + ] + .to_vec() + }, + SomeipStatisticItem { + item: SomeipStatisticDetail { id: 11, num: 3 }, + details: [ + SomeipStatisticDetail { id: 0, num: 1 }, + SomeipStatisticDetail { id: 9, num: 2 } + ] + .to_vec() + } + ] + .to_vec() + } + ); + } + + const SOMEIP_PCAPNG_FILE: &str = "../../../../application/developing/resources/someip.pcapng"; + + #[test] + fn test_read_someip_statistic_from_pcapng() { + let _ = env_logger::try_init(); + + let path = Path::new(SOMEIP_PCAPNG_FILE); + let cancel = CancellationToken::new(); + + match read_someip_statistic_from_pcapng(&path, &cancel) { + Ok(statistic) => { + assert_eq!( + statistic, + SomeipStatistic { + services: [ + SomeipStatisticItem { + item: SomeipStatisticDetail { id: 123, num: 22 }, + details: [SomeipStatisticDetail { id: 32773, num: 22 }].to_vec() + }, + SomeipStatisticItem { + item: SomeipStatisticDetail { id: 65535, num: 33 }, + details: [SomeipStatisticDetail { id: 33024, num: 33 }].to_vec() + } + ] + .to_vec(), + messages: [SomeipStatisticItem { + item: SomeipStatisticDetail { id: 2, num: 55 }, + details: [SomeipStatisticDetail { id: 0, num: 55 }].to_vec() + }] + .to_vec() + } + ); + } + Err(error) => { + panic!("{}", format!("{error}")); + } + } + } } diff --git a/application/apps/indexer/session/src/unbound/commands/someip.rs b/application/apps/indexer/session/src/unbound/commands/someip.rs index d98bb2fc48..2f609acbda 100644 --- a/application/apps/indexer/session/src/unbound/commands/someip.rs +++ b/application/apps/indexer/session/src/unbound/commands/someip.rs @@ -1,44 +1,39 @@ use super::CommandOutcome; use crate::{events::ComputationError, unbound::signal::Signal}; +use std::path::Path; + +use parsers::someip::{read_someip_statistic_from_pcapng, SomeipStatistic}; pub fn get_someip_statistic( - _files: Vec, - _signal: Signal, + files: Vec, + signal: Signal, ) -> Result, ComputationError> { - Err(ComputationError::OperationNotSupported("NYI".into())) - // use parsers::someip::{read_someip_statistic_from_pcapng, SomeipStatistic}; - // use log::{error, warn}; - // use std::path::Path; - - // let mut statistic = SomeipStatistic::new(); - // let mut error: Option = None; - // warn!("Getting statistic for: {files:?}"); - // files.iter().for_each(|file| { - // if error.is_some() { - // return; - // } - // if signal.is_cancelling() { - // return; - // } - // match read_someip_statistic_from_pcapng(Path::new(&file), &signal.token()) { - // Ok(result) => { - // statistic.merge(result); - // } - // Err(err) => { - // error = Some(err.to_string()); - // } - // } - // }); - // if let Some(err) = error { - // error!("Fail to get statistic for: {files:?}"); - // return Err(ComputationError::IoOperation(err)); - // } - // if signal.is_cancelling() { - // warn!("Operation of geting statistic for: {files:?} has been cancelled"); - // return Ok(CommandOutcome::Cancelled); - // } - // Ok(CommandOutcome::Finished( - // serde_json::to_string(&statistic) - // .map_err(|e| ComputationError::IoOperation(e.to_string()))?, - // )) + let mut statistic = SomeipStatistic::new(); + let mut error: Option = None; + files.iter().for_each(|file| { + if error.is_some() { + return; + } + if signal.is_cancelling() { + return; + } + match read_someip_statistic_from_pcapng(Path::new(&file), &signal.token()) { + Ok(result) => { + statistic.merge(result); + } + Err(err) => { + error = Some(err.to_string()); + } + } + }); + if let Some(err) = error { + return Err(ComputationError::IoOperation(err)); + } + if signal.is_cancelling() { + return Ok(CommandOutcome::Cancelled); + } + Ok(CommandOutcome::Finished( + serde_json::to_string(&statistic) + .map_err(|e| ComputationError::IoOperation(e.to_string()))?, + )) } diff --git a/application/apps/rustcore/rs-bindings/Cargo.lock b/application/apps/rustcore/rs-bindings/Cargo.lock index 684d09087d..4cd0a53f49 100644 --- a/application/apps/rustcore/rs-bindings/Cargo.lock +++ b/application/apps/rustcore/rs-bindings/Cargo.lock @@ -1996,11 +1996,14 @@ dependencies = [ "chrono-tz", "crossbeam-channel", "dlt-core", + "etherparse", "humantime", "lazy_static", "log", "memchr", + "pcap-parser", "rand", + "rustc-hash", "serde", "someip-messages", "someip-payload", diff --git a/application/apps/rustcore/ts-bindings/spec/defaults.json b/application/apps/rustcore/ts-bindings/spec/defaults.json index 21e7a22289..35d1d3f069 100644 --- a/application/apps/rustcore/ts-bindings/spec/defaults.json +++ b/application/apps/rustcore/ts-bindings/spec/defaults.json @@ -58,7 +58,7 @@ }, "jobs": { "regular": { - "execute_only": [1,2,3,4,5], + "execute_only": [], "list": { "1": "Test 1. Cancelation testing", "2": "Test 2. Wrong sequence test",