From 312e9fe1003443afa16c6da90de2522ce3b147a7 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Fri, 30 Sep 2022 17:23:57 +0200 Subject: [PATCH] feat: add new dapr checkpoint strategy to eventhub scaler Signed-off-by: Christian Leinweber --- CHANGELOG.md | 1 + .../azure/azure_eventhub_checkpoint.go | 34 +++++++++ pkg/scalers/azure/azure_eventhub_test.go | 73 +++++++++++++++++++ 3 files changed, 108 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c92082e01b9..d97f13e589c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General:** Add `Min` column to ScaledJob visualization ([#3689](https://github.com/kedacore/keda/issues/3689)) - **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610)) - **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310)) +- **Eventhub Scaler:** Add new Checkpoint Strategy for Dapr ([#3022](https://github.com/kedacore/keda/issues/3022)) ### Fixes diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index f9aa53f742f..cfbb96ac331 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -82,6 +82,11 @@ type goSdkCheckpointer struct { containerName string } +type daprCheckpointer struct { + partitionID string + containerName string +} + type defaultCheckpointer struct { partitionID string containerName string @@ -100,6 +105,11 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { containerName: info.BlobContainer, partitionID: partitionID, } + case (info.CheckpointStrategy == "dapr"): + return &daprCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } case (info.CheckpointStrategy == "blobMetadata"): return &blobMetadataCheckpointer{ containerName: info.BlobContainer, @@ -174,8 +184,32 @@ func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, return path, nil } +// resolve path for daprCheckpointer +func (checkpointer *daprCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + _, eventHubName, err := getHubAndNamespace(info) + if err != nil { + return nil, err + } + + path, err := url.Parse(fmt.Sprintf("/%s/dapr-%s-%s-%s", info.BlobContainer, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) + if err != nil { + return nil, err + } + + return path, nil +} + +// extract checkpoint for DaprCheckpointer +func (checkpointer *daprCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + return newGoSdkCheckpoint(get) +} + // extract checkpoint for goSdkCheckpointer func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + return newGoSdkCheckpoint(get) +} + +func newGoSdkCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { var checkpoint goCheckpoint err := readToCheckpointFromBody(get, &checkpoint) if err != nil { diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index 8594224113d..f7121b5d25c 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -227,6 +227,49 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { assert.Equal(t, check, expectedCheckpoint) } +func TestCheckpointFromBlobStorageDapr(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1004" + consumerGroup := "$default" + eventhubName := "hub" + + sequencenumber := int64(1) + + containerName := fmt.Sprintf("dapr-%s-%s-%s", eventhubName, consumerGroup, partitionID) + checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}" + checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber) + + urlPath := "" + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: fmt.Sprintf("Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=%s", eventhubName), + StorageConnection: StorageConnectionString, + EventHubName: eventhubName, + BlobContainer: containerName, + CheckpointStrategy: "dapr", + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + func TestShouldParseCheckpointForFunction(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", @@ -368,6 +411,36 @@ func TestShouldParseCheckpointForGoSdk(t *testing.T) { assert.Equal(t, url.Path, "/containername/0") } +func TestShouldParseCheckpointForDapr(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$default", + BlobContainer: "containername", + CheckpointStrategy: "dapr", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/containername/dapr-hub-test-$default-0") +} + +func TestShouldParseCheckpointForDaprWithPodIdentity(t *testing.T) { + eventHubInfo := EventHubInfo{ + Namespace: "eventhubnamespace", + EventHubName: "hub-test", + EventHubConsumerGroup: "$default", + ServiceBusEndpointSuffix: "servicebus.windows.net", + BlobContainer: "containername", + CheckpointStrategy: "dapr", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/containername/dapr-hub-test-$default-0") +} + func createNewCheckpointInStorage(urlPath string, containerName string, partitionID string, checkpoint string, metadata map[string]string) (context.Context, error) { ctx := context.Background()