From 2855a52e8daed69bee32f657d4ffb4d47276fc9d Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Tue, 18 Jul 2023 14:33:58 -0500 Subject: [PATCH 01/12] refactor: Refactor Go architecture Refactoring Go SDK to be closer to the API and internal representation that we have with the Rust SDK. Just doing Datadog at the moment and seeking some feedback then i'll port the others and add docs and tests. --- go/adapter.go | 140 ++++----------- go/adapter/datadog/adapter.go | 105 +++++------ go/adapter/datadog_formatter/format.go | 28 +-- go/adapter/otel_formatter/format.go | 98 ----------- go/adapter/otel_stdout/adapter.go | 101 ----------- go/adapter/stdout/adapter.go | 65 ------- go/bin/datadog/main.go | 97 ++++++----- go/bin/otelstdout/main.go | 54 ------ go/bin/stdout/main.go | 54 ------ go/collector.go | 202 --------------------- go/listener.go | 20 +-- go/telemetry.go | 31 ++++ go/trace_ctx.go | 232 +++++++++++++++++++++++++ 13 files changed, 414 insertions(+), 813 deletions(-) delete mode 100644 go/adapter/otel_formatter/format.go delete mode 100644 go/adapter/otel_stdout/adapter.go delete mode 100644 go/adapter/stdout/adapter.go delete mode 100644 go/bin/otelstdout/main.go delete mode 100644 go/bin/stdout/main.go delete mode 100644 go/collector.go create mode 100644 go/telemetry.go create mode 100644 go/trace_ctx.go diff --git a/go/adapter.go b/go/adapter.go index a5db1f7..887e8a5 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -1,125 +1,55 @@ package observe -import ( - "bytes" - "errors" - "fmt" - "math/rand" - "time" - - "github.com/tetratelabs/wabin/leb128" - "github.com/tetratelabs/wabin/wasm" -) +import "log" type Adapter interface { - Start(collector *Collector, wasm []byte) error - Stop(collector *Collector) - Event(Event) + Start() + Stop() + HandleTraceEvent(TraceEvent) } -type AdapterBase struct { - Collectors map[*Collector]chan bool +type TraceEvent struct { + Events []Event + TelemetryId *TelemetryId } -func checkVersion(m *wasm.Module) error { - var minorGlobal *wasm.Export = nil - var majorGlobal *wasm.Export = nil - for _, export := range m.ExportSection { - if export.Type != wasm.ExternTypeGlobal { - continue - } - - if export.Name == "wasm_instr_version_minor" { - minorGlobal = export - } else if export.Name == "wasm_instr_version_major" { - majorGlobal = export - } - } - - if minorGlobal == nil || majorGlobal == nil { - return errors.New("wasm_instr_version functions not found") - } - - minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) - if err != nil { - return err - } - major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) - if err != nil { - return err - } - - if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { - return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) - } - - return nil +type AdapterBase struct { + TraceEvents chan TraceEvent + stop chan bool } -func (a *AdapterBase) Wait(collector *Collector, timeout time.Duration, callback func()) { - for { - select { - case <-time.After(timeout): - if len(collector.Events) > 0 { - if callback != nil { - callback() - } - continue - } - a.RemoveCollector(collector) - return - } - } +func (a *AdapterBase) NewTraceCtx(wasm []byte, config *Config) (*TraceCtx, error) { + if config == nil { + config = NewDefaultConfig() + } + return NewTraceCtx(a, wasm, config) } func NewAdapterBase() AdapterBase { - a := AdapterBase{ - Collectors: map[*Collector]chan bool{}, + return AdapterBase{ + // TODO set to some kind of max, add dump logic + TraceEvents: make(chan TraceEvent, 100), } - return a } -func (a *AdapterBase) Start(collector *Collector, wasm []byte) error { - a.Collectors[collector] = make(chan bool, 1) - return collector.GetNames(wasm) -} - -func (a *AdapterBase) RemoveCollector(collector *Collector) { - delete(a.Collectors, collector) -} - -func (a *AdapterBase) Stop(collector *Collector) { - stop, ok := a.Collectors[collector] - if ok { - stop <- true - a.RemoveCollector(collector) - } -} - -func (a AdapterBase) StopChan(collector *Collector) chan bool { - return a.Collectors[collector] -} - -type TelemetryId uint64 +func (b *AdapterBase) Start(a Adapter) { + b.stop = make(chan bool) -var rng rand.Source - -func init() { - rng = rand.NewSource(time.Now().UnixNano()) -} - -func NewTraceId() TelemetryId { - return TelemetryId(rng.Int63()) -} - -func NewSpanId() TelemetryId { - return TelemetryId(rng.Int63()) -} - -func (t TelemetryId) ToHex8() string { - return fmt.Sprintf("%016x", t) + go func() { + for { + select { + case event := <-b.TraceEvents: + log.Println("Adapter Got TraceEvent") + a.HandleTraceEvent(event) + case <-b.stop: + log.Println("Adapter Stopped") + return + } + } + }() } -func (t TelemetryId) ToHex16() string { - return fmt.Sprintf("%032x", t) +func (b *AdapterBase) Stop() { + log.Println("Stopping adapter") + b.stop <- true } diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 47e8fa6..ce22cae 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -6,8 +6,6 @@ import ( "log" "net/http" "net/url" - "strconv" - "time" "github.com/dylibso/observe-sdk/go" "github.com/dylibso/observe-sdk/go/adapter/datadog_formatter" @@ -48,71 +46,42 @@ func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) { }, nil } -func (d *DatadogAdapter) Event(e observe.Event) { - switch event := e.(type) { - case observe.CallEvent: - spans := d.makeCallSpans(event, nil) - if len(spans) > 0 { - d.Spans = append(d.Spans, spans...) - } - - case observe.MemoryGrowEvent: - if len(d.Spans) > 0 { - d.Spans[len(d.Spans)-1].AddAllocation(event.MemoryGrowAmount()) - } - case observe.CustomEvent: - if value, ok := event.Metadata["trace_id"]; ok { - traceId, err := strconv.ParseUint(value.(string), 10, 64) - if err != nil { - log.Println("failed to parse traceId from event metadata:", err) - return - } - - d.TraceId = traceId - } - } -} - -func (d *DatadogAdapter) Wait(collector *observe.Collector, timeout time.Duration) { - d.AdapterBase.Wait(collector, timeout, nil) -} - -func (d *DatadogAdapter) Start(collector *observe.Collector, wasm []byte) error { - if err := d.AdapterBase.Start(collector, wasm); err != nil { - return err - } - - stop := d.StopChan(collector) - - go func() { - for { - select { - case event := <-collector.Events: - d.Event(event) - case <-stop: - return - } - } - }() - - return nil -} - -func (d *DatadogAdapter) Stop(collector *observe.Collector) { - d.AdapterBase.Stop(collector) - - if len(d.Spans) == 0 { - return - } +func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { + if te.TelemetryId == nil { + log.Println("Datadog adapter needs a trace id") + return + } + + var allSpans []datadog_formatter.Span + for _, e := range te.Events { + switch event := e.(type) { + case observe.CallEvent: + spans := d.makeCallSpans(event, nil, *te.TelemetryId) + if len(spans) > 0 { + allSpans = append(allSpans, spans...) + } + case observe.MemoryGrowEvent: + if len(d.Spans) > 0 { + allSpans[len(allSpans)-1].AddAllocation(event.MemoryGrowAmount()) + } + case observe.CustomEvent: + log.Println("Datadog adapter does not respect custom events") + } + } + + if len(allSpans) <= 1 { + log.Println("No spans built for datadog trace") + return + } go func() { output := datadog_formatter.New() // TODO: for the moment, these are hard-coded, but will transition to a programmer- // controlled API to customer these values. - d.Spans[0].Resource = "request" + allSpans[0].Resource = "request" tt := d.Config.TraceType.String() - d.Spans[0].Type = &tt - output.AddTrace(d.Spans) + allSpans[0].Type = &tt + output.AddTrace(allSpans) b, err := json.Marshal(output) if err != nil { @@ -140,14 +109,22 @@ func (d *DatadogAdapter) Stop(collector *observe.Collector) { }() } -func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64) []datadog_formatter.Span { +func (d *DatadogAdapter) Start() { + d.AdapterBase.Start(d) +} + +func (d *DatadogAdapter) Stop() { + d.AdapterBase.Stop() +} + +func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *observe.TelemetryId, traceId observe.TelemetryId) []datadog_formatter.Span { name := event.FunctionName() - span := datadog_formatter.NewSpan(d.Config.ServiceName, d.TraceId, parentId, name, event.Time, event.Time.Add(event.Duration)) + span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) spans := []datadog_formatter.Span{*span} for _, ev := range event.Within() { if call, ok := ev.(observe.CallEvent); ok { - spans = append(spans, d.makeCallSpans(call, &span.SpanId)...) + spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...) } } diff --git a/go/adapter/datadog_formatter/format.go b/go/adapter/datadog_formatter/format.go index 8d8a5e1..e9777d5 100644 --- a/go/adapter/datadog_formatter/format.go +++ b/go/adapter/datadog_formatter/format.go @@ -12,24 +12,24 @@ type DatadogFormatter []Trace type Trace []Span type Span struct { - TraceId uint64 `json:"trace_id"` - SpanId uint64 `json:"span_id"` - ParentId *uint64 `json:"parent_id,omitempty"` - Name string `json:"name"` - Start uint64 `json:"start"` - Duration uint64 `json:"duration"` - Resource string `json:"resource"` - Error uint8 `json:"error"` - Meta map[string]string `json:"meta"` - Metrics map[string]string `json:"metrics"` - Service string `json:"service"` - Type *string `json:"type,omitempty"` + TraceId observe.TelemetryId `json:"trace_id"` + SpanId observe.TelemetryId `json:"span_id"` + ParentId *observe.TelemetryId `json:"parent_id,omitempty"` + Name string `json:"name"` + Start uint64 `json:"start"` + Duration uint64 `json:"duration"` + Resource string `json:"resource"` + Error uint8 `json:"error"` + Meta map[string]string `json:"meta"` + Metrics map[string]string `json:"metrics"` + Service string `json:"service"` + Type *string `json:"type,omitempty"` } -func NewSpan(service string, traceId uint64, parentId *uint64, name string, start, end time.Time) *Span { +func NewSpan(service string, traceId observe.TelemetryId, parentId *observe.TelemetryId, name string, start, end time.Time) *Span { id := observe.NewSpanId() span := Span{ - SpanId: uint64(id), + SpanId: id, ParentId: parentId, TraceId: traceId, Name: name, diff --git a/go/adapter/otel_formatter/format.go b/go/adapter/otel_formatter/format.go deleted file mode 100644 index e180228..0000000 --- a/go/adapter/otel_formatter/format.go +++ /dev/null @@ -1,98 +0,0 @@ -package otel_formatter - -import ( - "time" - - "github.com/dylibso/observe-sdk/go" -) - -type OtelFormatter struct { - ResourceSpans []ResourceSpan `json:"resourceSpans"` -} - -func New() *OtelFormatter { - return &OtelFormatter{} -} - -func (o *OtelFormatter) AddResourceSpan(span ResourceSpan) { - o.ResourceSpans = append(o.ResourceSpans, span) -} - -type ResourceSpan struct { - Resource Resource `json:"resource"` - ScopeSpans []ScopeSpan `json:"scopeSpans"` -} - -func NewResourceSpan() *ResourceSpan { - return &ResourceSpan{} -} - -func (r *ResourceSpan) AddAttribute(key string, value any) *ResourceSpan { - r.Resource.Attributes = append(r.Resource.Attributes, Attribute{Key: key, Value: value}) - return r -} - -func (r *ResourceSpan) AddSpans(spans []Span) { - r.ScopeSpans = append(r.ScopeSpans, ScopeSpan{ - Scope: Scope{ - Name: "event", - }, - Spans: spans, - }) -} - -type Resource struct { - Attributes []Attribute `json:"attributes"` -} - -type ScopeSpan struct { - Scope Scope `json:"scope"` - Spans []Span `json:"spans"` -} - -type Attribute struct { - Key string `json:"key"` - Value any `json:"value"` -} - -type Scope struct { - Name string `json:"name"` -} - -type Span struct { - TraceId string `json:"traceId"` - SpanId string `json:"spanId"` - ParentSpanId string `json:"parentSpanId"` - Name string `json:"name"` - Kind int64 `json:"kind"` - StartTimeNano int64 `json:"startTimeUnixNano"` - EndTimeNano int64 `json:"endTimeUnixNano"` - Attributes []Attribute `json:"attributes"` - DroppedAttributesCount int64 `json:"droppedAttributesCount"` - DroppedEventsCount int64 `json:"droppedEventsCount"` - DroppedLinksCount int64 `json:"droppedLinksCount"` - Status Status `json:"status"` -} - -type Status struct{} - -func NewSpan(traceId string, parentId *string, name string, start, end time.Time) *Span { - if parentId == nil { - var empty string - parentId = &empty - } - return &Span{ - TraceId: traceId, - SpanId: observe.NewSpanId().ToHex8(), - ParentSpanId: *parentId, - Name: name, - Kind: 1, - StartTimeNano: start.UnixNano(), - EndTimeNano: end.UnixNano(), - // uses empty defaults for remaining fields... - } -} - -func (s *Span) AddAttribute(key string, value any) { - s.Attributes = append(s.Attributes, Attribute{Key: key, Value: value}) -} diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go deleted file mode 100644 index 7dfc494..0000000 --- a/go/adapter/otel_stdout/adapter.go +++ /dev/null @@ -1,101 +0,0 @@ -package otel_stdout - -import ( - "encoding/json" - "fmt" - "log" - "time" - - observe "github.com/dylibso/observe-sdk/go" - otel "github.com/dylibso/observe-sdk/go/adapter/otel_formatter" -) - -type OtelStdoutAdapter struct { - observe.AdapterBase - TraceId string -} - -func NewOtelStdoutAdapter() OtelStdoutAdapter { - base := observe.NewAdapterBase() - return OtelStdoutAdapter{ - AdapterBase: base, - TraceId: observe.NewTraceId().ToHex16(), - } -} - -func (o *OtelStdoutAdapter) Event(e observe.Event) { - switch event := e.(type) { - case observe.CallEvent: - spans := o.makeCallSpans(event, nil) - if len(spans) > 0 { - output := otel.New() - resourceSpan := otel.NewResourceSpan() - resourceSpan.AddSpans(spans) - output.AddResourceSpan(*resourceSpan) - b, err := json.Marshal(output) - if err != nil { - log.Println("failed to encode CallEvent spans") - } - - fmt.Println(string(b)) - } - - case observe.MemoryGrowEvent: - output := otel.New() - span := otel.NewSpan(o.TraceId, nil, "allocation", event.Time, event.Time) - span.AddAttribute("amount", event.MemoryGrowAmount()) - resourceSpan := otel.NewResourceSpan() - resourceSpan.AddSpans([]otel.Span{*span}) - output.AddResourceSpan(*resourceSpan) - b, err := json.Marshal(output) - if err != nil { - log.Println("failed to encode MemoryGrowEvent spans") - } - - fmt.Println(string(b)) - - case observe.CustomEvent: - if value, ok := event.Metadata["trace_id"]; ok { - o.TraceId = value.(string) - } - } -} - -func (o *OtelStdoutAdapter) Wait(collector *observe.Collector, timeout time.Duration) { - o.AdapterBase.Wait(collector, timeout, func() {}) -} - -func (o *OtelStdoutAdapter) Start(collector *observe.Collector, wasm []byte) error { - if err := o.AdapterBase.Start(collector, wasm); err != nil { - return err - } - - stop := o.StopChan(collector) - - go func() { - for { - select { - case event := <-collector.Events: - o.Event(event) - case <-stop: - return - } - } - }() - return nil -} - -func (o *OtelStdoutAdapter) makeCallSpans(event observe.CallEvent, parentId *string) []otel.Span { - name := event.FunctionName() - span := otel.NewSpan(o.TraceId, parentId, name, event.Time, event.Time.Add(event.Duration)) - span.AddAttribute("function_name", fmt.Sprintf("function-call-%s", name)) - - spans := []otel.Span{*span} - for _, ev := range event.Within() { - if call, ok := ev.(observe.CallEvent); ok { - spans = append(spans, o.makeCallSpans(call, &span.SpanId)...) - } - } - - return spans -} diff --git a/go/adapter/stdout/adapter.go b/go/adapter/stdout/adapter.go deleted file mode 100644 index ac612d5..0000000 --- a/go/adapter/stdout/adapter.go +++ /dev/null @@ -1,65 +0,0 @@ -package stdout - -import ( - "log" - "strings" - "time" - - observe "github.com/dylibso/observe-sdk/go" -) - -type StdoutAdapter struct { - observe.AdapterBase -} - -func NewStdoutAdapter() StdoutAdapter { - base := observe.NewAdapterBase() - return StdoutAdapter{AdapterBase: base} -} - -func (s *StdoutAdapter) printEvents(event observe.CallEvent, indentation int) { - name := event.FunctionName() - log.Println(strings.Repeat(" ", indentation), "Call to", name, "took", event.Duration) - for _, event := range event.Within() { - if call, ok := event.(observe.CallEvent); ok { - s.printEvents(call, indentation+1) - } - } -} - -func (s *StdoutAdapter) Event(e observe.Event) { - switch event := e.(type) { - case observe.CallEvent: - s.printEvents(event, 0) - case observe.MemoryGrowEvent: - name := event.FunctionName() - log.Println("Allocated", event.MemoryGrowAmount(), "pages of memory in", name) - case observe.CustomEvent: - log.Println(event.Name, event.Time) - } -} - -func (a *StdoutAdapter) Start(collector *observe.Collector, wasm []byte) error { - if err := a.AdapterBase.Start(collector, wasm); err != nil { - return err - } - - stop := a.StopChan(collector) - - go func() { - for { - select { - case event := <-collector.Events: - a.Event(event) - case <-stop: - return - } - } - }() - - return nil -} - -func (a *StdoutAdapter) Wait(collector *observe.Collector, timeout time.Duration) { - a.AdapterBase.Wait(collector, timeout, func() {}) -} diff --git a/go/bin/datadog/main.go b/go/bin/datadog/main.go index c601658..d97bfbc 100644 --- a/go/bin/datadog/main.go +++ b/go/bin/datadog/main.go @@ -1,60 +1,65 @@ package main import ( + "context" "log" "os" "time" - "github.com/dylibso/observe-sdk/go" "github.com/dylibso/observe-sdk/go/adapter/datadog" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) func main() { - // - // Collector API - collector := observe.NewCollector(nil) - ctx, r, err := collector.InitRuntime() - if err != nil { - log.Panicln(err) - } - defer r.Close(ctx) // This closes everything this Runtime created. - - // Load WASM from disk - wasm, err := os.ReadFile(os.Args[1]) - if err != nil { - log.Panicln(err) - } - - // Wazero stuff - // Instantiate WASI - wasi_snapshot_preview1.MustInstantiate(ctx, r) - // collector.CustomEvent("Start module", map[string]interface{}{"name": "testing"}) - - // - // Adapter API - adapter, err := datadog.NewDatadogAdapter(nil) - if err != nil { - log.Panicln(err) - } - adapter.Start(collector, wasm) - defer adapter.Wait(collector, time.Millisecond*100) - defer adapter.Stop(collector) - - adapter.SetTraceId(datadog.NewTraceId()) - - config := wazero.NewModuleConfig(). - WithStdin(os.Stdin). - WithStdout(os.Stdout). - WithStderr(os.Stderr). - WithArgs(os.Args[1:]...). - WithStartFunctions("_start") - m, err := r.InstantiateWithConfig(ctx, wasm, config) - if err != nil { - log.Panicln(err) - } - defer m.Close(ctx) - - time.Sleep(time.Second * 2) + ctx := context.Background() + + log.Println("Starting adapter") + + // we only need to create and start once per instance of our host app + ddconf := datadog.DefaultDatadogConfig() + adapter, err := datadog.NewDatadogAdapter(ddconf) + adapter.Start() + defer adapter.Stop() + + log.Println("Adapter started") + + // Load WASM from disk + wasm, err := os.ReadFile(os.Args[1]) + if err != nil { + log.Panicln(err) + } + + log.Println("Create trace ctx") + traceCtx, err := adapter.NewTraceCtx(wasm, nil) + if err != nil { + log.Panicln(err) + } + log.Println("trace ctx created") + cfg := wazero.NewRuntimeConfig().WithCustomSections(true) + rt := wazero.NewRuntimeWithConfig(ctx, cfg) + err = traceCtx.Init(ctx, rt) + if err != nil { + log.Panicln(err) + } + wasi_snapshot_preview1.MustInstantiate(ctx, rt) + log.Println("wasi inited") + + config := wazero.NewModuleConfig(). + WithStdin(os.Stdin). + WithStdout(os.Stdout). + WithStderr(os.Stderr). + WithArgs(os.Args[1:]...). + WithStartFunctions("_start") + m, err := rt.InstantiateWithConfig(ctx, wasm, config) + if err != nil { + log.Panicln(err) + } + log.Println("module run") + defer m.Close(ctx) + + traceCtx.Finish() + log.Println("trace ctx finish") + + time.Sleep(time.Second * 2) } diff --git a/go/bin/otelstdout/main.go b/go/bin/otelstdout/main.go deleted file mode 100644 index f3b5512..0000000 --- a/go/bin/otelstdout/main.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "log" - "os" - "time" - - observe "github.com/dylibso/observe-sdk/go" - "github.com/dylibso/observe-sdk/go/adapter/otel_stdout" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" -) - -func main() { - // - // Collector API - collector := observe.NewCollector(nil) - ctx, r, err := collector.InitRuntime() - if err != nil { - log.Panicln(err) - } - defer r.Close(ctx) // This closes everything this Runtime created. - - // Load WASM from disk - wasm, err := os.ReadFile(os.Args[1]) - if err != nil { - log.Panicln(err) - } - - // Wazero stuff - // Instantiate WASI - wasi_snapshot_preview1.MustInstantiate(ctx, r) - // collector.CustomEvent("Start module", map[string]interface{}{"name": "testing"}) - - // - // Adapter API - adapter := otel_stdout.NewOtelStdoutAdapter() - if err = adapter.Start(collector, wasm); err != nil { - log.Panicln(err) - } - defer adapter.Wait(collector, time.Millisecond) - - config := wazero.NewModuleConfig(). - WithStdin(os.Stdin). - WithStdout(os.Stdout). - WithStderr(os.Stderr). - WithArgs(os.Args[1:]...). - WithStartFunctions("_start") - m, err := r.InstantiateWithConfig(ctx, wasm, config) - if err != nil { - log.Panicln(err) - } - defer m.Close(ctx) -} diff --git a/go/bin/stdout/main.go b/go/bin/stdout/main.go deleted file mode 100644 index aa1cb12..0000000 --- a/go/bin/stdout/main.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "log" - "os" - "time" - - observe "github.com/dylibso/observe-sdk/go" - "github.com/dylibso/observe-sdk/go/adapter/stdout" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" -) - -func main() { - // - // Collector API - collector := observe.NewCollector(nil) - ctx, r, err := collector.InitRuntime() - if err != nil { - log.Panicln(err) - } - defer r.Close(ctx) // This closes everything this Runtime created. - - // Load WASM from disk - wasm, err := os.ReadFile(os.Args[1]) - if err != nil { - log.Panicln(err) - } - - // Wazero stuff - // Instantiate WASI - wasi_snapshot_preview1.MustInstantiate(ctx, r) - // collector.CustomEvent("Start module", map[string]interface{}{"name": "testing"}) - - // - // Adapter API - adapter := stdout.NewStdoutAdapter() - if err = adapter.Start(collector, wasm); err != nil { - log.Panicln(err) - } - defer adapter.Wait(collector, time.Millisecond) - - config := wazero.NewModuleConfig(). - WithStdin(os.Stdin). - WithStdout(os.Stdout). - WithStderr(os.Stderr). - WithArgs(os.Args[1:]...). - WithStartFunctions("_start") - m, err := r.InstantiateWithConfig(ctx, wasm, config) - if err != nil { - log.Panicln(err) - } - defer m.Close(ctx) -} diff --git a/go/collector.go b/go/collector.go deleted file mode 100644 index 0442c8d..0000000 --- a/go/collector.go +++ /dev/null @@ -1,202 +0,0 @@ -package observe - -import ( - "context" - "errors" - "log" - "time" - - "github.com/tetratelabs/wabin/binary" - "github.com/tetratelabs/wabin/wasm" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" - "github.com/tetratelabs/wazero/experimental" -) - -type Collector struct { - raw chan RawEvent - stack []CallEvent - Events chan Event - Config *Config - names map[uint32]string -} - -func (c *Collector) Names() map[uint32]string { - return c.names -} - -func (c *Collector) GetNames(data []byte) error { - features := wasm.CoreFeaturesV2 - m, err := binary.DecodeModule(data, features) - if err != nil { - return err - } - - // Check for version globals - if err := checkVersion(m); err != nil { - return err - } - - if m.NameSection == nil { - return errors.New("Name section not found") - } - - c.names = make(map[uint32]string, len(m.NameSection.FunctionNames)) - - for _, v := range m.NameSection.FunctionNames { - c.names[v.Index] = v.Name - } - - return nil -} - -func (c *Collector) clearEvents() { - for { - select { - case <-c.Events: - continue - default: - break - } - } -} - -func (c *Collector) pushFunction(ev CallEvent) { - c.stack = append(c.stack, ev) -} - -func (c *Collector) popFunction() (CallEvent, bool) { - if len(c.stack) == 0 { - return CallEvent{}, false - } - - event := c.stack[len(c.stack)-1] - c.stack = c.stack[:len(c.stack)-1] - - return event, true -} - -func (c *Collector) peekFunction() (CallEvent, bool) { - if len(c.stack) == 0 { - return CallEvent{}, false - } - - return c.stack[len(c.stack)-1], true -} - -type Config struct { - ChannelBufferSize int - RuntimeConfig wazero.RuntimeConfig -} - -func NewDefaultConfig() *Config { - return &Config{ - ChannelBufferSize: 1024, - RuntimeConfig: wazero.NewRuntimeConfig(), - } -} - -func NewCollector(config *Config) *Collector { - if config == nil { - config = NewDefaultConfig() - } - return &Collector{ - raw: make(chan RawEvent, config.ChannelBufferSize), - Events: make(chan Event, config.ChannelBufferSize), - Config: config, - names: nil, - } -} - -// TODO: consider a different initial entrypoint to create the runtime using an provided config and context: -// func (c *Collector) NewRuntimeWithConfig(ctx context.Context, config wazero.RuntimeConfig) - -func (c *Collector) InitRuntime() (context.Context, wazero.Runtime, error) { - ctx := context.WithValue(context.Background(), experimental.FunctionListenerFactoryKey{}, c) - r := wazero.NewRuntimeWithConfig(ctx, c.Config.RuntimeConfig.WithCustomSections(true)) - observe := r.NewHostModuleBuilder("dylibso_observe") - functions := observe.NewFunctionBuilder() - - functions.WithFunc(func(ctx context.Context, m api.Module, i int32) { - start := time.Now() - ev := <-c.raw - if ev.Kind != RawEnter { - log.Println("Expected event", RawEnter, "but got", ev.Kind) - } - c.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) - }).Export("instrument_enter") - - functions.WithFunc(func(ctx context.Context, i int32) { - end := time.Now() - ev := <-c.raw - if ev.Kind != RawExit { - log.Println("Expected event", RawExit, "but got", ev.Kind) - return - } - fn, ok := c.peekFunction() - if !ok { - log.Println("Expected values on started function stack, but none were found") - return - } - if ev.FunctionIndex != fn.FunctionIndex() { - log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) - return - } - - fn, _ = c.popFunction() - fn.Stop(end) - fn.Raw = append(fn.Raw, ev) - - f, ok := c.popFunction() - if !ok { - c.Events <- fn - return - } - - f.within = append(f.within, fn) - c.pushFunction(f) - }).Export("instrument_exit") - - functions.WithFunc(func(ctx context.Context, amt int32) { - ev := <-c.raw - if ev.Kind != RawMemoryGrow { - log.Println("Expected event", MemoryGrow, "but got", ev.Kind) - return - } - - if len(c.stack) > 0 { - f := c.stack[len(c.stack)-1] - ev.FunctionIndex = f.FunctionIndex() - ev.FunctionName = f.FunctionName() - } - - event := MemoryGrowEvent{ - Raw: ev, - Time: time.Now(), - } - - c.Events <- event - }).Export("instrument_memory_grow") - - _, err := observe.Instantiate(ctx) - if err != nil { - return nil, nil, err - } - return ctx, r, nil -} - -func (c *Collector) CustomEvent(name string, metadata map[string]interface{}) { - ev := NewCustomEvent(name) - ev.Metadata = metadata - c.Events <- ev -} - -func Init(config *Config) (context.Context, wazero.Runtime, *Collector, error) { - c := NewCollector(config) - ctx, r, err := c.InitRuntime() - if err != nil { - return nil, nil, nil, err - } - - return ctx, r, c, nil -} diff --git a/go/listener.go b/go/listener.go index 196d3b4..3dd387c 100644 --- a/go/listener.go +++ b/go/listener.go @@ -7,18 +7,18 @@ import ( "github.com/tetratelabs/wazero/experimental" ) -func (c *Collector) NewListener(def api.FunctionDefinition) experimental.FunctionListener { +func (t *TraceCtx) NewListener(def api.FunctionDefinition) experimental.FunctionListener { if def.GoFunction() == nil { return nil } - return c + return t } -func (c *Collector) NewFunctionListener(_ api.FunctionDefinition) experimental.FunctionListener { - return c +func (t *TraceCtx) NewFunctionListener(_ api.FunctionDefinition) experimental.FunctionListener { + return t } -func (c *Collector) Before(ctx context.Context, _ api.Module, def api.FunctionDefinition, inputs []uint64, stack experimental.StackIterator) { +func (t *TraceCtx) Before(ctx context.Context, _ api.Module, def api.FunctionDefinition, inputs []uint64, stack experimental.StackIterator) { var event RawEvent name := def.Name() @@ -26,11 +26,11 @@ func (c *Collector) Before(ctx context.Context, _ api.Module, def api.FunctionDe case "instrument_enter": event.Kind = RawEnter event.FunctionIndex = uint32(inputs[0]) - event.FunctionName = c.names[event.FunctionIndex] + event.FunctionName = t.names[event.FunctionIndex] case "instrument_exit": event.Kind = RawExit event.FunctionIndex = uint32(inputs[0]) - event.FunctionName = c.names[event.FunctionIndex] + event.FunctionName = t.names[event.FunctionIndex] case "instrument_memory_grow": event.Kind = RawMemoryGrow event.MemoryGrowAmount = uint32(inputs[0]) @@ -41,9 +41,9 @@ func (c *Collector) Before(ctx context.Context, _ api.Module, def api.FunctionDe f := stack.Function() event.Stack = append(event.Stack, f) } - c.raw <- event + t.raw <- event } -func (c *Collector) After(context.Context, api.Module, api.FunctionDefinition, []uint64) {} +func (t *TraceCtx) After(context.Context, api.Module, api.FunctionDefinition, []uint64) {} -func (c *Collector) Abort(context.Context, api.Module, api.FunctionDefinition, error) {} +func (t *TraceCtx) Abort(context.Context, api.Module, api.FunctionDefinition, error) {} diff --git a/go/telemetry.go b/go/telemetry.go new file mode 100644 index 0000000..ddbcede --- /dev/null +++ b/go/telemetry.go @@ -0,0 +1,31 @@ +package observe + +import ( + "fmt" + "math/rand" + "time" +) + +type TelemetryId uint64 + +var rng rand.Source + +func init() { + rng = rand.NewSource(time.Now().UnixNano()) +} + +func NewTraceId() TelemetryId { + return TelemetryId(rng.Int63()) +} + +func NewSpanId() TelemetryId { + return TelemetryId(rng.Int63()) +} + +func (t TelemetryId) ToHex8() string { + return fmt.Sprintf("%016x", t) +} + +func (t TelemetryId) ToHex16() string { + return fmt.Sprintf("%032x", t) +} diff --git a/go/trace_ctx.go b/go/trace_ctx.go new file mode 100644 index 0000000..b0b7aca --- /dev/null +++ b/go/trace_ctx.go @@ -0,0 +1,232 @@ +package observe + +import ( + "bytes" + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/tetratelabs/wabin/binary" + "github.com/tetratelabs/wabin/leb128" + "github.com/tetratelabs/wabin/wasm" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" +) + + +type Config struct { + ChannelBufferSize int + RuntimeConfig wazero.RuntimeConfig +} + +func NewDefaultConfig() *Config { + return &Config{ + ChannelBufferSize: 1024, + } +} + +type TraceCtx struct { + adapter chan TraceEvent + raw chan RawEvent + events []Event + stack []CallEvent + Config *Config + names map[uint32]string + telemetryId TelemetryId +} + +func NewTraceCtx(adapter *AdapterBase, data []byte, config *Config) (*TraceCtx, error) { + names, err := parseNames(data) + if err != nil { + return nil, err + } + + return &TraceCtx { + adapter: adapter.TraceEvents, + raw: make(chan RawEvent, config.ChannelBufferSize), + names: names, + telemetryId: NewTraceId(), + Config: config, + }, nil +} + +// Finish() will stop the trace and send the +// TraceEvent payload to the adapter +func (t *TraceCtx) Finish() { + traceEvent := TraceEvent { + Events: t.events, + TelemetryId: &t.telemetryId, + } + t.adapter <- traceEvent + // clear the trace context + t.events = nil + t.telemetryId = NewTraceId() +} + +func (t *TraceCtx) Names() map[uint32]string { + return t.names +} + +func (t *TraceCtx) pushFunction(ev CallEvent) { + t.stack = append(t.stack, ev) +} + +func (t *TraceCtx) popFunction() (CallEvent, bool) { + if len(t.stack) == 0 { + return CallEvent{}, false + } + + event := t.stack[len(t.stack)-1] + t.stack = t.stack[:len(t.stack)-1] + + return event, true +} + +func (t *TraceCtx) peekFunction() (CallEvent, bool) { + if len(t.stack) == 0 { + return CallEvent{}, false + } + + return t.stack[len(t.stack)-1], true +} + +func (t *TraceCtx) WithListener(ctx context.Context) context.Context { + return context.WithValue(ctx, experimental.FunctionListenerFactoryKey{}, t) +} + +func (t *TraceCtx) Init(ctx context.Context, r wazero.Runtime) error { + ctx = t.WithListener(ctx) + observe := r.NewHostModuleBuilder("dylibso_observe") + functions := observe.NewFunctionBuilder() + + functions.WithFunc(func(ctx context.Context, m api.Module, i int32) { + start := time.Now() + ev := <-t.raw + if ev.Kind != RawEnter { + log.Println("Expected event", RawEnter, "but got", ev.Kind) + } + t.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) + }).Export("instrument_enter") + + functions.WithFunc(func(ctx context.Context, i int32) { + end := time.Now() + ev := <-t.raw + if ev.Kind != RawExit { + log.Println("Expected event", RawExit, "but got", ev.Kind) + return + } + fn, ok := t.peekFunction() + if !ok { + log.Println("Expected values on started function stack, but none were found") + return + } + if ev.FunctionIndex != fn.FunctionIndex() { + log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) + return + } + + fn, _ = t.popFunction() + fn.Stop(end) + fn.Raw = append(fn.Raw, ev) + + f, ok := t.popFunction() + if !ok { + t.events = append(t.events, fn) + return + } + + f.within = append(f.within, fn) + t.pushFunction(f) + }).Export("instrument_exit") + + functions.WithFunc(func(ctx context.Context, amt int32) { + ev := <-t.raw + if ev.Kind != RawMemoryGrow { + log.Println("Expected event", MemoryGrow, "but got", ev.Kind) + return + } + + if len(t.stack) > 0 { + f := t.stack[len(t.stack)-1] + ev.FunctionIndex = f.FunctionIndex() + ev.FunctionName = f.FunctionName() + } + + event := MemoryGrowEvent{ + Raw: ev, + Time: time.Now(), + } + + t.events = append(t.events, event) + }).Export("instrument_memory_grow") + + _, err := observe.Instantiate(ctx) + if err != nil { + return err + } + return nil +} + +func checkVersion(m *wasm.Module) error { + var minorGlobal *wasm.Export = nil + var majorGlobal *wasm.Export = nil + for _, export := range m.ExportSection { + if export.Type != wasm.ExternTypeGlobal { + continue + } + + if export.Name == "wasm_instr_version_minor" { + minorGlobal = export + } else if export.Name == "wasm_instr_version_major" { + majorGlobal = export + } + } + + if minorGlobal == nil || majorGlobal == nil { + return errors.New("wasm_instr_version functions not found") + } + + minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) + if err != nil { + return err + } + major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) + if err != nil { + return err + } + + if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { + return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) + } + + return nil +} + +func parseNames(data []byte) (map[uint32]string, error) { + features := wasm.CoreFeaturesV2 + m, err := binary.DecodeModule(data, features) + if err != nil { + return nil, err + } + + // Check for version globals + if err := checkVersion(m); err != nil { + return nil, err + } + + if m.NameSection == nil { + return nil, errors.New("Name section not found") + } + + names := make(map[uint32]string, len(m.NameSection.FunctionNames)) + + for _, v := range m.NameSection.FunctionNames { + names[v.Index] = v.Name + } + + return names, nil +} + From c0b1580a4915aa166f77c55a4892c3ea63f07327 Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Tue, 18 Jul 2023 14:40:11 -0500 Subject: [PATCH 02/12] fix code formatting --- go/adapter.go | 44 ++--- go/adapter/datadog/adapter.go | 190 +++++++++++----------- go/bin/datadog/main.go | 24 +-- go/telemetry.go | 16 +- go/trace_ctx.go | 292 +++++++++++++++++----------------- 5 files changed, 283 insertions(+), 283 deletions(-) diff --git a/go/adapter.go b/go/adapter.go index 887e8a5..e0c5a56 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -3,53 +3,53 @@ package observe import "log" type Adapter interface { - Start() - Stop() - HandleTraceEvent(TraceEvent) + Start() + Stop() + HandleTraceEvent(TraceEvent) } type TraceEvent struct { - Events []Event - TelemetryId *TelemetryId + Events []Event + TelemetryId *TelemetryId } type AdapterBase struct { - TraceEvents chan TraceEvent - stop chan bool + TraceEvents chan TraceEvent + stop chan bool } func (a *AdapterBase) NewTraceCtx(wasm []byte, config *Config) (*TraceCtx, error) { if config == nil { config = NewDefaultConfig() } - return NewTraceCtx(a, wasm, config) + return NewTraceCtx(a, wasm, config) } func NewAdapterBase() AdapterBase { return AdapterBase{ // TODO set to some kind of max, add dump logic - TraceEvents: make(chan TraceEvent, 100), - } + TraceEvents: make(chan TraceEvent, 100), + } } func (b *AdapterBase) Start(a Adapter) { - b.stop = make(chan bool) + b.stop = make(chan bool) - go func() { - for { - select { - case event := <-b.TraceEvents: + go func() { + for { + select { + case event := <-b.TraceEvents: log.Println("Adapter Got TraceEvent") - a.HandleTraceEvent(event) - case <-b.stop: + a.HandleTraceEvent(event) + case <-b.stop: log.Println("Adapter Stopped") - return - } - } - }() + return + } + } + }() } func (b *AdapterBase) Stop() { - log.Println("Stopping adapter") + log.Println("Stopping adapter") b.stop <- true } diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index ce22cae..98cad5e 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -1,49 +1,49 @@ package datadog import ( - "bytes" - "encoding/json" - "log" - "net/http" - "net/url" - - "github.com/dylibso/observe-sdk/go" - "github.com/dylibso/observe-sdk/go/adapter/datadog_formatter" + "bytes" + "encoding/json" + "log" + "net/http" + "net/url" + + "github.com/dylibso/observe-sdk/go" + "github.com/dylibso/observe-sdk/go/adapter/datadog_formatter" ) type DatadogConfig struct { - AgentHost string `json:"agent_host"` - ServiceName string `json:"service_name"` - DefaultTags map[string]string `json:"default_tags"` - TraceType datadog_formatter.DatadogTraceType `json:"trace_type"` + AgentHost string `json:"agent_host"` + ServiceName string `json:"service_name"` + DefaultTags map[string]string `json:"default_tags"` + TraceType datadog_formatter.DatadogTraceType `json:"trace_type"` } func DefaultDatadogConfig() *DatadogConfig { - return &DatadogConfig{ - AgentHost: "http://localhost:8126", - ServiceName: "my-wasm-service", - DefaultTags: nil, - TraceType: datadog_formatter.Web, - } + return &DatadogConfig{ + AgentHost: "http://localhost:8126", + ServiceName: "my-wasm-service", + DefaultTags: nil, + TraceType: datadog_formatter.Web, + } } type DatadogAdapter struct { - observe.AdapterBase - TraceId uint64 - Spans []datadog_formatter.Span - Config *DatadogConfig + observe.AdapterBase + TraceId uint64 + Spans []datadog_formatter.Span + Config *DatadogConfig } func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) { - if config == nil { - config = DefaultDatadogConfig() - } - - return DatadogAdapter{ - AdapterBase: observe.NewAdapterBase(), - TraceId: uint64(observe.NewTraceId()), - Config: config, - }, nil + if config == nil { + config = DefaultDatadogConfig() + } + + return DatadogAdapter{ + AdapterBase: observe.NewAdapterBase(), + TraceId: uint64(observe.NewTraceId()), + Config: config, + }, nil } func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { @@ -74,39 +74,39 @@ func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { return } - go func() { - output := datadog_formatter.New() - // TODO: for the moment, these are hard-coded, but will transition to a programmer- - // controlled API to customer these values. - allSpans[0].Resource = "request" - tt := d.Config.TraceType.String() - allSpans[0].Type = &tt - output.AddTrace(allSpans) - - b, err := json.Marshal(output) - if err != nil { - log.Println("failed to encode trace data to json", err) - return - } - - data := bytes.NewBuffer(b) - - host, err := url.JoinPath(d.Config.AgentHost, "v0.3", "traces") - if err != nil { - log.Println("failed to create datadog agent endpoint url:", err) - return - } - - resp, err := http.Post(host, "application/json", data) - if err != nil { - log.Println("failed to send trace request to datadog:", err) - return - } - - if resp.StatusCode != http.StatusOK { - log.Println("unexpected status code from datadog agent:", resp.StatusCode) - } - }() + go func() { + output := datadog_formatter.New() + // TODO: for the moment, these are hard-coded, but will transition to a programmer- + // controlled API to customer these values. + allSpans[0].Resource = "request" + tt := d.Config.TraceType.String() + allSpans[0].Type = &tt + output.AddTrace(allSpans) + + b, err := json.Marshal(output) + if err != nil { + log.Println("failed to encode trace data to json", err) + return + } + + data := bytes.NewBuffer(b) + + host, err := url.JoinPath(d.Config.AgentHost, "v0.3", "traces") + if err != nil { + log.Println("failed to create datadog agent endpoint url:", err) + return + } + + resp, err := http.Post(host, "application/json", data) + if err != nil { + log.Println("failed to send trace request to datadog:", err) + return + } + + if resp.StatusCode != http.StatusOK { + log.Println("unexpected status code from datadog agent:", resp.StatusCode) + } + }() } func (d *DatadogAdapter) Start() { @@ -114,54 +114,54 @@ func (d *DatadogAdapter) Start() { } func (d *DatadogAdapter) Stop() { - d.AdapterBase.Stop() + d.AdapterBase.Stop() } func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *observe.TelemetryId, traceId observe.TelemetryId) []datadog_formatter.Span { - name := event.FunctionName() - span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) + name := event.FunctionName() + span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) - spans := []datadog_formatter.Span{*span} - for _, ev := range event.Within() { - if call, ok := ev.(observe.CallEvent); ok { - spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...) - } - } + spans := []datadog_formatter.Span{*span} + for _, ev := range event.Within() { + if call, ok := ev.(observe.CallEvent); ok { + spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...) + } + } - return spans + return spans } func NewTraceId() uint64 { - return uint64(observe.NewTraceId()) + return uint64(observe.NewTraceId()) } func (d *DatadogAdapter) SetTraceId(traceId uint64) { - d.TraceId = traceId + d.TraceId = traceId } type DatadogSpanKind int const ( - Server DatadogSpanKind = iota - Client - Producer - Consumer - Internal + Server DatadogSpanKind = iota + Client + Producer + Consumer + Internal ) func (d DatadogSpanKind) String() string { - switch d { - case Server: - return "server" - case Client: - return "client" - case Producer: - return "producer" - case Consumer: - return "consumer" - case Internal: - return "internal" - default: - return "unknown-span-kind" - } + switch d { + case Server: + return "server" + case Client: + return "client" + case Producer: + return "producer" + case Consumer: + return "consumer" + case Internal: + return "internal" + default: + return "unknown-span-kind" + } } diff --git a/go/bin/datadog/main.go b/go/bin/datadog/main.go index d97bfbc..3bde7f1 100644 --- a/go/bin/datadog/main.go +++ b/go/bin/datadog/main.go @@ -1,14 +1,14 @@ package main import ( - "context" - "log" - "os" - "time" + "context" + "log" + "os" + "time" - "github.com/dylibso/observe-sdk/go/adapter/datadog" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "github.com/dylibso/observe-sdk/go/adapter/datadog" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) func main() { @@ -46,11 +46,11 @@ func main() { log.Println("wasi inited") config := wazero.NewModuleConfig(). - WithStdin(os.Stdin). - WithStdout(os.Stdout). - WithStderr(os.Stderr). - WithArgs(os.Args[1:]...). - WithStartFunctions("_start") + WithStdin(os.Stdin). + WithStdout(os.Stdout). + WithStderr(os.Stderr). + WithArgs(os.Args[1:]...). + WithStartFunctions("_start") m, err := rt.InstantiateWithConfig(ctx, wasm, config) if err != nil { log.Panicln(err) diff --git a/go/telemetry.go b/go/telemetry.go index ddbcede..5a0602d 100644 --- a/go/telemetry.go +++ b/go/telemetry.go @@ -1,9 +1,9 @@ package observe import ( - "fmt" - "math/rand" - "time" + "fmt" + "math/rand" + "time" ) type TelemetryId uint64 @@ -11,21 +11,21 @@ type TelemetryId uint64 var rng rand.Source func init() { - rng = rand.NewSource(time.Now().UnixNano()) + rng = rand.NewSource(time.Now().UnixNano()) } func NewTraceId() TelemetryId { - return TelemetryId(rng.Int63()) + return TelemetryId(rng.Int63()) } func NewSpanId() TelemetryId { - return TelemetryId(rng.Int63()) + return TelemetryId(rng.Int63()) } func (t TelemetryId) ToHex8() string { - return fmt.Sprintf("%016x", t) + return fmt.Sprintf("%016x", t) } func (t TelemetryId) ToHex16() string { - return fmt.Sprintf("%032x", t) + return fmt.Sprintf("%032x", t) } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index b0b7aca..0199235 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -1,30 +1,30 @@ package observe import ( - "bytes" - "context" - "errors" - "fmt" - "log" - "time" - - "github.com/tetratelabs/wabin/binary" - "github.com/tetratelabs/wabin/leb128" - "github.com/tetratelabs/wabin/wasm" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" - "github.com/tetratelabs/wazero/experimental" + "bytes" + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/tetratelabs/wabin/binary" + "github.com/tetratelabs/wabin/leb128" + "github.com/tetratelabs/wabin/wasm" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" ) type Config struct { - ChannelBufferSize int - RuntimeConfig wazero.RuntimeConfig + ChannelBufferSize int + RuntimeConfig wazero.RuntimeConfig } func NewDefaultConfig() *Config { - return &Config{ - ChannelBufferSize: 1024, + return &Config{ + ChannelBufferSize: 1024, } } @@ -67,166 +67,166 @@ func (t *TraceCtx) Finish() { } func (t *TraceCtx) Names() map[uint32]string { - return t.names + return t.names } func (t *TraceCtx) pushFunction(ev CallEvent) { - t.stack = append(t.stack, ev) + t.stack = append(t.stack, ev) } func (t *TraceCtx) popFunction() (CallEvent, bool) { - if len(t.stack) == 0 { - return CallEvent{}, false - } + if len(t.stack) == 0 { + return CallEvent{}, false + } - event := t.stack[len(t.stack)-1] - t.stack = t.stack[:len(t.stack)-1] + event := t.stack[len(t.stack)-1] + t.stack = t.stack[:len(t.stack)-1] - return event, true + return event, true } func (t *TraceCtx) peekFunction() (CallEvent, bool) { - if len(t.stack) == 0 { - return CallEvent{}, false - } + if len(t.stack) == 0 { + return CallEvent{}, false + } - return t.stack[len(t.stack)-1], true + return t.stack[len(t.stack)-1], true } func (t *TraceCtx) WithListener(ctx context.Context) context.Context { - return context.WithValue(ctx, experimental.FunctionListenerFactoryKey{}, t) + return context.WithValue(ctx, experimental.FunctionListenerFactoryKey{}, t) } func (t *TraceCtx) Init(ctx context.Context, r wazero.Runtime) error { ctx = t.WithListener(ctx) - observe := r.NewHostModuleBuilder("dylibso_observe") - functions := observe.NewFunctionBuilder() - - functions.WithFunc(func(ctx context.Context, m api.Module, i int32) { - start := time.Now() - ev := <-t.raw - if ev.Kind != RawEnter { - log.Println("Expected event", RawEnter, "but got", ev.Kind) - } - t.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) - }).Export("instrument_enter") - - functions.WithFunc(func(ctx context.Context, i int32) { - end := time.Now() - ev := <-t.raw - if ev.Kind != RawExit { - log.Println("Expected event", RawExit, "but got", ev.Kind) - return - } - fn, ok := t.peekFunction() - if !ok { - log.Println("Expected values on started function stack, but none were found") - return - } - if ev.FunctionIndex != fn.FunctionIndex() { - log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) - return - } - - fn, _ = t.popFunction() - fn.Stop(end) - fn.Raw = append(fn.Raw, ev) - - f, ok := t.popFunction() - if !ok { + observe := r.NewHostModuleBuilder("dylibso_observe") + functions := observe.NewFunctionBuilder() + + functions.WithFunc(func(ctx context.Context, m api.Module, i int32) { + start := time.Now() + ev := <-t.raw + if ev.Kind != RawEnter { + log.Println("Expected event", RawEnter, "but got", ev.Kind) + } + t.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) + }).Export("instrument_enter") + + functions.WithFunc(func(ctx context.Context, i int32) { + end := time.Now() + ev := <-t.raw + if ev.Kind != RawExit { + log.Println("Expected event", RawExit, "but got", ev.Kind) + return + } + fn, ok := t.peekFunction() + if !ok { + log.Println("Expected values on started function stack, but none were found") + return + } + if ev.FunctionIndex != fn.FunctionIndex() { + log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) + return + } + + fn, _ = t.popFunction() + fn.Stop(end) + fn.Raw = append(fn.Raw, ev) + + f, ok := t.popFunction() + if !ok { t.events = append(t.events, fn) - return - } - - f.within = append(f.within, fn) - t.pushFunction(f) - }).Export("instrument_exit") - - functions.WithFunc(func(ctx context.Context, amt int32) { - ev := <-t.raw - if ev.Kind != RawMemoryGrow { - log.Println("Expected event", MemoryGrow, "but got", ev.Kind) - return - } - - if len(t.stack) > 0 { - f := t.stack[len(t.stack)-1] - ev.FunctionIndex = f.FunctionIndex() - ev.FunctionName = f.FunctionName() - } - - event := MemoryGrowEvent{ - Raw: ev, - Time: time.Now(), - } - - t.events = append(t.events, event) - }).Export("instrument_memory_grow") - - _, err := observe.Instantiate(ctx) - if err != nil { - return err - } - return nil + return + } + + f.within = append(f.within, fn) + t.pushFunction(f) + }).Export("instrument_exit") + + functions.WithFunc(func(ctx context.Context, amt int32) { + ev := <-t.raw + if ev.Kind != RawMemoryGrow { + log.Println("Expected event", MemoryGrow, "but got", ev.Kind) + return + } + + if len(t.stack) > 0 { + f := t.stack[len(t.stack)-1] + ev.FunctionIndex = f.FunctionIndex() + ev.FunctionName = f.FunctionName() + } + + event := MemoryGrowEvent{ + Raw: ev, + Time: time.Now(), + } + + t.events = append(t.events, event) + }).Export("instrument_memory_grow") + + _, err := observe.Instantiate(ctx) + if err != nil { + return err + } + return nil } func checkVersion(m *wasm.Module) error { - var minorGlobal *wasm.Export = nil - var majorGlobal *wasm.Export = nil - for _, export := range m.ExportSection { - if export.Type != wasm.ExternTypeGlobal { - continue - } - - if export.Name == "wasm_instr_version_minor" { - minorGlobal = export - } else if export.Name == "wasm_instr_version_major" { - majorGlobal = export - } - } - - if minorGlobal == nil || majorGlobal == nil { - return errors.New("wasm_instr_version functions not found") - } - - minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) - if err != nil { - return err - } - major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) - if err != nil { - return err - } - - if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { - return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) - } - - return nil + var minorGlobal *wasm.Export = nil + var majorGlobal *wasm.Export = nil + for _, export := range m.ExportSection { + if export.Type != wasm.ExternTypeGlobal { + continue + } + + if export.Name == "wasm_instr_version_minor" { + minorGlobal = export + } else if export.Name == "wasm_instr_version_major" { + majorGlobal = export + } + } + + if minorGlobal == nil || majorGlobal == nil { + return errors.New("wasm_instr_version functions not found") + } + + minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) + if err != nil { + return err + } + major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) + if err != nil { + return err + } + + if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { + return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) + } + + return nil } func parseNames(data []byte) (map[uint32]string, error) { - features := wasm.CoreFeaturesV2 - m, err := binary.DecodeModule(data, features) - if err != nil { - return nil, err - } + features := wasm.CoreFeaturesV2 + m, err := binary.DecodeModule(data, features) + if err != nil { + return nil, err + } - // Check for version globals - if err := checkVersion(m); err != nil { - return nil, err - } + // Check for version globals + if err := checkVersion(m); err != nil { + return nil, err + } - if m.NameSection == nil { - return nil, errors.New("Name section not found") - } + if m.NameSection == nil { + return nil, errors.New("Name section not found") + } names := make(map[uint32]string, len(m.NameSection.FunctionNames)) - for _, v := range m.NameSection.FunctionNames { - names[v.Index] = v.Name - } + for _, v := range m.NameSection.FunctionNames { + names[v.Index] = v.Name + } - return names, nil + return names, nil } From 1d47a654d167d53a203c60f0d066364fbf34bf95 Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Tue, 18 Jul 2023 14:44:36 -0500 Subject: [PATCH 03/12] fix formatting --- go/adapter.go | 62 ++++----- go/telemetry.go | 16 +-- go/trace_ctx.go | 356 ++++++++++++++++++++++++------------------------ 3 files changed, 216 insertions(+), 218 deletions(-) diff --git a/go/adapter.go b/go/adapter.go index e0c5a56..037dd02 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -3,53 +3,53 @@ package observe import "log" type Adapter interface { - Start() - Stop() - HandleTraceEvent(TraceEvent) + Start() + Stop() + HandleTraceEvent(TraceEvent) } type TraceEvent struct { - Events []Event - TelemetryId *TelemetryId + Events []Event + TelemetryId *TelemetryId } type AdapterBase struct { - TraceEvents chan TraceEvent - stop chan bool + TraceEvents chan TraceEvent + stop chan bool } func (a *AdapterBase) NewTraceCtx(wasm []byte, config *Config) (*TraceCtx, error) { - if config == nil { - config = NewDefaultConfig() - } - return NewTraceCtx(a, wasm, config) + if config == nil { + config = NewDefaultConfig() + } + return NewTraceCtx(a, wasm, config) } func NewAdapterBase() AdapterBase { - return AdapterBase{ - // TODO set to some kind of max, add dump logic - TraceEvents: make(chan TraceEvent, 100), - } + return AdapterBase{ + // TODO set to some kind of max, add dump logic + TraceEvents: make(chan TraceEvent, 100), + } } func (b *AdapterBase) Start(a Adapter) { - b.stop = make(chan bool) - - go func() { - for { - select { - case event := <-b.TraceEvents: - log.Println("Adapter Got TraceEvent") - a.HandleTraceEvent(event) - case <-b.stop: - log.Println("Adapter Stopped") - return - } - } - }() + b.stop = make(chan bool) + + go func() { + for { + select { + case event := <-b.TraceEvents: + log.Println("Adapter Got TraceEvent") + a.HandleTraceEvent(event) + case <-b.stop: + log.Println("Adapter Stopped") + return + } + } + }() } func (b *AdapterBase) Stop() { - log.Println("Stopping adapter") - b.stop <- true + log.Println("Stopping adapter") + b.stop <- true } diff --git a/go/telemetry.go b/go/telemetry.go index 5a0602d..ddbcede 100644 --- a/go/telemetry.go +++ b/go/telemetry.go @@ -1,9 +1,9 @@ package observe import ( - "fmt" - "math/rand" - "time" + "fmt" + "math/rand" + "time" ) type TelemetryId uint64 @@ -11,21 +11,21 @@ type TelemetryId uint64 var rng rand.Source func init() { - rng = rand.NewSource(time.Now().UnixNano()) + rng = rand.NewSource(time.Now().UnixNano()) } func NewTraceId() TelemetryId { - return TelemetryId(rng.Int63()) + return TelemetryId(rng.Int63()) } func NewSpanId() TelemetryId { - return TelemetryId(rng.Int63()) + return TelemetryId(rng.Int63()) } func (t TelemetryId) ToHex8() string { - return fmt.Sprintf("%016x", t) + return fmt.Sprintf("%016x", t) } func (t TelemetryId) ToHex16() string { - return fmt.Sprintf("%032x", t) + return fmt.Sprintf("%032x", t) } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index 0199235..971e395 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -1,232 +1,230 @@ package observe import ( - "bytes" - "context" - "errors" - "fmt" - "log" - "time" - - "github.com/tetratelabs/wabin/binary" - "github.com/tetratelabs/wabin/leb128" - "github.com/tetratelabs/wabin/wasm" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" - "github.com/tetratelabs/wazero/experimental" + "bytes" + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/tetratelabs/wabin/binary" + "github.com/tetratelabs/wabin/leb128" + "github.com/tetratelabs/wabin/wasm" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" ) - type Config struct { - ChannelBufferSize int - RuntimeConfig wazero.RuntimeConfig + ChannelBufferSize int + RuntimeConfig wazero.RuntimeConfig } func NewDefaultConfig() *Config { - return &Config{ - ChannelBufferSize: 1024, - } + return &Config{ + ChannelBufferSize: 1024, + } } type TraceCtx struct { - adapter chan TraceEvent - raw chan RawEvent - events []Event - stack []CallEvent - Config *Config - names map[uint32]string - telemetryId TelemetryId + adapter chan TraceEvent + raw chan RawEvent + events []Event + stack []CallEvent + Config *Config + names map[uint32]string + telemetryId TelemetryId } func NewTraceCtx(adapter *AdapterBase, data []byte, config *Config) (*TraceCtx, error) { - names, err := parseNames(data) - if err != nil { - return nil, err - } - - return &TraceCtx { - adapter: adapter.TraceEvents, - raw: make(chan RawEvent, config.ChannelBufferSize), - names: names, - telemetryId: NewTraceId(), - Config: config, - }, nil + names, err := parseNames(data) + if err != nil { + return nil, err + } + + return &TraceCtx{ + adapter: adapter.TraceEvents, + raw: make(chan RawEvent, config.ChannelBufferSize), + names: names, + telemetryId: NewTraceId(), + Config: config, + }, nil } // Finish() will stop the trace and send the // TraceEvent payload to the adapter func (t *TraceCtx) Finish() { - traceEvent := TraceEvent { - Events: t.events, - TelemetryId: &t.telemetryId, - } - t.adapter <- traceEvent - // clear the trace context - t.events = nil - t.telemetryId = NewTraceId() + traceEvent := TraceEvent{ + Events: t.events, + TelemetryId: &t.telemetryId, + } + t.adapter <- traceEvent + // clear the trace context + t.events = nil + t.telemetryId = NewTraceId() } func (t *TraceCtx) Names() map[uint32]string { - return t.names + return t.names } func (t *TraceCtx) pushFunction(ev CallEvent) { - t.stack = append(t.stack, ev) + t.stack = append(t.stack, ev) } func (t *TraceCtx) popFunction() (CallEvent, bool) { - if len(t.stack) == 0 { - return CallEvent{}, false - } + if len(t.stack) == 0 { + return CallEvent{}, false + } - event := t.stack[len(t.stack)-1] - t.stack = t.stack[:len(t.stack)-1] + event := t.stack[len(t.stack)-1] + t.stack = t.stack[:len(t.stack)-1] - return event, true + return event, true } func (t *TraceCtx) peekFunction() (CallEvent, bool) { - if len(t.stack) == 0 { - return CallEvent{}, false - } + if len(t.stack) == 0 { + return CallEvent{}, false + } - return t.stack[len(t.stack)-1], true + return t.stack[len(t.stack)-1], true } func (t *TraceCtx) WithListener(ctx context.Context) context.Context { - return context.WithValue(ctx, experimental.FunctionListenerFactoryKey{}, t) + return context.WithValue(ctx, experimental.FunctionListenerFactoryKey{}, t) } func (t *TraceCtx) Init(ctx context.Context, r wazero.Runtime) error { - ctx = t.WithListener(ctx) - observe := r.NewHostModuleBuilder("dylibso_observe") - functions := observe.NewFunctionBuilder() - - functions.WithFunc(func(ctx context.Context, m api.Module, i int32) { - start := time.Now() - ev := <-t.raw - if ev.Kind != RawEnter { - log.Println("Expected event", RawEnter, "but got", ev.Kind) - } - t.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) - }).Export("instrument_enter") - - functions.WithFunc(func(ctx context.Context, i int32) { - end := time.Now() - ev := <-t.raw - if ev.Kind != RawExit { - log.Println("Expected event", RawExit, "but got", ev.Kind) - return - } - fn, ok := t.peekFunction() - if !ok { - log.Println("Expected values on started function stack, but none were found") - return - } - if ev.FunctionIndex != fn.FunctionIndex() { - log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) - return - } - - fn, _ = t.popFunction() - fn.Stop(end) - fn.Raw = append(fn.Raw, ev) - - f, ok := t.popFunction() - if !ok { - t.events = append(t.events, fn) - return - } - - f.within = append(f.within, fn) - t.pushFunction(f) - }).Export("instrument_exit") - - functions.WithFunc(func(ctx context.Context, amt int32) { - ev := <-t.raw - if ev.Kind != RawMemoryGrow { - log.Println("Expected event", MemoryGrow, "but got", ev.Kind) - return - } - - if len(t.stack) > 0 { - f := t.stack[len(t.stack)-1] - ev.FunctionIndex = f.FunctionIndex() - ev.FunctionName = f.FunctionName() - } - - event := MemoryGrowEvent{ - Raw: ev, - Time: time.Now(), - } - - t.events = append(t.events, event) - }).Export("instrument_memory_grow") - - _, err := observe.Instantiate(ctx) - if err != nil { - return err - } - return nil + ctx = t.WithListener(ctx) + observe := r.NewHostModuleBuilder("dylibso_observe") + functions := observe.NewFunctionBuilder() + + functions.WithFunc(func(ctx context.Context, m api.Module, i int32) { + start := time.Now() + ev := <-t.raw + if ev.Kind != RawEnter { + log.Println("Expected event", RawEnter, "but got", ev.Kind) + } + t.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) + }).Export("instrument_enter") + + functions.WithFunc(func(ctx context.Context, i int32) { + end := time.Now() + ev := <-t.raw + if ev.Kind != RawExit { + log.Println("Expected event", RawExit, "but got", ev.Kind) + return + } + fn, ok := t.peekFunction() + if !ok { + log.Println("Expected values on started function stack, but none were found") + return + } + if ev.FunctionIndex != fn.FunctionIndex() { + log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) + return + } + + fn, _ = t.popFunction() + fn.Stop(end) + fn.Raw = append(fn.Raw, ev) + + f, ok := t.popFunction() + if !ok { + t.events = append(t.events, fn) + return + } + + f.within = append(f.within, fn) + t.pushFunction(f) + }).Export("instrument_exit") + + functions.WithFunc(func(ctx context.Context, amt int32) { + ev := <-t.raw + if ev.Kind != RawMemoryGrow { + log.Println("Expected event", MemoryGrow, "but got", ev.Kind) + return + } + + if len(t.stack) > 0 { + f := t.stack[len(t.stack)-1] + ev.FunctionIndex = f.FunctionIndex() + ev.FunctionName = f.FunctionName() + } + + event := MemoryGrowEvent{ + Raw: ev, + Time: time.Now(), + } + + t.events = append(t.events, event) + }).Export("instrument_memory_grow") + + _, err := observe.Instantiate(ctx) + if err != nil { + return err + } + return nil } func checkVersion(m *wasm.Module) error { - var minorGlobal *wasm.Export = nil - var majorGlobal *wasm.Export = nil - for _, export := range m.ExportSection { - if export.Type != wasm.ExternTypeGlobal { - continue - } - - if export.Name == "wasm_instr_version_minor" { - minorGlobal = export - } else if export.Name == "wasm_instr_version_major" { - majorGlobal = export - } - } - - if minorGlobal == nil || majorGlobal == nil { - return errors.New("wasm_instr_version functions not found") - } - - minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) - if err != nil { - return err - } - major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) - if err != nil { - return err - } - - if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { - return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) - } - - return nil + var minorGlobal *wasm.Export = nil + var majorGlobal *wasm.Export = nil + for _, export := range m.ExportSection { + if export.Type != wasm.ExternTypeGlobal { + continue + } + + if export.Name == "wasm_instr_version_minor" { + minorGlobal = export + } else if export.Name == "wasm_instr_version_major" { + majorGlobal = export + } + } + + if minorGlobal == nil || majorGlobal == nil { + return errors.New("wasm_instr_version functions not found") + } + + minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) + if err != nil { + return err + } + major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) + if err != nil { + return err + } + + if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { + return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) + } + + return nil } func parseNames(data []byte) (map[uint32]string, error) { - features := wasm.CoreFeaturesV2 - m, err := binary.DecodeModule(data, features) - if err != nil { - return nil, err - } + features := wasm.CoreFeaturesV2 + m, err := binary.DecodeModule(data, features) + if err != nil { + return nil, err + } - // Check for version globals - if err := checkVersion(m); err != nil { - return nil, err - } + // Check for version globals + if err := checkVersion(m); err != nil { + return nil, err + } - if m.NameSection == nil { - return nil, errors.New("Name section not found") - } + if m.NameSection == nil { + return nil, errors.New("Name section not found") + } - names := make(map[uint32]string, len(m.NameSection.FunctionNames)) + names := make(map[uint32]string, len(m.NameSection.FunctionNames)) - for _, v := range m.NameSection.FunctionNames { - names[v.Index] = v.Name - } + for _, v := range m.NameSection.FunctionNames { + names[v.Index] = v.Name + } - return names, nil + return names, nil } - From ce3e1c26893bb0db44701519a0e5eec426fdca8e Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Tue, 18 Jul 2023 15:12:02 -0500 Subject: [PATCH 04/12] combine new and init --- go/adapter.go | 11 +++-- go/bin/datadog/main.go | 103 ++++++++++++++++++----------------------- go/trace_ctx.go | 61 +++++++++++++----------- 3 files changed, 87 insertions(+), 88 deletions(-) diff --git a/go/adapter.go b/go/adapter.go index 037dd02..a297bdd 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -1,6 +1,11 @@ package observe -import "log" +import ( + "context" + "log" + + "github.com/tetratelabs/wazero" +) type Adapter interface { Start() @@ -18,11 +23,11 @@ type AdapterBase struct { stop chan bool } -func (a *AdapterBase) NewTraceCtx(wasm []byte, config *Config) (*TraceCtx, error) { +func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, config *Config) (*TraceCtx, error) { if config == nil { config = NewDefaultConfig() } - return NewTraceCtx(a, wasm, config) + return NewTraceCtx(ctx, a, r, wasm, config) } func NewAdapterBase() AdapterBase { diff --git a/go/bin/datadog/main.go b/go/bin/datadog/main.go index 3bde7f1..0e70b5a 100644 --- a/go/bin/datadog/main.go +++ b/go/bin/datadog/main.go @@ -1,65 +1,52 @@ package main import ( - "context" - "log" - "os" - "time" - - "github.com/dylibso/observe-sdk/go/adapter/datadog" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "context" + "log" + "os" + "time" + + "github.com/dylibso/observe-sdk/go/adapter/datadog" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) func main() { - ctx := context.Background() - - log.Println("Starting adapter") - - // we only need to create and start once per instance of our host app - ddconf := datadog.DefaultDatadogConfig() - adapter, err := datadog.NewDatadogAdapter(ddconf) - adapter.Start() - defer adapter.Stop() - - log.Println("Adapter started") - - // Load WASM from disk - wasm, err := os.ReadFile(os.Args[1]) - if err != nil { - log.Panicln(err) - } - - log.Println("Create trace ctx") - traceCtx, err := adapter.NewTraceCtx(wasm, nil) - if err != nil { - log.Panicln(err) - } - log.Println("trace ctx created") - cfg := wazero.NewRuntimeConfig().WithCustomSections(true) - rt := wazero.NewRuntimeWithConfig(ctx, cfg) - err = traceCtx.Init(ctx, rt) - if err != nil { - log.Panicln(err) - } - wasi_snapshot_preview1.MustInstantiate(ctx, rt) - log.Println("wasi inited") - - config := wazero.NewModuleConfig(). - WithStdin(os.Stdin). - WithStdout(os.Stdout). - WithStderr(os.Stderr). - WithArgs(os.Args[1:]...). - WithStartFunctions("_start") - m, err := rt.InstantiateWithConfig(ctx, wasm, config) - if err != nil { - log.Panicln(err) - } - log.Println("module run") - defer m.Close(ctx) - - traceCtx.Finish() - log.Println("trace ctx finish") - - time.Sleep(time.Second * 2) + ctx := context.Background() + + // we only need to create and start once per instance of our host app + ddconf := datadog.DefaultDatadogConfig() + adapter, err := datadog.NewDatadogAdapter(ddconf) + adapter.Start() + defer adapter.Stop() + + // Load WASM from disk + wasm, err := os.ReadFile(os.Args[1]) + if err != nil { + log.Panicln(err) + } + + cfg := wazero.NewRuntimeConfig().WithCustomSections(true) + rt := wazero.NewRuntimeWithConfig(ctx, cfg) + traceCtx, err := adapter.NewTraceCtx(ctx, rt, wasm, nil) + if err != nil { + log.Panicln(err) + } + wasi_snapshot_preview1.MustInstantiate(ctx, rt) + + config := wazero.NewModuleConfig(). + WithStdin(os.Stdin). + WithStdout(os.Stdout). + WithStderr(os.Stderr). + WithArgs(os.Args[1:]...). + WithStartFunctions("_start") + m, err := rt.InstantiateWithConfig(ctx, wasm, config) + if err != nil { + log.Panicln(err) + } + defer m.Close(ctx) + + traceCtx.Finish() + + time.Sleep(time.Second * 2) } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index 971e395..997328c 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -37,19 +37,26 @@ type TraceCtx struct { telemetryId TelemetryId } -func NewTraceCtx(adapter *AdapterBase, data []byte, config *Config) (*TraceCtx, error) { +func NewTraceCtx(ctx context.Context, adapter *AdapterBase, r wazero.Runtime, data []byte, config *Config) (*TraceCtx, error) { names, err := parseNames(data) if err != nil { return nil, err } - return &TraceCtx{ + traceCtx := &TraceCtx{ adapter: adapter.TraceEvents, raw: make(chan RawEvent, config.ChannelBufferSize), names: names, telemetryId: NewTraceId(), Config: config, - }, nil + } + + err = traceCtx.init(ctx, r) + if err != nil { + return nil, err + } + + return traceCtx, nil } // Finish() will stop the trace and send the @@ -69,34 +76,11 @@ func (t *TraceCtx) Names() map[uint32]string { return t.names } -func (t *TraceCtx) pushFunction(ev CallEvent) { - t.stack = append(t.stack, ev) -} - -func (t *TraceCtx) popFunction() (CallEvent, bool) { - if len(t.stack) == 0 { - return CallEvent{}, false - } - - event := t.stack[len(t.stack)-1] - t.stack = t.stack[:len(t.stack)-1] - - return event, true -} - -func (t *TraceCtx) peekFunction() (CallEvent, bool) { - if len(t.stack) == 0 { - return CallEvent{}, false - } - - return t.stack[len(t.stack)-1], true -} - func (t *TraceCtx) WithListener(ctx context.Context) context.Context { return context.WithValue(ctx, experimental.FunctionListenerFactoryKey{}, t) } -func (t *TraceCtx) Init(ctx context.Context, r wazero.Runtime) error { +func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { ctx = t.WithListener(ctx) observe := r.NewHostModuleBuilder("dylibso_observe") functions := observe.NewFunctionBuilder() @@ -169,6 +153,29 @@ func (t *TraceCtx) Init(ctx context.Context, r wazero.Runtime) error { return nil } +func (t *TraceCtx) pushFunction(ev CallEvent) { + t.stack = append(t.stack, ev) +} + +func (t *TraceCtx) popFunction() (CallEvent, bool) { + if len(t.stack) == 0 { + return CallEvent{}, false + } + + event := t.stack[len(t.stack)-1] + t.stack = t.stack[:len(t.stack)-1] + + return event, true +} + +func (t *TraceCtx) peekFunction() (CallEvent, bool) { + if len(t.stack) == 0 { + return CallEvent{}, false + } + + return t.stack[len(t.stack)-1], true +} + func checkVersion(m *wasm.Module) error { var minorGlobal *wasm.Export = nil var majorGlobal *wasm.Export = nil From 5aef3658c3bf3c8d7dcf88b7907dd320b6644927 Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Tue, 18 Jul 2023 15:43:51 -0500 Subject: [PATCH 05/12] remove vistigial parts of adapter struct --- go/adapter/datadog/adapter.go | 247 ++++++++++++++++------------------ 1 file changed, 118 insertions(+), 129 deletions(-) diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 98cad5e..8e2067b 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -1,167 +1,156 @@ package datadog import ( - "bytes" - "encoding/json" - "log" - "net/http" - "net/url" - - "github.com/dylibso/observe-sdk/go" - "github.com/dylibso/observe-sdk/go/adapter/datadog_formatter" + "bytes" + "encoding/json" + "log" + "net/http" + "net/url" + + "github.com/dylibso/observe-sdk/go" + "github.com/dylibso/observe-sdk/go/adapter/datadog_formatter" ) type DatadogConfig struct { - AgentHost string `json:"agent_host"` - ServiceName string `json:"service_name"` - DefaultTags map[string]string `json:"default_tags"` - TraceType datadog_formatter.DatadogTraceType `json:"trace_type"` + AgentHost string `json:"agent_host"` + ServiceName string `json:"service_name"` + DefaultTags map[string]string `json:"default_tags"` + TraceType datadog_formatter.DatadogTraceType `json:"trace_type"` } func DefaultDatadogConfig() *DatadogConfig { - return &DatadogConfig{ - AgentHost: "http://localhost:8126", - ServiceName: "my-wasm-service", - DefaultTags: nil, - TraceType: datadog_formatter.Web, - } + return &DatadogConfig{ + AgentHost: "http://localhost:8126", + ServiceName: "my-wasm-service", + DefaultTags: nil, + TraceType: datadog_formatter.Web, + } } type DatadogAdapter struct { - observe.AdapterBase - TraceId uint64 - Spans []datadog_formatter.Span - Config *DatadogConfig + observe.AdapterBase + Config *DatadogConfig } func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) { - if config == nil { - config = DefaultDatadogConfig() - } - - return DatadogAdapter{ - AdapterBase: observe.NewAdapterBase(), - TraceId: uint64(observe.NewTraceId()), - Config: config, - }, nil + if config == nil { + config = DefaultDatadogConfig() + } + + return DatadogAdapter{ + AdapterBase: observe.NewAdapterBase(), + Config: config, + }, nil } func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { - if te.TelemetryId == nil { - log.Println("Datadog adapter needs a trace id") - return - } - - var allSpans []datadog_formatter.Span - for _, e := range te.Events { - switch event := e.(type) { - case observe.CallEvent: - spans := d.makeCallSpans(event, nil, *te.TelemetryId) - if len(spans) > 0 { - allSpans = append(allSpans, spans...) - } - case observe.MemoryGrowEvent: - if len(d.Spans) > 0 { - allSpans[len(allSpans)-1].AddAllocation(event.MemoryGrowAmount()) - } - case observe.CustomEvent: - log.Println("Datadog adapter does not respect custom events") - } - } - - if len(allSpans) <= 1 { - log.Println("No spans built for datadog trace") - return - } - - go func() { - output := datadog_formatter.New() - // TODO: for the moment, these are hard-coded, but will transition to a programmer- - // controlled API to customer these values. - allSpans[0].Resource = "request" - tt := d.Config.TraceType.String() - allSpans[0].Type = &tt - output.AddTrace(allSpans) - - b, err := json.Marshal(output) - if err != nil { - log.Println("failed to encode trace data to json", err) - return - } - - data := bytes.NewBuffer(b) - - host, err := url.JoinPath(d.Config.AgentHost, "v0.3", "traces") - if err != nil { - log.Println("failed to create datadog agent endpoint url:", err) - return - } - - resp, err := http.Post(host, "application/json", data) - if err != nil { - log.Println("failed to send trace request to datadog:", err) - return - } - - if resp.StatusCode != http.StatusOK { - log.Println("unexpected status code from datadog agent:", resp.StatusCode) - } - }() + if te.TelemetryId == nil { + log.Println("Datadog adapter needs a trace id") + return + } + + var allSpans []datadog_formatter.Span + for _, e := range te.Events { + switch event := e.(type) { + case observe.CallEvent: + spans := d.makeCallSpans(event, nil, *te.TelemetryId) + if len(spans) > 0 { + allSpans = append(allSpans, spans...) + } + case observe.MemoryGrowEvent: + if len(allSpans) > 0 { + allSpans[len(allSpans)-1].AddAllocation(event.MemoryGrowAmount()) + } + case observe.CustomEvent: + log.Println("Datadog adapter does not respect custom events") + } + } + + if len(allSpans) <= 1 { + log.Println("No spans built for datadog trace") + return + } + + go func() { + output := datadog_formatter.New() + // TODO: for the moment, these are hard-coded, but will transition to a programmer- + // controlled API to customer these values. + allSpans[0].Resource = "request" + tt := d.Config.TraceType.String() + allSpans[0].Type = &tt + output.AddTrace(allSpans) + + b, err := json.Marshal(output) + if err != nil { + log.Println("failed to encode trace data to json", err) + return + } + + data := bytes.NewBuffer(b) + + host, err := url.JoinPath(d.Config.AgentHost, "v0.3", "traces") + if err != nil { + log.Println("failed to create datadog agent endpoint url:", err) + return + } + + resp, err := http.Post(host, "application/json", data) + if err != nil { + log.Println("failed to send trace request to datadog:", err) + return + } + + if resp.StatusCode != http.StatusOK { + log.Println("unexpected status code from datadog agent:", resp.StatusCode) + } + }() } func (d *DatadogAdapter) Start() { - d.AdapterBase.Start(d) + d.AdapterBase.Start(d) } func (d *DatadogAdapter) Stop() { - d.AdapterBase.Stop() + d.AdapterBase.Stop() } func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *observe.TelemetryId, traceId observe.TelemetryId) []datadog_formatter.Span { - name := event.FunctionName() - span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) + name := event.FunctionName() + span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) - spans := []datadog_formatter.Span{*span} - for _, ev := range event.Within() { - if call, ok := ev.(observe.CallEvent); ok { - spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...) - } - } + spans := []datadog_formatter.Span{*span} + for _, ev := range event.Within() { + if call, ok := ev.(observe.CallEvent); ok { + spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...) + } + } - return spans -} - -func NewTraceId() uint64 { - return uint64(observe.NewTraceId()) -} - -func (d *DatadogAdapter) SetTraceId(traceId uint64) { - d.TraceId = traceId + return spans } type DatadogSpanKind int const ( - Server DatadogSpanKind = iota - Client - Producer - Consumer - Internal + Server DatadogSpanKind = iota + Client + Producer + Consumer + Internal ) func (d DatadogSpanKind) String() string { - switch d { - case Server: - return "server" - case Client: - return "client" - case Producer: - return "producer" - case Consumer: - return "consumer" - case Internal: - return "internal" - default: - return "unknown-span-kind" - } + switch d { + case Server: + return "server" + case Client: + return "client" + case Producer: + return "producer" + case Consumer: + return "consumer" + case Internal: + return "internal" + default: + return "unknown-span-kind" + } } From 219f80311838e6363c6c1ac706b3de9824833a93 Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Wed, 19 Jul 2023 11:27:52 -0500 Subject: [PATCH 06/12] Add back and refactor other adapters --- go/adapter/datadog/adapter.go | 16 ++--- go/adapter/otel_formatter/format.go | 98 ++++++++++++++++++++++++++++ go/adapter/otel_stdout/adapter.go | 99 +++++++++++++++++++++++++++++ go/adapter/stdout/adapter.go | 49 ++++++++++++++ go/bin/otelstdout/main.go | 51 +++++++++++++++ go/bin/stdout/main.go | 51 +++++++++++++++ 6 files changed, 356 insertions(+), 8 deletions(-) create mode 100644 go/adapter/otel_formatter/format.go create mode 100644 go/adapter/otel_stdout/adapter.go create mode 100644 go/adapter/stdout/adapter.go create mode 100644 go/bin/otelstdout/main.go create mode 100644 go/bin/stdout/main.go diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 8e2067b..01ba6fd 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -43,6 +43,14 @@ func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) { }, nil } +func (d *DatadogAdapter) Start() { + d.AdapterBase.Start(d) +} + +func (d *DatadogAdapter) Stop() { + d.AdapterBase.Stop() +} + func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { if te.TelemetryId == nil { log.Println("Datadog adapter needs a trace id") @@ -106,14 +114,6 @@ func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { }() } -func (d *DatadogAdapter) Start() { - d.AdapterBase.Start(d) -} - -func (d *DatadogAdapter) Stop() { - d.AdapterBase.Stop() -} - func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *observe.TelemetryId, traceId observe.TelemetryId) []datadog_formatter.Span { name := event.FunctionName() span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) diff --git a/go/adapter/otel_formatter/format.go b/go/adapter/otel_formatter/format.go new file mode 100644 index 0000000..e180228 --- /dev/null +++ b/go/adapter/otel_formatter/format.go @@ -0,0 +1,98 @@ +package otel_formatter + +import ( + "time" + + "github.com/dylibso/observe-sdk/go" +) + +type OtelFormatter struct { + ResourceSpans []ResourceSpan `json:"resourceSpans"` +} + +func New() *OtelFormatter { + return &OtelFormatter{} +} + +func (o *OtelFormatter) AddResourceSpan(span ResourceSpan) { + o.ResourceSpans = append(o.ResourceSpans, span) +} + +type ResourceSpan struct { + Resource Resource `json:"resource"` + ScopeSpans []ScopeSpan `json:"scopeSpans"` +} + +func NewResourceSpan() *ResourceSpan { + return &ResourceSpan{} +} + +func (r *ResourceSpan) AddAttribute(key string, value any) *ResourceSpan { + r.Resource.Attributes = append(r.Resource.Attributes, Attribute{Key: key, Value: value}) + return r +} + +func (r *ResourceSpan) AddSpans(spans []Span) { + r.ScopeSpans = append(r.ScopeSpans, ScopeSpan{ + Scope: Scope{ + Name: "event", + }, + Spans: spans, + }) +} + +type Resource struct { + Attributes []Attribute `json:"attributes"` +} + +type ScopeSpan struct { + Scope Scope `json:"scope"` + Spans []Span `json:"spans"` +} + +type Attribute struct { + Key string `json:"key"` + Value any `json:"value"` +} + +type Scope struct { + Name string `json:"name"` +} + +type Span struct { + TraceId string `json:"traceId"` + SpanId string `json:"spanId"` + ParentSpanId string `json:"parentSpanId"` + Name string `json:"name"` + Kind int64 `json:"kind"` + StartTimeNano int64 `json:"startTimeUnixNano"` + EndTimeNano int64 `json:"endTimeUnixNano"` + Attributes []Attribute `json:"attributes"` + DroppedAttributesCount int64 `json:"droppedAttributesCount"` + DroppedEventsCount int64 `json:"droppedEventsCount"` + DroppedLinksCount int64 `json:"droppedLinksCount"` + Status Status `json:"status"` +} + +type Status struct{} + +func NewSpan(traceId string, parentId *string, name string, start, end time.Time) *Span { + if parentId == nil { + var empty string + parentId = &empty + } + return &Span{ + TraceId: traceId, + SpanId: observe.NewSpanId().ToHex8(), + ParentSpanId: *parentId, + Name: name, + Kind: 1, + StartTimeNano: start.UnixNano(), + EndTimeNano: end.UnixNano(), + // uses empty defaults for remaining fields... + } +} + +func (s *Span) AddAttribute(key string, value any) { + s.Attributes = append(s.Attributes, Attribute{Key: key, Value: value}) +} diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go new file mode 100644 index 0000000..a9b6594 --- /dev/null +++ b/go/adapter/otel_stdout/adapter.go @@ -0,0 +1,99 @@ +package otel_stdout + +import ( + "encoding/json" + "fmt" + "log" + + observe "github.com/dylibso/observe-sdk/go" + "github.com/dylibso/observe-sdk/go/adapter/otel_formatter" + otel "github.com/dylibso/observe-sdk/go/adapter/otel_formatter" +) + +type OtelStdoutAdapter struct { + observe.AdapterBase +} + +func NewOtelStdoutAdapter() OtelStdoutAdapter { + base := observe.NewAdapterBase() + return OtelStdoutAdapter{ + AdapterBase: base, + } +} +func (o *OtelStdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { + if te.TelemetryId == nil { + log.Println("Otel adapter needs a trace id") + return + } + + traceId := te.TelemetryId.ToHex16() + + var allSpans []otel_formatter.Span + for _, e := range te.Events { + switch event := e.(type) { + case observe.CallEvent: + spans := o.makeCallSpans(event, nil, traceId) + if len(spans) > 0 { + allSpans = append(allSpans, spans...) + } + case observe.MemoryGrowEvent: + output := otel.New() + span := otel.NewSpan(traceId, nil, "allocation", event.Time, event.Time) + span.AddAttribute("amount", event.MemoryGrowAmount()) + resourceSpan := otel.NewResourceSpan() + resourceSpan.AddSpans([]otel.Span{*span}) + output.AddResourceSpan(*resourceSpan) + b, err := json.Marshal(output) + if err != nil { + log.Println("failed to encode MemoryGrowEvent spans") + } + + fmt.Println(string(b)) + case observe.CustomEvent: + log.Println("Otel adapter does not respect custom events") + } + } + + if len(allSpans) <= 1 { + log.Println("No spans built for datadog trace") + return + } + + log.Println(allSpans) + + output := otel.New() + resourceSpan := otel.NewResourceSpan() + resourceSpan.AddSpans(allSpans) + output.AddResourceSpan(*resourceSpan) + b, err := json.Marshal(output) + if err != nil { + log.Println("failed to encode CallEvent spans") + return + } + + fmt.Println(string(b)) + +} + +func (o *OtelStdoutAdapter) Start() { + o.AdapterBase.Start(o) +} + +func (o *OtelStdoutAdapter) Stop() { + o.AdapterBase.Stop() +} + +func (o *OtelStdoutAdapter) makeCallSpans(event observe.CallEvent, parentId *string, traceId string) []otel.Span { + name := event.FunctionName() + span := otel.NewSpan(traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) + span.AddAttribute("function_name", fmt.Sprintf("function-call-%s", name)) + + spans := []otel.Span{*span} + for _, ev := range event.Within() { + if call, ok := ev.(observe.CallEvent); ok { + spans = append(spans, o.makeCallSpans(call, &span.SpanId, traceId)...) + } + } + + return spans +} diff --git a/go/adapter/stdout/adapter.go b/go/adapter/stdout/adapter.go new file mode 100644 index 0000000..30b324a --- /dev/null +++ b/go/adapter/stdout/adapter.go @@ -0,0 +1,49 @@ +package stdout + +import ( + "log" + "strings" + + observe "github.com/dylibso/observe-sdk/go" +) + +type StdoutAdapter struct { + observe.AdapterBase +} + +func NewStdoutAdapter() StdoutAdapter { + base := observe.NewAdapterBase() + return StdoutAdapter{AdapterBase: base} +} + +func (s *StdoutAdapter) printEvents(event observe.CallEvent, indentation int) { + name := event.FunctionName() + log.Println(strings.Repeat(" ", indentation), "Call to", name, "took", event.Duration) + for _, event := range event.Within() { + if call, ok := event.(observe.CallEvent); ok { + s.printEvents(call, indentation+1) + } + } + +} +func (s *StdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { + for _, e := range te.Events { + switch event := e.(type) { + case observe.CallEvent: + s.printEvents(event, 0) + case observe.MemoryGrowEvent: + name := event.FunctionName() + log.Println("Allocated", event.MemoryGrowAmount(), "pages of memory in", name) + case observe.CustomEvent: + log.Println(event.Name, event.Time) + } + } +} + +func (s *StdoutAdapter) Start() { + s.AdapterBase.Start(s) +} + +func (s *StdoutAdapter) Stop() { + s.AdapterBase.Stop() +} diff --git a/go/bin/otelstdout/main.go b/go/bin/otelstdout/main.go new file mode 100644 index 0000000..dfe88ba --- /dev/null +++ b/go/bin/otelstdout/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "log" + "os" + "time" + + "github.com/dylibso/observe-sdk/go/adapter/otel_stdout" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" +) + +func main() { + ctx := context.Background() + + // we only need to create and start once per instance of our host app + adapter := otel_stdout.NewOtelStdoutAdapter() + adapter.Start() + defer adapter.Stop() + + // Load WASM from disk + wasm, err := os.ReadFile(os.Args[1]) + if err != nil { + log.Panicln(err) + } + + cfg := wazero.NewRuntimeConfig().WithCustomSections(true) + rt := wazero.NewRuntimeWithConfig(ctx, cfg) + traceCtx, err := adapter.NewTraceCtx(ctx, rt, wasm, nil) + if err != nil { + log.Panicln(err) + } + wasi_snapshot_preview1.MustInstantiate(ctx, rt) + + config := wazero.NewModuleConfig(). + WithStdin(os.Stdin). + WithStdout(os.Stdout). + WithStderr(os.Stderr). + WithArgs(os.Args[1:]...). + WithStartFunctions("_start") + m, err := rt.InstantiateWithConfig(ctx, wasm, config) + if err != nil { + log.Panicln(err) + } + defer m.Close(ctx) + + traceCtx.Finish() + + time.Sleep(time.Second * 2) +} diff --git a/go/bin/stdout/main.go b/go/bin/stdout/main.go new file mode 100644 index 0000000..95c9a1f --- /dev/null +++ b/go/bin/stdout/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "log" + "os" + "time" + + "github.com/dylibso/observe-sdk/go/adapter/stdout" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" +) + +func main() { + ctx := context.Background() + + // we only need to create and start once per instance of our host app + adapter := stdout.NewStdoutAdapter() + adapter.Start() + defer adapter.Stop() + + // Load WASM from disk + wasm, err := os.ReadFile(os.Args[1]) + if err != nil { + log.Panicln(err) + } + + cfg := wazero.NewRuntimeConfig().WithCustomSections(true) + rt := wazero.NewRuntimeWithConfig(ctx, cfg) + traceCtx, err := adapter.NewTraceCtx(ctx, rt, wasm, nil) + if err != nil { + log.Panicln(err) + } + wasi_snapshot_preview1.MustInstantiate(ctx, rt) + + config := wazero.NewModuleConfig(). + WithStdin(os.Stdin). + WithStdout(os.Stdout). + WithStderr(os.Stderr). + WithArgs(os.Args[1:]...). + WithStartFunctions("_start") + m, err := rt.InstantiateWithConfig(ctx, wasm, config) + if err != nil { + log.Panicln(err) + } + defer m.Close(ctx) + + traceCtx.Finish() + + time.Sleep(time.Second * 2) +} From f1d84e1ba7ec75f4d57f5dd686c234cc6ff69910 Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Wed, 19 Jul 2023 11:36:51 -0500 Subject: [PATCH 07/12] move some more out of trace_ctx --- go/trace_ctx.go | 66 --------------------------------------------- go/wasm.go | 71 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 66 deletions(-) create mode 100644 go/wasm.go diff --git a/go/trace_ctx.go b/go/trace_ctx.go index 997328c..b70c279 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -1,16 +1,10 @@ package observe import ( - "bytes" "context" - "errors" - "fmt" "log" "time" - "github.com/tetratelabs/wabin/binary" - "github.com/tetratelabs/wabin/leb128" - "github.com/tetratelabs/wabin/wasm" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/api" "github.com/tetratelabs/wazero/experimental" @@ -175,63 +169,3 @@ func (t *TraceCtx) peekFunction() (CallEvent, bool) { return t.stack[len(t.stack)-1], true } - -func checkVersion(m *wasm.Module) error { - var minorGlobal *wasm.Export = nil - var majorGlobal *wasm.Export = nil - for _, export := range m.ExportSection { - if export.Type != wasm.ExternTypeGlobal { - continue - } - - if export.Name == "wasm_instr_version_minor" { - minorGlobal = export - } else if export.Name == "wasm_instr_version_major" { - majorGlobal = export - } - } - - if minorGlobal == nil || majorGlobal == nil { - return errors.New("wasm_instr_version functions not found") - } - - minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) - if err != nil { - return err - } - major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) - if err != nil { - return err - } - - if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { - return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) - } - - return nil -} - -func parseNames(data []byte) (map[uint32]string, error) { - features := wasm.CoreFeaturesV2 - m, err := binary.DecodeModule(data, features) - if err != nil { - return nil, err - } - - // Check for version globals - if err := checkVersion(m); err != nil { - return nil, err - } - - if m.NameSection == nil { - return nil, errors.New("Name section not found") - } - - names := make(map[uint32]string, len(m.NameSection.FunctionNames)) - - for _, v := range m.NameSection.FunctionNames { - names[v.Index] = v.Name - } - - return names, nil -} diff --git a/go/wasm.go b/go/wasm.go new file mode 100644 index 0000000..dbb7656 --- /dev/null +++ b/go/wasm.go @@ -0,0 +1,71 @@ +package observe + +import ( + "bytes" + "errors" + "fmt" + + "github.com/tetratelabs/wabin/binary" + "github.com/tetratelabs/wabin/leb128" + "github.com/tetratelabs/wabin/wasm" +) + +func checkVersion(m *wasm.Module) error { + var minorGlobal *wasm.Export = nil + var majorGlobal *wasm.Export = nil + for _, export := range m.ExportSection { + if export.Type != wasm.ExternTypeGlobal { + continue + } + + if export.Name == "wasm_instr_version_minor" { + minorGlobal = export + } else if export.Name == "wasm_instr_version_major" { + majorGlobal = export + } + } + + if minorGlobal == nil || majorGlobal == nil { + return errors.New("wasm_instr_version functions not found") + } + + minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data)) + if err != nil { + return err + } + major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data)) + if err != nil { + return err + } + + if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor { + return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor)) + } + + return nil +} + +func parseNames(data []byte) (map[uint32]string, error) { + features := wasm.CoreFeaturesV2 + m, err := binary.DecodeModule(data, features) + if err != nil { + return nil, err + } + + // Check for version globals + if err := checkVersion(m); err != nil { + return nil, err + } + + if m.NameSection == nil { + return nil, errors.New("Name section not found") + } + + names := make(map[uint32]string, len(m.NameSection.FunctionNames)) + + for _, v := range m.NameSection.FunctionNames { + names[v.Index] = v.Name + } + + return names, nil +} From 3f3e157f2b42df7e5bd3fb0ff8bb5f1c7592447c Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Wed, 19 Jul 2023 12:24:22 -0500 Subject: [PATCH 08/12] Add 128bit support to TelemetryId, add comments --- go/adapter.go | 9 ++++++++- go/event.go | 3 +++ go/listener.go | 6 ++++++ go/telemetry.go | 28 +++++++++++++++++++++++----- go/trace_ctx.go | 24 ++++++++++++++++-------- go/version.go | 4 ---- go/wasm.go | 7 +++++++ 7 files changed, 63 insertions(+), 18 deletions(-) delete mode 100644 go/version.go diff --git a/go/adapter.go b/go/adapter.go index a297bdd..017f464 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -7,17 +7,24 @@ import ( "github.com/tetratelabs/wazero" ) +// The primary interface that every Adapter needs to follow +// Start() and Stop() can just call the implementations on AdapterBase +// or provide some custom logic. HandleTraceEvent is called after +// an invocation of a wasm module is done and all events are collected. type Adapter interface { Start() Stop() HandleTraceEvent(TraceEvent) } +// The payload that contains all the Events +// from a single wasm module invocation type TraceEvent struct { Events []Event TelemetryId *TelemetryId } +// Shared implementation for all Adapters type AdapterBase struct { TraceEvents chan TraceEvent stop chan bool @@ -27,7 +34,7 @@ func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm [] if config == nil { config = NewDefaultConfig() } - return NewTraceCtx(ctx, a, r, wasm, config) + return newTraceCtx(ctx, a, r, wasm, config) } func NewAdapterBase() AdapterBase { diff --git a/go/event.go b/go/event.go index ee6b9ae..02784cb 100644 --- a/go/event.go +++ b/go/event.go @@ -23,6 +23,9 @@ const ( Custom ) +// Represents the raw event in our Observe form. +// Events are transformed into vendor specific formats +// in the Adapters. type RawEvent struct { Kind RawEventKind Stack []experimental.InternalFunction diff --git a/go/listener.go b/go/listener.go index 3dd387c..0b81922 100644 --- a/go/listener.go +++ b/go/listener.go @@ -7,6 +7,7 @@ import ( "github.com/tetratelabs/wazero/experimental" ) +// Implements the NewListener() method to satisfy the FunctionListener interface func (t *TraceCtx) NewListener(def api.FunctionDefinition) experimental.FunctionListener { if def.GoFunction() == nil { return nil @@ -14,10 +15,13 @@ func (t *TraceCtx) NewListener(def api.FunctionDefinition) experimental.Function return t } +// Implements the NewFunctionListener() method to satisfy the FunctionListener interface func (t *TraceCtx) NewFunctionListener(_ api.FunctionDefinition) experimental.FunctionListener { return t } +// Implements the Before() method to satisfy the FunctionListener interface. +// This takes events from the wazero runtime and sends them to the `raw` channel on the TraceCtx. func (t *TraceCtx) Before(ctx context.Context, _ api.Module, def api.FunctionDefinition, inputs []uint64, stack experimental.StackIterator) { var event RawEvent name := def.Name() @@ -44,6 +48,8 @@ func (t *TraceCtx) Before(ctx context.Context, _ api.Module, def api.FunctionDef t.raw <- event } +// Null implementation of the After() method to satisfy the FunctionListener interface. func (t *TraceCtx) After(context.Context, api.Module, api.FunctionDefinition, []uint64) {} +// Null implementation of the Abort() method to satisfy the FunctionListener interface. func (t *TraceCtx) Abort(context.Context, api.Module, api.FunctionDefinition, error) {} diff --git a/go/telemetry.go b/go/telemetry.go index ddbcede..ce29c4f 100644 --- a/go/telemetry.go +++ b/go/telemetry.go @@ -6,7 +6,13 @@ import ( "time" ) -type TelemetryId uint64 +// This is a shared type for a span or trace id. +// It's represented by 2 uint64s and can be transformed +// to different string or int representations where needed. +type TelemetryId struct { + lsb uint64 + msb uint64 +} var rng rand.Source @@ -14,18 +20,30 @@ func init() { rng = rand.NewSource(time.Now().UnixNano()) } +// Create a new trace id func NewTraceId() TelemetryId { - return TelemetryId(rng.Int63()) + return TelemetryId{ + msb: uint64(rng.Int63()), + lsb: uint64(rng.Int63()), + } } +// Create a new span id func NewSpanId() TelemetryId { - return TelemetryId(rng.Int63()) + return TelemetryId{ + msb: uint64(rng.Int63()), + lsb: uint64(rng.Int63()), + } } +// Encode this id into an 8 byte hex (16 chars) +// Just uses the least significant of the 16 bytes func (t TelemetryId) ToHex8() string { - return fmt.Sprintf("%016x", t) + return fmt.Sprintf("%016x", t.lsb) } +// Encode this id into a 16 byte hex (32 chars) +// Uses both 16 byte uint64 values func (t TelemetryId) ToHex16() string { - return fmt.Sprintf("%032x", t) + return fmt.Sprintf("%016x%016x", t.msb, t.lsb) } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index b70c279..32135af 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -10,17 +10,22 @@ import ( "github.com/tetratelabs/wazero/experimental" ) +// The configuration object for the observe SDK type Config struct { ChannelBufferSize int - RuntimeConfig wazero.RuntimeConfig } +// Create a default configuration func NewDefaultConfig() *Config { return &Config{ ChannelBufferSize: 1024, } } +// TraceCtx holds the context for a trace, or wasm module invocation. +// It collects holds a channel to the Adapter and from the wazero Listener +// It will collect events throughout the invocation of the function. Calling +// Finish() will then submit those events to the Adapter to be processed and sent type TraceCtx struct { adapter chan TraceEvent raw chan RawEvent @@ -31,7 +36,8 @@ type TraceCtx struct { telemetryId TelemetryId } -func NewTraceCtx(ctx context.Context, adapter *AdapterBase, r wazero.Runtime, data []byte, config *Config) (*TraceCtx, error) { +// Creates a new TraceCtx. Used internally by the Adapter. The user should create the trace context from the Adapter. +func newTraceCtx(ctx context.Context, adapter *AdapterBase, r wazero.Runtime, data []byte, config *Config) (*TraceCtx, error) { names, err := parseNames(data) if err != nil { return nil, err @@ -66,16 +72,15 @@ func (t *TraceCtx) Finish() { t.telemetryId = NewTraceId() } -func (t *TraceCtx) Names() map[uint32]string { - return t.names -} - -func (t *TraceCtx) WithListener(ctx context.Context) context.Context { +// Attaches the wazero FunctionListener to the context +func (t *TraceCtx) withListener(ctx context.Context) context.Context { return context.WithValue(ctx, experimental.FunctionListenerFactoryKey{}, t) } +// Initializes the TraceCtx. This connects up the channels with events from the FunctionListener. +// Should only be called once. func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { - ctx = t.WithListener(ctx) + ctx = t.withListener(ctx) observe := r.NewHostModuleBuilder("dylibso_observe") functions := observe.NewFunctionBuilder() @@ -147,10 +152,12 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { return nil } +// Pushes a function onto the stack func (t *TraceCtx) pushFunction(ev CallEvent) { t.stack = append(t.stack, ev) } +// Pops a function off the stack func (t *TraceCtx) popFunction() (CallEvent, bool) { if len(t.stack) == 0 { return CallEvent{}, false @@ -162,6 +169,7 @@ func (t *TraceCtx) popFunction() (CallEvent, bool) { return event, true } +// Peek at the function on top of the stack without modifying func (t *TraceCtx) peekFunction() (CallEvent, bool) { if len(t.stack) == 0 { return CallEvent{}, false diff --git a/go/version.go b/go/version.go deleted file mode 100644 index 5ffa6e5..0000000 --- a/go/version.go +++ /dev/null @@ -1,4 +0,0 @@ -package observe - -const wasmInstrVersionMajor = 0 -const wasmInstrVersionMinor = 0 // TODO: bump this to match compiler when ready diff --git a/go/wasm.go b/go/wasm.go index dbb7656..3a5d3c3 100644 --- a/go/wasm.go +++ b/go/wasm.go @@ -10,6 +10,11 @@ import ( "github.com/tetratelabs/wabin/wasm" ) +const wasmInstrVersionMajor = 0 +const wasmInstrVersionMinor = 0 // TODO: bump this to match compiler when ready + +// make sure that our function was instrumented with a compatible +// version of wasm-instr func checkVersion(m *wasm.Module) error { var minorGlobal *wasm.Export = nil var majorGlobal *wasm.Export = nil @@ -45,6 +50,8 @@ func checkVersion(m *wasm.Module) error { return nil } +// Parse the names of the functions out of the +// names custom section in the wasm binary. func parseNames(data []byte) (map[uint32]string, error) { features := wasm.CoreFeaturesV2 m, err := binary.DecodeModule(data, features) From 80547480249375bddd20666315a4b433949a82de Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Wed, 19 Jul 2023 13:04:19 -0500 Subject: [PATCH 09/12] fix datadog adapter --- go/adapter/datadog/adapter.go | 5 +++-- go/adapter/datadog_formatter/format.go | 28 +++++++++++++------------- go/telemetry.go | 5 +++++ 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 01ba6fd..62d334e 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -61,7 +61,8 @@ func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: - spans := d.makeCallSpans(event, nil, *te.TelemetryId) + traceId := te.TelemetryId.ToUint64() + spans := d.makeCallSpans(event, nil, traceId) if len(spans) > 0 { allSpans = append(allSpans, spans...) } @@ -114,7 +115,7 @@ func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { }() } -func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *observe.TelemetryId, traceId observe.TelemetryId) []datadog_formatter.Span { +func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64, traceId uint64) []datadog_formatter.Span { name := event.FunctionName() span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) diff --git a/go/adapter/datadog_formatter/format.go b/go/adapter/datadog_formatter/format.go index e9777d5..883a9bf 100644 --- a/go/adapter/datadog_formatter/format.go +++ b/go/adapter/datadog_formatter/format.go @@ -12,24 +12,24 @@ type DatadogFormatter []Trace type Trace []Span type Span struct { - TraceId observe.TelemetryId `json:"trace_id"` - SpanId observe.TelemetryId `json:"span_id"` - ParentId *observe.TelemetryId `json:"parent_id,omitempty"` - Name string `json:"name"` - Start uint64 `json:"start"` - Duration uint64 `json:"duration"` - Resource string `json:"resource"` - Error uint8 `json:"error"` - Meta map[string]string `json:"meta"` - Metrics map[string]string `json:"metrics"` - Service string `json:"service"` - Type *string `json:"type,omitempty"` + TraceId uint64 `json:"trace_id"` + SpanId uint64 `json:"span_id"` + ParentId *uint64 `json:"parent_id,omitempty"` + Name string `json:"name"` + Start uint64 `json:"start"` + Duration uint64 `json:"duration"` + Resource string `json:"resource"` + Error uint8 `json:"error"` + Meta map[string]string `json:"meta"` + Metrics map[string]string `json:"metrics"` + Service string `json:"service"` + Type *string `json:"type,omitempty"` } -func NewSpan(service string, traceId observe.TelemetryId, parentId *observe.TelemetryId, name string, start, end time.Time) *Span { +func NewSpan(service string, traceId uint64, parentId *uint64, name string, start, end time.Time) *Span { id := observe.NewSpanId() span := Span{ - SpanId: id, + SpanId: id.ToUint64(), ParentId: parentId, TraceId: traceId, Name: name, diff --git a/go/telemetry.go b/go/telemetry.go index ce29c4f..8520354 100644 --- a/go/telemetry.go +++ b/go/telemetry.go @@ -47,3 +47,8 @@ func (t TelemetryId) ToHex8() string { func (t TelemetryId) ToHex16() string { return fmt.Sprintf("%016x%016x", t.msb, t.lsb) } + +// Some adapters may need a raw representation +func (t TelemetryId) ToUint64() uint64 { + return t.lsb +} From 687af36db554f7cf9316341a1e19327fd71b8d52 Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Wed, 19 Jul 2023 13:21:00 -0500 Subject: [PATCH 10/12] apply tid changes from comments --- go/adapter.go | 6 +----- go/adapter/datadog/adapter.go | 5 ----- go/adapter/otel_stdout/adapter.go | 6 +----- go/trace_ctx.go | 2 +- 4 files changed, 3 insertions(+), 16 deletions(-) diff --git a/go/adapter.go b/go/adapter.go index 017f464..8f5dac0 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -2,7 +2,6 @@ package observe import ( "context" - "log" "github.com/tetratelabs/wazero" ) @@ -21,7 +20,7 @@ type Adapter interface { // from a single wasm module invocation type TraceEvent struct { Events []Event - TelemetryId *TelemetryId + TelemetryId TelemetryId } // Shared implementation for all Adapters @@ -51,10 +50,8 @@ func (b *AdapterBase) Start(a Adapter) { for { select { case event := <-b.TraceEvents: - log.Println("Adapter Got TraceEvent") a.HandleTraceEvent(event) case <-b.stop: - log.Println("Adapter Stopped") return } } @@ -62,6 +59,5 @@ func (b *AdapterBase) Start(a Adapter) { } func (b *AdapterBase) Stop() { - log.Println("Stopping adapter") b.stop <- true } diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 62d334e..58615bc 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -52,11 +52,6 @@ func (d *DatadogAdapter) Stop() { } func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { - if te.TelemetryId == nil { - log.Println("Datadog adapter needs a trace id") - return - } - var allSpans []datadog_formatter.Span for _, e := range te.Events { switch event := e.(type) { diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go index a9b6594..d8fe7f3 100644 --- a/go/adapter/otel_stdout/adapter.go +++ b/go/adapter/otel_stdout/adapter.go @@ -20,12 +20,8 @@ func NewOtelStdoutAdapter() OtelStdoutAdapter { AdapterBase: base, } } -func (o *OtelStdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { - if te.TelemetryId == nil { - log.Println("Otel adapter needs a trace id") - return - } +func (o *OtelStdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { traceId := te.TelemetryId.ToHex16() var allSpans []otel_formatter.Span diff --git a/go/trace_ctx.go b/go/trace_ctx.go index 32135af..c2c082c 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -64,7 +64,7 @@ func newTraceCtx(ctx context.Context, adapter *AdapterBase, r wazero.Runtime, da func (t *TraceCtx) Finish() { traceEvent := TraceEvent{ Events: t.events, - TelemetryId: &t.telemetryId, + TelemetryId: t.telemetryId, } t.adapter <- traceEvent // clear the trace context From a2cb84a1b684c8b6bc07db2fa1c00cea2ce06a4b Mon Sep 17 00:00:00 2001 From: Benjamin Eckel Date: Wed, 19 Jul 2023 14:21:02 -0500 Subject: [PATCH 11/12] Fix otel adapter memory grow events --- go/adapter/otel_stdout/adapter.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go index d8fe7f3..24b827f 100644 --- a/go/adapter/otel_stdout/adapter.go +++ b/go/adapter/otel_stdout/adapter.go @@ -33,18 +33,9 @@ func (o *OtelStdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { allSpans = append(allSpans, spans...) } case observe.MemoryGrowEvent: - output := otel.New() span := otel.NewSpan(traceId, nil, "allocation", event.Time, event.Time) span.AddAttribute("amount", event.MemoryGrowAmount()) - resourceSpan := otel.NewResourceSpan() - resourceSpan.AddSpans([]otel.Span{*span}) - output.AddResourceSpan(*resourceSpan) - b, err := json.Marshal(output) - if err != nil { - log.Println("failed to encode MemoryGrowEvent spans") - } - - fmt.Println(string(b)) + allSpans = append(allSpans, *span) case observe.CustomEvent: log.Println("Otel adapter does not respect custom events") } @@ -55,8 +46,6 @@ func (o *OtelStdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { return } - log.Println(allSpans) - output := otel.New() resourceSpan := otel.NewResourceSpan() resourceSpan.AddSpans(allSpans) From d17c0a83d655c27bb7214389863be5bcdec06c5b Mon Sep 17 00:00:00 2001 From: Steve Manuel Date: Wed, 19 Jul 2023 17:05:38 -0600 Subject: [PATCH 12/12] fix: include memory grow events as allocation attributes --- go/adapter/datadog/adapter.go | 14 ++++++++------ go/adapter/datadog_formatter/format.go | 2 +- go/adapter/otel_formatter/format.go | 6 +++--- go/adapter/otel_stdout/adapter.go | 14 ++++++++------ go/adapter/stdout/adapter.go | 3 +++ go/trace_ctx.go | 8 +++++++- 6 files changed, 30 insertions(+), 17 deletions(-) diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 58615bc..a84389f 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -52,7 +52,7 @@ func (d *DatadogAdapter) Stop() { } func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { - var allSpans []datadog_formatter.Span + var allSpans []*datadog_formatter.Span for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: @@ -62,9 +62,7 @@ func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { allSpans = append(allSpans, spans...) } case observe.MemoryGrowEvent: - if len(allSpans) > 0 { - allSpans[len(allSpans)-1].AddAllocation(event.MemoryGrowAmount()) - } + log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("Datadog adapter does not respect custom events") } @@ -110,15 +108,19 @@ func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) { }() } -func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64, traceId uint64) []datadog_formatter.Span { +func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64, traceId uint64) []*datadog_formatter.Span { name := event.FunctionName() span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) - spans := []datadog_formatter.Span{*span} + spans := []*datadog_formatter.Span{span} for _, ev := range event.Within() { if call, ok := ev.(observe.CallEvent); ok { spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...) } + if alloc, ok := ev.(observe.MemoryGrowEvent); ok { + span := spans[len(spans)-1] + span.AddAllocation(alloc.MemoryGrowAmount()) + } } return spans diff --git a/go/adapter/datadog_formatter/format.go b/go/adapter/datadog_formatter/format.go index 883a9bf..ebf4fe9 100644 --- a/go/adapter/datadog_formatter/format.go +++ b/go/adapter/datadog_formatter/format.go @@ -9,7 +9,7 @@ import ( type DatadogFormatter []Trace -type Trace []Span +type Trace []*Span type Span struct { TraceId uint64 `json:"trace_id"` diff --git a/go/adapter/otel_formatter/format.go b/go/adapter/otel_formatter/format.go index e180228..3a09371 100644 --- a/go/adapter/otel_formatter/format.go +++ b/go/adapter/otel_formatter/format.go @@ -32,7 +32,7 @@ func (r *ResourceSpan) AddAttribute(key string, value any) *ResourceSpan { return r } -func (r *ResourceSpan) AddSpans(spans []Span) { +func (r *ResourceSpan) AddSpans(spans []*Span) { r.ScopeSpans = append(r.ScopeSpans, ScopeSpan{ Scope: Scope{ Name: "event", @@ -46,8 +46,8 @@ type Resource struct { } type ScopeSpan struct { - Scope Scope `json:"scope"` - Spans []Span `json:"spans"` + Scope Scope `json:"scope"` + Spans []*Span `json:"spans"` } type Attribute struct { diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go index 24b827f..5937d91 100644 --- a/go/adapter/otel_stdout/adapter.go +++ b/go/adapter/otel_stdout/adapter.go @@ -24,7 +24,7 @@ func NewOtelStdoutAdapter() OtelStdoutAdapter { func (o *OtelStdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { traceId := te.TelemetryId.ToHex16() - var allSpans []otel_formatter.Span + var allSpans []*otel_formatter.Span for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: @@ -33,9 +33,7 @@ func (o *OtelStdoutAdapter) HandleTraceEvent(te observe.TraceEvent) { allSpans = append(allSpans, spans...) } case observe.MemoryGrowEvent: - span := otel.NewSpan(traceId, nil, "allocation", event.Time, event.Time) - span.AddAttribute("amount", event.MemoryGrowAmount()) - allSpans = append(allSpans, *span) + log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("Otel adapter does not respect custom events") } @@ -68,16 +66,20 @@ func (o *OtelStdoutAdapter) Stop() { o.AdapterBase.Stop() } -func (o *OtelStdoutAdapter) makeCallSpans(event observe.CallEvent, parentId *string, traceId string) []otel.Span { +func (o *OtelStdoutAdapter) makeCallSpans(event observe.CallEvent, parentId *string, traceId string) []*otel.Span { name := event.FunctionName() span := otel.NewSpan(traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) span.AddAttribute("function_name", fmt.Sprintf("function-call-%s", name)) - spans := []otel.Span{*span} + spans := []*otel.Span{span} for _, ev := range event.Within() { if call, ok := ev.(observe.CallEvent); ok { spans = append(spans, o.makeCallSpans(call, &span.SpanId, traceId)...) } + if alloc, ok := ev.(observe.MemoryGrowEvent); ok { + last := spans[len(spans)-1] + last.AddAttribute("allocation", alloc.MemoryGrowAmount()) + } } return spans diff --git a/go/adapter/stdout/adapter.go b/go/adapter/stdout/adapter.go index 30b324a..1a970a2 100644 --- a/go/adapter/stdout/adapter.go +++ b/go/adapter/stdout/adapter.go @@ -23,6 +23,9 @@ func (s *StdoutAdapter) printEvents(event observe.CallEvent, indentation int) { if call, ok := event.(observe.CallEvent); ok { s.printEvents(call, indentation+1) } + if alloc, ok := event.(observe.MemoryGrowEvent); ok { + log.Println(strings.Repeat(" ", indentation), "Allocated", alloc.MemoryGrowAmount(), "pages of memory in", name) + } } } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index c2c082c..c373c62 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -142,7 +142,13 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { Time: time.Now(), } - t.events = append(t.events, event) + fn, ok := t.popFunction() + if !ok { + t.events = append(t.events, event) + return + } + fn.within = append(fn.within, event) + t.pushFunction(fn) }).Export("instrument_memory_grow") _, err := observe.Instantiate(ctx)