Skip to content

Commit

Permalink
return custom fields from kflowInit
Browse files Browse the repository at this point in the history
  • Loading branch information
wg committed Dec 13, 2016
1 parent 218464f commit a5ed432
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 77 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
MAIN := github.com/kentik/libkflow
PKGS := $(MAIN) $(MAIN)/api $(MAIN)/chf

CFLAGS += -std=c99

OS := $(shell go env GOOS)
ARCH := $(shell go env GOARCH)
TARGET := $(OS)-$(ARCH)
Expand Down Expand Up @@ -45,7 +47,7 @@ $(WORK)/server: $(SRC)
go build -o $@ $(MAIN)/cmd/server

$(WORK)/demo: $(MAIN)/kflow.h $(CURDIR)/demo.c $(WORK)/libkflow.a
$(CC) $(LDFLAGS) -o $@ -I $(<D) $(filter-out $<,$^)
$(CC) $(CFLAGS) $(LDFLAGS) -o $@ -I $(<D) $(filter-out $<,$^)

test:
@go test -v $(shell go list ./... | grep -v /vendor/)
Expand Down
22 changes: 16 additions & 6 deletions demo.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "kflow.h"

int main(int argc, char **argv) {
Expand All @@ -20,18 +21,27 @@ int main(int argc, char **argv) {
.device_id = 1,
.verbose = 1,
};
kflowCustom *customs;
uint32_t numCustoms;

if ((r = kflowInit(&cfg)) != 0) {
if ((r = kflowInit(&cfg, &customs, &numCustoms)) != 0) {
printf("error initializing libkflow: %d\n", r);
goto error;
};

char *url = "http://foo.com";
kflowCustom customs[] = {
{ .name = KFLOWCUSTOM_HTTP_URL, .vtype = KFLOWCUSTOMSTR, .value.str = url },
{ .name = KFLOWCUSTOM_HTTP_STATUS, .vtype = KFLOWCUSTOMU32, .value.u32 = 200 },
};
uint32_t numCustoms = sizeof(customs) / sizeof(kflowCustom);

for (uint32_t i = 0; i < numCustoms; i++) {
if (!strcmp(customs[i].name, KFLOWCUSTOM_HTTP_URL)) {
customs[i].value.str = url;
} else if (!strcmp(customs[i].name, KFLOWCUSTOM_HTTP_STATUS)) {
customs[i].value.u32 = 200;
} else {
free(customs[i].name);
memcpy(&customs[i], &customs[i+1], sizeof(kflowCustom)*(numCustoms-i));
numCustoms--; i--;
}
}

kflow flow = {
.deviceId = cfg.device_id,
Expand Down
42 changes: 16 additions & 26 deletions src/github.com/kentik/libkflow/api/types.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,37 @@
package api

import (
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"
)

type DeviceResponse struct {
Device Device `json:"device"`
}

type Device struct {
ID int `json:"id,string"`
Name string `json:"device_name"`
MaxFlowRate int `json:"max_flow_rate"`
CompanyID int `json:"company_id,string"`
Customs CustomColumns `json:"custom_columns,omitempty"`
ID int `json:"id,string"`
Name string `json:"device_name"`
MaxFlowRate int `json:"max_flow_rate"`
CompanyID int `json:"company_id,string"`
Customs []Column `json:"custom_column_data,omitempty"`
}

type CustomColumns map[string]uint64
type Column struct {
ID uint64 `json:"id,string"`
Name string `json:"col_name"`
Type string `json:"col_type"`
}

func (d *Device) ClientID() string {
return fmt.Sprintf("%d:%s:%d", d.CompanyID, d.Name, d.ID)
}

func (c *CustomColumns) UnmarshalJSON(data []byte) error {
m := map[string]uint64{}
for _, match := range split.FindAllSubmatchIndex(data, -1) {
key := string(data[match[2]:match[3]])
val := string(data[match[4]:match[5]])
m[key], _ = strconv.ParseUint(val, 10, 64)
}
*c = m
return nil
func (c *Column) UnmarshalFlag(value string) error {
return json.Unmarshal([]byte(value), c)
}

func (c *CustomColumns) MarshalJSON() ([]byte, error) {
kvs := make([]string, 0, len(*c))
for k, v := range *c {
kvs = append(kvs, fmt.Sprintf("%s=%d", k, v))
}
return []byte(`"` + strings.Join(kvs, ",") + `"`), nil
func (c Column) MarshalFlag() (string, error) {
b, err := json.Marshal(c)
return string(b), err
}

var split = regexp.MustCompile(`([\w-]+)=(\d+),?`)
50 changes: 25 additions & 25 deletions src/github.com/kentik/libkflow/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
)

type Args struct {
Host string `short:"h" description:"listen on host"`
Port int `short:"p" description:"listen on port"`
TLS bool `long:"tls" description:"require TLS "`
Email string `long:"email" description:"API auth email"`
Token string `long:"token" description:"API auth token"`
CompanyID int `long:"company-id" description:"company ID "`
DeviceID int `long:"device-id" description:"device ID "`
DeviceName string `long:"device-name" description:"device name "`
Customs map[string]uint64 `long:"custom" description:"custom fields "`
Host string `short:"h" description:"listen on host"`
Port int `short:"p" description:"listen on port"`
TLS bool `long:"tls" description:"require TLS "`
Email string `long:"email" description:"API auth email"`
Token string `long:"token" description:"API auth token"`
CompanyID int `long:"company-id" description:"company ID "`
DeviceID int `long:"device-id" description:"device ID "`
DeviceName string `long:"device-name" description:"device name "`
Customs []api.Column `long:"custom" description:"custom fields "`
}

func main() {
Expand All @@ -31,22 +31,22 @@ func main() {
CompanyID: 1,
DeviceID: 1,
DeviceName: "dev1",
Customs: map[string]uint64{
"RETRANSMITTED_IN_PKTS": 1,
"RETRANSMITTED_OUT_PKTS": 2,
"FRAGMENTS": 3,
"CLIENT_NW_LATENCY_MS": 4,
"SERVER_NW_LATENCY_MS": 5,
"APPL_LATENCY_MS": 6,
"OOORDER_IN_PKTS": 7,
"OOORDER_OUT_PKTS": 8,
"KFLOW_HTTP_URL": 9,
"KFLOW_HTTP_STATUS": 10,
"KFLOW_HTTP_UA": 11,
"KFLOW_HTTP_REFERER": 12,
"KFLOW_DNS_QUERY": 13,
"KFLOW_DNS_QUERY_TYPE": 14,
"KFLOW_DNS_RET_CODE": 15,
Customs: []api.Column{
{ID: 1, Type: "uint32", Name: "RETRANSMITTED_IN_PKTS"},
{ID: 2, Type: "uint32", Name: "RETRANSMITTED_OUT_PKTS"},
{ID: 3, Type: "uint32", Name: "FRAGMENTS"},
{ID: 4, Type: "uint32", Name: "CLIENT_NW_LATENCY_MS"},
{ID: 5, Type: "uint32", Name: "SERVER_NW_LATENCY_MS"},
{ID: 6, Type: "uint32", Name: "APPL_LATENCY_MS"},
{ID: 7, Type: "uint32", Name: "OOORDER_IN_PKTS"},
{ID: 8, Type: "uint32", Name: "OOORDER_OUT_PKTS"},
{ID: 9, Type: "string", Name: "KFLOW_HTTP_URL"},
{ID: 10, Type: "uint32", Name: "KFLOW_HTTP_STATUS"},
{ID: 11, Type: "string", Name: "KFLOW_HTTP_UA"},
{ID: 12, Type: "string", Name: "KFLOW_HTTP_REFERER"},
{ID: 13, Type: "string", Name: "KFLOW_DNS_QUERY"},
{ID: 14, Type: "uint32", Name: "KFLOW_DNS_QUERY_TYPE"},
{ID: 15, Type: "uint32", Name: "KFLOW_DNS_RET_CODE"},
},
}

Expand Down
10 changes: 7 additions & 3 deletions src/github.com/kentik/libkflow/kflow.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define KFLOW_H

#include <stdint.h>
#include <stdlib.h>

// struct kflowConfig defines the flow sending configuration.
typedef struct {
Expand Down Expand Up @@ -129,9 +130,12 @@ typedef struct {
} kflow;

// kflowInit initializes the library and must be called prior
// to any other functions. It returns 0 on success or an error
// code on failure.
int kflowInit(kflowConfig *);
// to any other functions. If a non-NULL pointer is passed as
// the second parameter it will be set to an array of
// kflowCustom structs containing the custom columns supported
// by the configured device, which must be freed by the caller.
// kflowInit returns 0 on success or an error code on failure.
int kflowInit(kflowConfig *, kflowCustom **, uint32_t *);

// kflowSend asynchronously dispatches a kflow record to the
// server. All fields of the record are copied and may be
Expand Down
53 changes: 37 additions & 16 deletions src/github.com/kentik/libkflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var sender *Sender
var errors chan error

//export kflowInit
func kflowInit(cfg *C.kflowConfig) C.int {
func kflowInit(cfg *C.kflowConfig, customs **C.kflowCustom, n *C.uint32_t) C.int {
errors = make(chan error, 100)

url, err := url.Parse(C.GoString(cfg.URL))
Expand Down Expand Up @@ -49,6 +49,8 @@ func kflowInit(cfg *C.kflowConfig) C.int {
return C.EKFLOWCONFIG
}

populateCustoms(device, customs, n)

interval := time.Duration(cfg.metrics.interval) * time.Second
metrics := NewMetrics(device.ClientID())
metrics.Start(C.GoString(cfg.metrics.URL), email, token, interval)
Expand Down Expand Up @@ -77,21 +79,6 @@ func kflowSend(cflow *C.kflow) C.int {
return C.EKFLOWNOINIT
}

customs := *(*[]C.kflowCustom)(unsafe.Pointer(&reflect.SliceHeader{
Data: (uintptr)(unsafe.Pointer(cflow.customs)),
Len: int(cflow.numCustoms),
Cap: int(cflow.numCustoms),
}))

for i, c := range customs {
name := C.GoString(c.name)
id, ok := sender.Device.Customs[name]
if !ok {
return C.EKFLOWNOCUSTOM
}
customs[i].id = (C.uint64_t)(id)
}

kflow, err := Pack(sender.Segment(), (*Ckflow)(cflow))
if err != nil {
errors <- err
Expand Down Expand Up @@ -126,5 +113,39 @@ func kflowError() *C.char {
}
}

func populateCustoms(device *api.Device, ptr **C.kflowCustom, cnt *C.uint32_t) {
if ptr == nil || cnt == nil {
return
}

n := len(device.Customs)
*ptr = (*C.kflowCustom)(C.calloc(C.size_t(n), C.sizeof_kflowCustom))
*cnt = C.uint32_t(n)

customs := *(*[]C.kflowCustom)(unsafe.Pointer(&reflect.SliceHeader{
Data: (uintptr)(unsafe.Pointer(*ptr)),
Len: int(n),
Cap: int(n),
}))

for i, c := range device.Customs {
var vtype C.int
switch c.Type {
case "string":
vtype = C.KFLOWCUSTOMSTR
case "uint32":
vtype = C.KFLOWCUSTOMU32
case "float32":
vtype = C.KFLOWCUSTOMF32
}

customs[i] = C.kflowCustom{
id: C.uint64_t(c.ID),
name: C.CString(c.Name),
vtype: vtype,
}
}
}

func main() {
}

0 comments on commit a5ed432

Please sign in to comment.