Skip to content

Commit

Permalink
add new dapr checkpoint strategy to eventhub scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Leinweber <[email protected]>
  • Loading branch information
christle committed Oct 8, 2022
1 parent 14ece68 commit fd720f0
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Add `Min` column to ScaledJob visualization ([#3689](https://github.com/kedacore/keda/issues/3689))
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Eventhub Scaler:** Add new Checkpoint Strategy for Dapr ([#3022](https://github.com/kedacore/keda/issues/3022))

### Fixes

Expand Down
34 changes: 34 additions & 0 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ type goSdkCheckpointer struct {
containerName string
}

type daprCheckpointer struct {
partitionID string
containerName string
}

type defaultCheckpointer struct {
partitionID string
containerName string
Expand All @@ -100,6 +105,11 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
containerName: info.BlobContainer,
partitionID: partitionID,
}
case (info.CheckpointStrategy == "dapr"):
return &daprCheckpointer{
containerName: info.BlobContainer,
partitionID: partitionID,
}
case (info.CheckpointStrategy == "blobMetadata"):
return &blobMetadataCheckpointer{
containerName: info.BlobContainer,
Expand Down Expand Up @@ -174,8 +184,32 @@ func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL,
return path, nil
}

// resolve path for daprCheckpointer
func (checkpointer *daprCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
_, eventHubName, err := getHubAndNamespace(info)
if err != nil {
return nil, err
}

path, err := url.Parse(fmt.Sprintf("/%s/dapr-%s-%s-%s", info.BlobContainer, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))
if err != nil {
return nil, err
}

return path, nil
}

// extract checkpoint for DaprCheckpointer
func (checkpointer *daprCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
return newGoSdkCheckpoint(get)
}

// extract checkpoint for goSdkCheckpointer
func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
return newGoSdkCheckpoint(get)
}

func newGoSdkCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint goCheckpoint
err := readToCheckpointFromBody(get, &checkpoint)
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions pkg/scalers/azure/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,49 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
assert.Equal(t, check, expectedCheckpoint)
}

func TestCheckpointFromBlobStorageDapr(t *testing.T) {
if StorageConnectionString == "" {
return
}

partitionID := "0"
offset := "1004"
consumerGroup := "$default"
eventhubName := "hub"

sequencenumber := int64(1)

containerName := fmt.Sprintf("dapr-%s-%s-%s", eventhubName, consumerGroup, partitionID)
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber)

urlPath := ""

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}

eventHubInfo := EventHubInfo{
EventHubConnection: fmt.Sprintf("Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=%s", eventhubName),
StorageConnection: StorageConnectionString,
EventHubName: eventhubName,
BlobContainer: containerName,
CheckpointStrategy: "dapr",
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

func TestShouldParseCheckpointForFunction(t *testing.T) {
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
Expand Down Expand Up @@ -368,6 +411,36 @@ func TestShouldParseCheckpointForGoSdk(t *testing.T) {
assert.Equal(t, url.Path, "/containername/0")
}

func TestShouldParseCheckpointForDapr(t *testing.T) {
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
EventHubConsumerGroup: "$default",
BlobContainer: "containername",
CheckpointStrategy: "dapr",
}

cp := newCheckpointer(eventHubInfo, "0")
url, _ := cp.resolvePath(eventHubInfo)

assert.Equal(t, url.Path, "/containername/dapr-hub-test-$default-0")
}

func TestShouldParseCheckpointForDaprWithPodIdentity(t *testing.T) {
eventHubInfo := EventHubInfo{
Namespace: "eventhubnamespace",
EventHubName: "hub-test",
EventHubConsumerGroup: "$default",
ServiceBusEndpointSuffix: "servicebus.windows.net",
BlobContainer: "containername",
CheckpointStrategy: "dapr",
}

cp := newCheckpointer(eventHubInfo, "0")
url, _ := cp.resolvePath(eventHubInfo)

assert.Equal(t, url.Path, "/containername/dapr-hub-test-$default-0")
}

func createNewCheckpointInStorage(urlPath string, containerName string, partitionID string, checkpoint string, metadata map[string]string) (context.Context, error) {
ctx := context.Background()

Expand Down

0 comments on commit fd720f0

Please sign in to comment.