Skip to content

Commit

Permalink
Merge pull request #10 from opencost/atm/integrate-rates
Browse files Browse the repository at this point in the history
use rate based pricing to get better real-time view
  • Loading branch information
teevans authored Mar 11, 2024
2 parents 3694d01 + e0eb1cf commit 538d81d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 20 deletions.
96 changes: 79 additions & 17 deletions datadog/cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,56 +125,68 @@ func boilerplateDDCustomCost(win opencost.Window) pb.CustomCostResponse {
}
func (d *DatadogCostSource) getDDCostsForWindow(window opencost.Window, listPricing *datadogplugin.PricingInformation) *pb.CustomCostResponse {
ccResp := boilerplateDDCustomCost(window)
params := datadogV2.NewGetHourlyUsageOptionalParameters()
params.FilterTimestampEnd = window.End()

nextPageId := "init"
for morepages := true; morepages; morepages = (nextPageId != "") {
params := datadogV2.NewGetHourlyUsageOptionalParameters()
if nextPageId != "init" {
params.PageNextRecordId = &nextPageId
}
if d.rateLimiter.Tokens() < 1.0 {
log.Infof("datadog rate limit reached. holding request until rate capacity is back")
}

err := d.rateLimiter.Wait(context.TODO())
err := d.rateLimiter.WaitN(context.TODO(), 2)
if err != nil {
log.Errorf("error waiting on rate limiter`: %v\n", err)
ccResp.Errors = append(ccResp.Errors, err.Error())
return &ccResp
}

params.FilterTimestampEnd = window.End()
resp, r, err := d.usageApi.GetHourlyUsage(d.ddCtx, *window.Start(), "all", *params)
if err != nil {
log.Errorf("Error when calling `UsageMeteringApi.GetHourlyUsage`: %v\n", err)
log.Errorf("Full HTTP response: %v\n", r)
ccResp.Errors = append(ccResp.Errors, err.Error())
}

for _, hourlyUsageData := range resp.Data {
for _, meas := range hourlyUsageData.Attributes.Measurements {
// many datadog usages are given in terms of a cumulative month to date usage
// therefore, make a call for the hour before this hour to get a comparison
// where needed
params.FilterTimestampEnd = window.Start()
toSub := window.End().Sub(*window.Start())
respPriorWindow, r, err := d.usageApi.GetHourlyUsage(d.ddCtx, (*window.Start()).Add(-toSub), "all", *params)
if err != nil {
log.Errorf("Error when calling `UsageMeteringApi.GetHourlyUsage`: %v\n", err)
log.Errorf("Full HTTP response: %v\n", r)
ccResp.Errors = append(ccResp.Errors, err.Error())
}

for index := range resp.Data {
for indexMeas := range resp.Data[index].Attributes.Measurements {
usageQty := float32(0.0)

if meas.Value.IsSet() {
usageQty = float32(meas.GetValue())
if resp.Data[index].Attributes.Measurements[indexMeas].Value.IsSet() {
usageQty = GetUsageQuantity(*resp.Data[index].Attributes.ProductFamily, resp.Data[index].Attributes.Measurements[indexMeas], respPriorWindow.Data[index].Attributes.Measurements[indexMeas])
}

if usageQty == 0.0 {
log.Tracef("product %s/%s had 0 usage, not recording that cost", *hourlyUsageData.Attributes.ProductFamily, *meas.UsageType)
log.Tracef("product %s/%s had 0 usage, not recording that cost", *resp.Data[index].Attributes.ProductFamily, *resp.Data[index].Attributes.Measurements[indexMeas].UsageType)
continue
}

desc, usageUnit, pricing, currency := getListingInfo(*hourlyUsageData.Attributes.ProductFamily, *meas.UsageType, listPricing)
desc, usageUnit, pricing, currency := getListingInfo(window, *resp.Data[index].Attributes.ProductFamily, *resp.Data[index].Attributes.Measurements[indexMeas].UsageType, listPricing)
ccResp.Currency = currency
cost := pb.CustomCost{
Zone: *hourlyUsageData.Attributes.Region,
AccountName: *hourlyUsageData.Attributes.OrgName,
Zone: *resp.Data[index].Attributes.Region,
AccountName: *resp.Data[index].Attributes.OrgName,
ChargeCategory: "usage",
Description: desc,
ResourceName: *meas.UsageType,
ResourceType: *hourlyUsageData.Attributes.ProductFamily,
Id: *hourlyUsageData.Id,
ProviderId: *hourlyUsageData.Attributes.PublicId + "/" + *meas.UsageType,
ResourceName: *resp.Data[index].Attributes.Measurements[indexMeas].UsageType,
ResourceType: *resp.Data[index].Attributes.ProductFamily,
Id: *resp.Data[index].Id,
ProviderId: *resp.Data[index].Attributes.PublicId + "/" + *resp.Data[index].Attributes.Measurements[indexMeas].UsageType,
Labels: map[string]string{},
ListCost: usageQty * pricing,
ListUnitPrice: pricing,
Expand All @@ -195,6 +207,23 @@ func (d *DatadogCostSource) getDDCostsForWindow(window opencost.Window, listPric
return &ccResp
}

// we have two basic types usages: cumulative and rate
// rate usages are e.g., number of infra hosts, that have fixed costs per hour
// cumulative usages are e.g., number of logs ingested, that have a fixed cost per unit
// if a usage is cumulative, then suptract the usage in the hour prior to get the incremental usage
// if a usage is a rate, then just return the usage
func GetUsageQuantity(productFamily string, currentPeriodUsage, previousPeriodUsage datadogV2.HourlyUsageMeasurement) float32 {
curUsage := currentPeriodUsage.GetValue()
if _, found := rateFamilies[productFamily]; found {
// this family is a rate family, so just return the usage
return float32(curUsage)
}

prevUsage := previousPeriodUsage.GetValue()

return float32(curUsage - prevUsage)
}

// the public pricing used in the pricing list doesn't always match the usage reports
// therefore, we maintain a list of aliases
var usageToPricingMap = map[string]string{
Expand All @@ -214,6 +243,8 @@ var usageToPricingMap = map[string]string{
"ingested_events_bytes": "ingested_logs",
"logs_live_ingested_bytes": "ingested_logs",
"logs_rehydrated_ingested_bytes": "ingested_logs",
"indexed_events_count": "indexed_logs",
"logs_live_indexed_count": "indexed_logs",
"synthetics_api": "api_tests",
"synthetics_browser": "browser_checks",
"tasks_count": "fargate_tasks",
Expand All @@ -223,7 +254,23 @@ var usageToPricingMap = map[string]string{
"invocations_sum": "serverless_inv",
}

func getListingInfo(productfamily string, usageType string, listPricing *datadogplugin.PricingInformation) (description string, usageUnit string, pricing float32, currency string) {
var pricingMap = map[string]float64{
"custom_metrics": 100.0,
"indexed_logs": 1000000.0,
"ingested_logs": 1024.0 * 1024.0 * 1024.0 * 1024.0,
"api_tests": 10000.0,
"browser_checks": 1000.0,
"rum_events": 10000.0,
"security_logs": 1024.0 * 1024.0 * 1024.0 * 1024.0,
"serverless_inv": 1000000.0,
}

var rateFamilies = map[string]int{
"infra_hosts": 730.0,
"apm_hosts": 730.0,
}

func getListingInfo(window opencost.Window, productfamily string, usageType string, listPricing *datadogplugin.PricingInformation) (description string, usageUnit string, pricing float32, currency string) {
pricingKey := ""
var found bool
// first, check if the usage type is mapped to a pricing key
Expand All @@ -249,7 +296,22 @@ func getListingInfo(productfamily string, usageType string, listPricing *datadog
if err != nil {
log.Errorf("error converting string to float for rate: %s", detail.OneMonths.Rate)
}
pricing = float32(pricingFloat)

// if the family is a rate family, then the pricing is per hour
if hourlyPriceDenominator, found := rateFamilies[pricingKey]; found {
// adjust the pricing to fit the window duration
pricingPerHour := float32(pricingFloat) / float32(hourlyPriceDenominator)
pricing = pricingPerHour * float32(window.Duration().Hours())
} else {
// if the family is a cumulative family, then the pricing is per unit
// check for a scale factor on the pricing
if scalefactor, found := pricingMap[pricingKey]; found {
pricing = float32(pricingFloat) / float32(scalefactor)
} else {
pricing = float32(pricingFloat)
}
}

}
}

Expand Down
1 change: 1 addition & 0 deletions datadog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21.6

require (
github.com/DataDog/datadog-api-client-go/v2 v2.23.0
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/hashicorp/go-plugin v1.6.0
github.com/opencost/opencost-plugins/test v0.0.0-20240307142929-df4df8ee69fa
github.com/opencost/opencost/core v0.0.0-20240307141548-816f98c9051a
Expand Down
22 changes: 19 additions & 3 deletions datadog/tests/datadog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/davecgh/go-spew/spew"
datadogplugin "github.com/opencost/opencost-plugins/datadog/datadogplugin"
harness "github.com/opencost/opencost-plugins/test/pkg/harness"
"github.com/opencost/opencost/core/pkg/log"
Expand Down Expand Up @@ -73,9 +74,9 @@ func TestDDCostRetrieval(t *testing.T) {
parent := filepath.Dir(filename)
pluginRoot := filepath.Dir(parent)
pluginFile := pluginRoot + "/cmd/main/main.go"
windowStart := time.Date(2024, 2, 27, 0, 0, 0, 0, time.UTC)
windowStart := time.Date(2024, 3, 8, 0, 0, 0, 0, time.UTC)
// query for qty 2 of 1 hour windows
windowEnd := time.Date(2024, 2, 27, 2, 0, 0, 0, time.UTC)
windowEnd := time.Date(2024, 3, 8, 2, 0, 0, 0, time.UTC)

req := pb.CustomCostRequest{
Start: timestamppb.New(windowStart),
Expand Down Expand Up @@ -106,11 +107,26 @@ func TestDDCostRetrieval(t *testing.T) {
t.Fatalf("unexpected domain. expected datadog, got %s", resp.Domain)
}
}
// confirm there are > 0 custom costs

// check some attributes of the cost response
for _, resp := range response {
// confirm there are > 0 custom costs
if len(resp.Costs) < 1 {
t.Fatalf("expect non-zero costs in response.")
}

for _, cost := range resp.Costs {
if cost.ResourceType == "indexed_logs" {
// check for sane values fo a rate-priced resource
if cost.ListCost > 1000 {
spew.Dump(
cost,
)
t.Fatalf("unexpectedly high cost for indexed logs: %f", cost.ListCost)

}
}
}
}

}

0 comments on commit 538d81d

Please sign in to comment.