Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add builtin reporter #153

Merged
merged 17 commits into from
Jul 18, 2023
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ jobs:
run: |
cargo test --workspace --all-targets -- --nocapture
cargo test --doc
- name: Run benches
run: cargo bench --workspace --bench compare --bench trace
# - name: Run benches
# run: cargo bench --workspace --bench compare --bench trace
- name: Run examples
run: |
cargo run --example asynchronous
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"minitrace-jaeger",
"minitrace-datadog",
"minitrace-opentelemetry",
"test-no-report",
]

[profile.bench]
Expand Down
3 changes: 2 additions & 1 deletion minitrace-datadog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ readme = "README.md"
keywords = ["tracing", "span", "datadog", "jaeger", "opentelemetry"]

[dependencies]
minitrace = { version = "0.4.1", path = "../minitrace" }
log = "0.4"
minitrace = { version = "0.4.1", path = "../minitrace", features = ["report"] }
reqwest = { version = "0.11", features = ["blocking"] }
rmp-serde = "1"
serde = { version = "1", features = ["derive"] }
Expand Down
185 changes: 78 additions & 107 deletions minitrace-datadog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,130 +11,108 @@
//! ```no_run
//! use std::net::SocketAddr;
//!
//! use futures::executor::block_on;
//! use minitrace::collector::Config;
//! use minitrace::prelude::*;
//!
//! // start trace
//! let (root_span, collector) = Span::root("root");
//! // Initialize reporter
//! let reporter = minitrace_datadog::DatadogReporter::new(
//! "127.0.0.1:8126".parse().unwrap(),
//! "asynchronous",
//! "db",
//! "select",
//! );
//! minitrace::set_reporter(reporter, Config::default());
//!
//! // finish trace
//! drop(root_span);
//!
//! // collect spans
//! let spans = block_on(collector.collect());
//!
//! // encode trace
//! const NODE_ID: u32 = 42;
//! const TRACE_ID: u64 = 42;
//! const ROOT_PARENT_SPAN_ID: u64 = 0;
//! const ERROR_CODE: i32 = 0;
//! let datadog_spans = minitrace_datadog::convert(
//! &spans,
//! NODE_ID,
//! TRACE_ID,
//! ROOT_PARENT_SPAN_ID,
//! "service_name",
//! "trace_type",
//! "resource",
//! ERROR_CODE,
//! )
//! .collect();
//!
//! // report trace
//! let socket = SocketAddr::new("127.0.0.1".parse().unwrap(), 8126);
//! minitrace_datadog::report_blocking(socket, datadog_spans).expect("report error");
//! // Start trace
//! let root = Span::root("root", SpanContext::new(TraceId(42), SpanId::default()));
//! ```

use std::collections::HashMap;
use std::error::Error;
use std::net::SocketAddr;

use minitrace::collector::Reporter;
use minitrace::prelude::*;
use rmp_serde::Serializer;
use serde::Serialize;

#[allow(clippy::too_many_arguments)]
pub fn convert<'a>(
spans: &'a [SpanRecord],
// A unique node id in the cluster to avoid span id conflict
node_id: u32,
trace_id: u64,
root_parent_span_id: u64,
service_name: &'a str,
trace_type: &'a str,
resource: &'a str,
error_code: i32,
) -> impl Iterator<Item = MPSpan<'a>> + 'a {
spans.iter().map(move |s| MPSpan {
name: s.name,
service: service_name,
trace_type,
resource,
start: s.begin_unix_time_ns as i64,
duration: s.duration_ns as i64,
meta: if s.properties.is_empty() {
None
} else {
Some(s.properties.iter().map(|(k, v)| (*k, v.as_ref())).collect())
},
error_code,
span_id: (node_id as u64) << 32 | s.id as u64,
trace_id,
parent_id: if s.parent_id == 0 {
root_parent_span_id
} else {
(node_id as u64) << 32 | s.parent_id as u64
},
})
pub struct DatadogReporter {
agent_addr: SocketAddr,
service_name: String,
resource: String,
trace_type: String,
}

pub fn report_blocking(
agent: SocketAddr,
spans: Vec<MPSpan>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let bytes = serialize(spans)?;
impl DatadogReporter {
pub fn new(
agent_addr: SocketAddr,
service_name: impl Into<String>,
resource: impl Into<String>,
trace_type: impl Into<String>,
) -> DatadogReporter {
DatadogReporter {
agent_addr,
service_name: service_name.into(),
resource: resource.into(),
trace_type: trace_type.into(),
}
}

let client = reqwest::blocking::Client::new();
let rep = client
.post(format!("http://{}/v0.4/traces", agent))
.header("Datadog-Meta-Tracer-Version", "v1.27.0")
.header("Content-Type", "application/msgpack")
.body(bytes)
.send()?;
fn convert<'a>(&'a self, spans: &'a [SpanRecord]) -> Vec<DatadogSpan<'a>> {
spans
.iter()
.map(move |s| DatadogSpan {
name: s.name,
service: &self.service_name,
trace_type: &self.trace_type,
resource: &self.resource,
start: s.begin_unix_time_ns as i64,
duration: s.duration_ns as i64,
meta: if s.properties.is_empty() {
None
} else {
Some(s.properties.iter().map(|(k, v)| (*k, v.as_ref())).collect())
},
error_code: 0,
span_id: s.span_id.0,
trace_id: s.trace_id.0 as u64,
parent_id: s.parent_id.0,
})
.collect()
}

if rep.status().as_u16() >= 400 {
let status = rep.status();
return Err(format!("{} (Status: {})", rep.text()?, status).into());
fn serialize(&self, spans: Vec<DatadogSpan>) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let mut buf = vec![0b10010001];
spans.serialize(&mut Serializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}

Ok(())
fn try_report(&self, spans: &[SpanRecord]) -> Result<(), Box<dyn std::error::Error>> {
let datadog_spans = self.convert(spans);
let bytes = self.serialize(datadog_spans)?;
let client = reqwest::blocking::Client::new();
let _rep = client
.post(format!("http://{}/v0.4/traces", self.agent_addr))
.header("Datadog-Meta-Tracer-Version", "v1.27.0")
.header("Content-Type", "application/msgpack")
.body(bytes)
.send()?;
Ok(())
}
}

pub async fn report(
agent: SocketAddr,
spans: Vec<MPSpan<'_>>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let bytes = serialize(spans)?;
impl Reporter for DatadogReporter {
fn report(&mut self, spans: &[SpanRecord]) {
if spans.is_empty() {
return;
}

let client = reqwest::Client::new();
let rep = client
.post(&format!("http://{}/v0.4/traces", agent))
.header("Datadog-Meta-Tracer-Version", "v1.27.0")
.header("Content-Type", "application/msgpack")
.body(bytes)
.send()
.await?;

if rep.status().as_u16() >= 400 {
let status = rep.status();
return Err(format!("{} (Status: {})", rep.text().await?, status).into());
if let Err(err) = self.try_report(spans) {
eprintln!("report to datadog failed: {}", err);
}
}

Ok(())
}

#[derive(Serialize)]
pub struct MPSpan<'a> {
struct DatadogSpan<'a> {
name: &'a str,
service: &'a str,
#[serde(rename = "type")]
Expand All @@ -149,10 +127,3 @@ pub struct MPSpan<'a> {
trace_id: u64,
parent_id: u64,
}

fn serialize(spans: Vec<MPSpan>) -> Result<Vec<u8>, Box<dyn Error + Send + Sync + 'static>> {
let mut buf = vec![0b10010001];
spans.serialize(&mut Serializer::new(&mut buf).with_struct_map())?;

Ok(buf)
}
3 changes: 2 additions & 1 deletion minitrace-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ readme = "README.md"
keywords = ["tracing", "span", "datadog", "jaeger", "opentelemetry"]

[dependencies]
minitrace = { version = "0.4.1", path = "../minitrace" }
log = "0.4"
minitrace = { version = "0.4.1", path = "../minitrace", features = ["report"] }
thrift_codec = "0.2"
tokio = { version = "1", features = ["net"] }

Expand Down
Loading
Loading