Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Refactor Go architecture #43

Merged
merged 12 commits into from
Jul 19, 2023
140 changes: 39 additions & 101 deletions go/adapter.go
Original file line number Diff line number Diff line change
@@ -1,125 +1,63 @@
package observe

import (
"bytes"
"errors"
"fmt"
"math/rand"
"time"
"context"

"github.com/tetratelabs/wabin/leb128"
"github.com/tetratelabs/wabin/wasm"
"github.com/tetratelabs/wazero"
)

// The primary interface that every Adapter needs to follow
// Start() and Stop() can just call the implementations on AdapterBase
// or provide some custom logic. HandleTraceEvent is called after
// an invocation of a wasm module is done and all events are collected.
type Adapter interface {
Start(collector *Collector, wasm []byte) error
Stop(collector *Collector)
Event(Event)
Start()
Stop()
HandleTraceEvent(TraceEvent)
}

type AdapterBase struct {
Collectors map[*Collector]chan bool
// The payload that contains all the Events
// from a single wasm module invocation
type TraceEvent struct {
Events []Event
TelemetryId TelemetryId
}

func checkVersion(m *wasm.Module) error {
var minorGlobal *wasm.Export = nil
var majorGlobal *wasm.Export = nil
for _, export := range m.ExportSection {
if export.Type != wasm.ExternTypeGlobal {
continue
}

if export.Name == "wasm_instr_version_minor" {
minorGlobal = export
} else if export.Name == "wasm_instr_version_major" {
majorGlobal = export
}
}

if minorGlobal == nil || majorGlobal == nil {
return errors.New("wasm_instr_version functions not found")
}

minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data))
if err != nil {
return err
}
major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data))
if err != nil {
return err
}

if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor {
return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor))
}

return nil
// Shared implementation for all Adapters
type AdapterBase struct {
TraceEvents chan TraceEvent
stop chan bool
}

func (a *AdapterBase) Wait(collector *Collector, timeout time.Duration, callback func()) {
for {
select {
case <-time.After(timeout):
if len(collector.Events) > 0 {
if callback != nil {
callback()
}
continue
}
a.RemoveCollector(collector)
return
}
func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, config *Config) (*TraceCtx, error) {
if config == nil {
config = NewDefaultConfig()
}
return newTraceCtx(ctx, a, r, wasm, config)
}

func NewAdapterBase() AdapterBase {
a := AdapterBase{
Collectors: map[*Collector]chan bool{},
}
return a
}

func (a *AdapterBase) Start(collector *Collector, wasm []byte) error {
a.Collectors[collector] = make(chan bool, 1)
return collector.GetNames(wasm)
}

func (a *AdapterBase) RemoveCollector(collector *Collector) {
delete(a.Collectors, collector)
}

func (a *AdapterBase) Stop(collector *Collector) {
stop, ok := a.Collectors[collector]
if ok {
stop <- true
a.RemoveCollector(collector)
return AdapterBase{
// TODO set to some kind of max, add dump logic
TraceEvents: make(chan TraceEvent, 100),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sends to this channel will block once the channel reaches 100 events. I think thats probably ok, but wondering if this is intentional. We can omit the buffered size and let the channel grow as needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah i think non-buffered might be the way to go but let me think about it for a moment. my intention with using a buffered channel is if the adapter is busy then it may take some time to read off this channel which would block the sender right? By default sending a message to a channel blocks until the receiver synchronizes. Ideally the adapter just pulls messages off this channel and only puts them in a local slice, then it submits them offline in another goroutine. but i haven't done that yet. so this is acting at the bucket.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see -- yea that makes sense!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make a follow up for that. Goal will be to add the bucket on the adapter side and test that this channel can't get blocked. We just need to make sure that the adapter's main routine only reads off this channel as fast as possible. And if it can't put the events in a bucket it should just throw them away and log an error. But never block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#44

}
}

func (a AdapterBase) StopChan(collector *Collector) chan bool {
return a.Collectors[collector]
}

type TelemetryId uint64

var rng rand.Source
func (b *AdapterBase) Start(a Adapter) {
b.stop = make(chan bool)

func init() {
rng = rand.NewSource(time.Now().UnixNano())
}

func NewTraceId() TelemetryId {
return TelemetryId(rng.Int63())
}

func NewSpanId() TelemetryId {
return TelemetryId(rng.Int63())
}

func (t TelemetryId) ToHex8() string {
return fmt.Sprintf("%016x", t)
go func() {
for {
select {
case event := <-b.TraceEvents:
a.HandleTraceEvent(event)
case <-b.stop:
return
}
}
}()
}

func (t TelemetryId) ToHex16() string {
return fmt.Sprintf("%032x", t)
func (b *AdapterBase) Stop() {
b.stop <- true
}
100 changes: 32 additions & 68 deletions go/adapter/datadog/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"log"
"net/http"
"net/url"
"strconv"
"time"

"github.com/dylibso/observe-sdk/go"
"github.com/dylibso/observe-sdk/go/adapter/datadog_formatter"
Expand All @@ -31,9 +29,7 @@ func DefaultDatadogConfig() *DatadogConfig {

type DatadogAdapter struct {
observe.AdapterBase
TraceId uint64
Spans []datadog_formatter.Span
Config *DatadogConfig
Config *DatadogConfig
}

func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) {
Expand All @@ -43,76 +39,48 @@ func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) {

return DatadogAdapter{
AdapterBase: observe.NewAdapterBase(),
TraceId: uint64(observe.NewTraceId()),
Config: config,
}, nil
}

func (d *DatadogAdapter) Event(e observe.Event) {
switch event := e.(type) {
case observe.CallEvent:
spans := d.makeCallSpans(event, nil)
if len(spans) > 0 {
d.Spans = append(d.Spans, spans...)
}

case observe.MemoryGrowEvent:
if len(d.Spans) > 0 {
d.Spans[len(d.Spans)-1].AddAllocation(event.MemoryGrowAmount())
}
case observe.CustomEvent:
if value, ok := event.Metadata["trace_id"]; ok {
traceId, err := strconv.ParseUint(value.(string), 10, 64)
if err != nil {
log.Println("failed to parse traceId from event metadata:", err)
return
}

d.TraceId = traceId
}
}
func (d *DatadogAdapter) Start() {
d.AdapterBase.Start(d)
}

func (d *DatadogAdapter) Wait(collector *observe.Collector, timeout time.Duration) {
d.AdapterBase.Wait(collector, timeout, nil)
func (d *DatadogAdapter) Stop() {
d.AdapterBase.Stop()
}

func (d *DatadogAdapter) Start(collector *observe.Collector, wasm []byte) error {
if err := d.AdapterBase.Start(collector, wasm); err != nil {
return err
}

stop := d.StopChan(collector)

go func() {
for {
select {
case event := <-collector.Events:
d.Event(event)
case <-stop:
return
func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) {
var allSpans []*datadog_formatter.Span
for _, e := range te.Events {
switch event := e.(type) {
case observe.CallEvent:
traceId := te.TelemetryId.ToUint64()
spans := d.makeCallSpans(event, nil, traceId)
if len(spans) > 0 {
allSpans = append(allSpans, spans...)
}
case observe.MemoryGrowEvent:
log.Println("MemoryGrowEvent should be attached to a span")
case observe.CustomEvent:
log.Println("Datadog adapter does not respect custom events")
nilslice marked this conversation as resolved.
Show resolved Hide resolved
}
}()

return nil
}

func (d *DatadogAdapter) Stop(collector *observe.Collector) {
d.AdapterBase.Stop(collector)
}

if len(d.Spans) == 0 {
if len(allSpans) <= 1 {
log.Println("No spans built for datadog trace")
return
}

go func() {
output := datadog_formatter.New()
// TODO: for the moment, these are hard-coded, but will transition to a programmer-
// controlled API to customer these values.
d.Spans[0].Resource = "request"
allSpans[0].Resource = "request"
tt := d.Config.TraceType.String()
d.Spans[0].Type = &tt
output.AddTrace(d.Spans)
allSpans[0].Type = &tt
output.AddTrace(allSpans)

b, err := json.Marshal(output)
if err != nil {
Expand Down Expand Up @@ -140,28 +108,24 @@ func (d *DatadogAdapter) Stop(collector *observe.Collector) {
}()
}

func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64) []datadog_formatter.Span {
func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64, traceId uint64) []*datadog_formatter.Span {
name := event.FunctionName()
span := datadog_formatter.NewSpan(d.Config.ServiceName, d.TraceId, parentId, name, event.Time, event.Time.Add(event.Duration))
span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration))

spans := []datadog_formatter.Span{*span}
spans := []*datadog_formatter.Span{span}
for _, ev := range event.Within() {
if call, ok := ev.(observe.CallEvent); ok {
spans = append(spans, d.makeCallSpans(call, &span.SpanId)...)
spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...)
}
if alloc, ok := ev.(observe.MemoryGrowEvent); ok {
span := spans[len(spans)-1]
span.AddAllocation(alloc.MemoryGrowAmount())
}
}

return spans
}

func NewTraceId() uint64 {
return uint64(observe.NewTraceId())
}

func (d *DatadogAdapter) SetTraceId(traceId uint64) {
d.TraceId = traceId
}

type DatadogSpanKind int

const (
Expand Down
4 changes: 2 additions & 2 deletions go/adapter/datadog_formatter/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type DatadogFormatter []Trace

type Trace []Span
type Trace []*Span

type Span struct {
TraceId uint64 `json:"trace_id"`
Expand All @@ -29,7 +29,7 @@ type Span struct {
func NewSpan(service string, traceId uint64, parentId *uint64, name string, start, end time.Time) *Span {
id := observe.NewSpanId()
span := Span{
SpanId: uint64(id),
SpanId: id.ToUint64(),
ParentId: parentId,
TraceId: traceId,
Name: name,
Expand Down
6 changes: 3 additions & 3 deletions go/adapter/otel_formatter/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *ResourceSpan) AddAttribute(key string, value any) *ResourceSpan {
return r
}

func (r *ResourceSpan) AddSpans(spans []Span) {
func (r *ResourceSpan) AddSpans(spans []*Span) {
r.ScopeSpans = append(r.ScopeSpans, ScopeSpan{
Scope: Scope{
Name: "event",
Expand All @@ -46,8 +46,8 @@ type Resource struct {
}

type ScopeSpan struct {
Scope Scope `json:"scope"`
Spans []Span `json:"spans"`
Scope Scope `json:"scope"`
Spans []*Span `json:"spans"`
}

type Attribute struct {
Expand Down
Loading
Loading