Commit a375d17: Add logic to scale to zero on invalid offset even with earliest offsetResetPolicy (#5689)

Signed-off-by: dttung2905 <[email protected]>
dttung2905 authored May 15, 2024
1 parent 613919b commit a375d17
Showing 5 changed files with 195 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -70,6 +70,7 @@ Here is an overview of all new **experimental** features:
### Improvements

- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))

### Fixes

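Before the per-file diffs, it helps to recall where the two knobs named in the changelog entry come from: `offsetResetPolicy` and `scaleToZeroOnInvalidOffset` are trigger-metadata keys on the ScaledObject (see the templates in the test files further down) that end up as typed fields on the scaler metadata read in `getLagForPartition`. Below is a minimal sketch of that mapping, assuming a plain `map[string]string` of trigger metadata; `parseInvalidOffsetBehavior` and the struct around it are illustrative stand-ins, not KEDA's actual parsing code.

```go
package main

import (
	"fmt"
	"strconv"
)

type offsetResetPolicy string

const (
	earliest offsetResetPolicy = "earliest"
	latest   offsetResetPolicy = "latest"
)

// invalidOffsetBehavior mirrors the two metadata fields the patched branch reads.
// Illustrative struct only, not KEDA's real scaler metadata type.
type invalidOffsetBehavior struct {
	offsetResetPolicy          offsetResetPolicy
	scaleToZeroOnInvalidOffset bool
}

// parseInvalidOffsetBehavior is a hypothetical helper showing how the two
// trigger-metadata keys used in the ScaledObject templates below could be
// turned into typed fields.
func parseInvalidOffsetBehavior(meta map[string]string) (invalidOffsetBehavior, error) {
	b := invalidOffsetBehavior{offsetResetPolicy: latest} // assumed default
	if v, ok := meta["offsetResetPolicy"]; ok {
		p := offsetResetPolicy(v)
		if p != earliest && p != latest {
			return b, fmt.Errorf("unsupported offsetResetPolicy %q", v)
		}
		b.offsetResetPolicy = p
	}
	if v, ok := meta["scaleToZeroOnInvalidOffset"]; ok {
		parsed, err := strconv.ParseBool(v)
		if err != nil {
			return b, fmt.Errorf("invalid scaleToZeroOnInvalidOffset %q: %w", v, err)
		}
		b.scaleToZeroOnInvalidOffset = parsed
	}
	return b, nil
}

func main() {
	b, err := parseInvalidOffsetBehavior(map[string]string{
		"offsetResetPolicy":          "earliest",
		"scaleToZeroOnInvalidOffset": "true",
	})
	fmt.Println(b, err)
}
```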
3 changes: 3 additions & 0 deletions pkg/scalers/apache_kafka_scaler.go
@@ -531,6 +531,9 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
}
producerOffset := producerOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return producerOffset, producerOffset, nil
}

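The behavioral change above is small but easy to misread in diff form. The sketch below restates just the patched branch for the `earliest` reset policy in isolation; `lagForInvalidOffset`, the `-1` sentinel, and the hard-coded policy string are simplifications for illustration rather than the scaler's real identifiers, and the valid-offset path is reduced to a plain latest-minus-consumer difference.

```go
package main

import "fmt"

const invalidOffset int64 = -1 // stand-in for the scaler's "no committed offset" sentinel

// lagForInvalidOffset mirrors the new branch: with an invalid consumer offset
// under the "earliest" reset policy, the lag is forced to zero when
// scaleToZeroOnInvalidOffset is set, instead of being reported as the
// producer/latest offset (which previously forced a scale-out).
func lagForInvalidOffset(consumerOffset, latestOffset int64, resetPolicy string, scaleToZeroOnInvalidOffset bool) int64 {
	if consumerOffset == invalidOffset && resetPolicy == "earliest" {
		if scaleToZeroOnInvalidOffset {
			return 0
		}
		return latestOffset
	}
	return latestOffset - consumerOffset
}

func main() {
	// New consumer group (no committed offset), 42 messages on the partition.
	fmt.Println(lagForInvalidOffset(invalidOffset, 42, "earliest", true))  // 0 -> deployment may stay at zero
	fmt.Println(lagForInvalidOffset(invalidOffset, 42, "earliest", false)) // 42 -> scales out
}
```

With the flag set, a brand-new consumer group on a non-empty topic no longer reports a lag equal to the whole topic, so the deployment can remain at (or return to) zero replicas.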
3 changes: 3 additions & 0 deletions pkg/scalers/kafka_scaler.go
@@ -805,6 +805,9 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return latestOffset, latestOffset, nil
}

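The `kafka_scaler.go` hunk mirrors the same branch for the Sarama-based scaler. To make the expected lag for each combination explicit, here is a self-contained, table-driven unit-test sketch; the inline `lagForEarliestPolicy` helper is illustrative only, and the commit itself adds end-to-end coverage (below) rather than unit tests.

```go
package kafkalag_test

import "testing"

// lagForEarliestPolicy is an illustrative restatement of the patched branch:
// an invalid consumer offset under the "earliest" reset policy yields zero lag
// when scaleToZeroOnInvalidOffset is set, and the latest offset otherwise.
func lagForEarliestPolicy(consumerOffset, latestOffset int64, scaleToZeroOnInvalidOffset bool) int64 {
	const invalidOffset int64 = -1 // assumed sentinel for "no committed offset"
	if consumerOffset == invalidOffset {
		if scaleToZeroOnInvalidOffset {
			return 0
		}
		return latestOffset
	}
	return latestOffset - consumerOffset
}

func TestLagForEarliestPolicy(t *testing.T) {
	const latestOffset int64 = 42
	cases := []struct {
		name           string
		consumerOffset int64
		scaleToZero    bool
		want           int64
	}{
		{"invalid offset, scale to zero", -1, true, 0},
		{"invalid offset, report full lag", -1, false, latestOffset},
		{"committed offset, normal lag", 40, false, 2},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			if got := lagForEarliestPolicy(tc.consumerOffset, latestOffset, tc.scaleToZero); got != tc.want {
				t.Errorf("got lag %d, want %d", got, tc.want)
			}
		})
	}
}
```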
105 changes: 93 additions & 12 deletions tests/scalers/apache_kafka/apache_kafka_test.go
@@ -205,7 +205,7 @@ spec:
lagThreshold: '1'
offsetResetPolicy: 'latest'`

invalidOffsetScaledObjectTemplate = `
invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
@@ -243,6 +243,44 @@ spec:
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'latest'`

invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
pollingInterval: 5
cooldownPeriod: 0
scaleTargetRef:
name: {{.DeploymentName}}
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
triggers:
- type: kafka
metadata:
topic: {{.TopicName}}
bootstrapServers: {{.BootstrapServer}}
consumerGroup: {{.ResetPolicy}}
lagThreshold: '1'
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'earliest'`

persistentLagScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
@@ -407,8 +445,10 @@ func TestScaler(t *testing.T) {
testEarliestPolicy(t, kc, data)
testLatestPolicy(t, kc, data)
testMultiTopic(t, kc, data)
testZeroOnInvalidOffset(t, kc, data)
testOneOnInvalidOffset(t, kc, data)
testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testPersistentLag(t, kc, data)
testScalingOnlyPartitionsWithLag(t, kc, data)
}
@@ -509,7 +549,7 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) {
"replica count should be %d after 2 minute", 2)
}

func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopic: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
@@ -518,14 +558,54 @@ data.ScaleToZeroOnInvalid = StringTrue
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)

commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup)
publishMessage(t, oneInvalidOffsetTopic)

// Should scale to 0
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10),
"replica count should be %d after 10 minute", 0)
}

func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopic: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
@@ -534,8 +614,9 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
@@ -570,7 +651,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
t.Log("--- testing persistentLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
data.Commit = StringTrue
data.TopicName = persistentLagTopic
@@ -583,7 +664,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
publishMessage(t, persistentLagTopic)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)
// Recreate Deployment to delibrately assign different consumer group to deployment and scaled object
// Recreate Deployment to deliberately assign different consumer group to deployment and scaled object
// This is to simulate inability to consume from topic
// Scaled Object remains unchanged
KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
@@ -613,7 +694,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da
t.Log("--- testing limitToPartitionsWithLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
commitPartition(t, limitToPartitionsWithLagTopic, "latest")

data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
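The e2e tests in this file (and the mirrored ones in tests/scalers/kafka/kafka_test.go below) lean on KEDA's e2e helpers such as `WaitForDeploymentReplicaReadyCount` and `AssertReplicaCountNotChangeDuringTimePeriod`. As a rough mental model only — this is not the helpers' actual implementation — the replica wait boils down to polling the Deployment status with client-go:

```go
package e2esketch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// waitForReadyReplicas is an illustrative stand-in for the e2e helper
// WaitForDeploymentReplicaReadyCount used above: it polls the Deployment's
// status until ReadyReplicas matches the target or the attempts run out.
func waitForReadyReplicas(ctx context.Context, kc *kubernetes.Clientset, name, namespace string, target int32, interval time.Duration, attempts int) (bool, error) {
	for i := 0; i < attempts; i++ {
		d, err := kc.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if d.Status.ReadyReplicas == target {
			return true, nil
		}
		fmt.Printf("waiting: %d/%d ready replicas\n", d.Status.ReadyReplicas, target)
		time.Sleep(interval)
	}
	return false, nil
}
```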
109 changes: 95 additions & 14 deletions tests/scalers/kafka/kafka_test.go
@@ -204,7 +204,7 @@ spec:
lagThreshold: '1'
offsetResetPolicy: 'latest'`

invalidOffsetScaledObjectTemplate = `
invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
@@ -242,6 +242,44 @@ spec:
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'latest'`

invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
pollingInterval: 5
cooldownPeriod: 0
scaleTargetRef:
name: {{.DeploymentName}}
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
triggers:
- type: kafka
metadata:
topic: {{.TopicName}}
bootstrapServers: {{.BootstrapServer}}
consumerGroup: {{.ResetPolicy}}
lagThreshold: '1'
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'earliest'`

persistentLagScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
@@ -406,8 +444,10 @@ func TestScaler(t *testing.T) {
testEarliestPolicy(t, kc, data)
testLatestPolicy(t, kc, data)
testMultiTopic(t, kc, data)
testZeroOnInvalidOffset(t, kc, data)
testOneOnInvalidOffset(t, kc, data)
testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testPersistentLag(t, kc, data)
testScalingOnlyPartitionsWithLag(t, kc, data)
}
@@ -507,33 +547,74 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) {
"replica count should be %d after 2 minute", 2)
}

func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopic: scale out ---")
func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopic: scale out ---")
func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)

commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup)
publishMessage(t, oneInvalidOffsetTopic)

// Should scale to 0
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10),
"replica count should be %d after 10 minute", 0)
}

func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
@@ -568,7 +649,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
t.Log("--- testing persistentLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
data.Commit = StringTrue
data.TopicName = persistentLagTopic
@@ -581,7 +662,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
publishMessage(t, persistentLagTopic)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)
// Recreate Deployment to delibrately assign different consumer group to deployment and scaled object
// Recreate Deployment to deliberately assign different consumer group to deployment and scaled object
// This is to simulate inability to consume from topic
// Scaled Object remains unchanged
KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
@@ -611,7 +692,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da
t.Log("--- testing limitToPartitionsWithLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
commitPartition(t, limitToPartitionsWithLagTopic, "latest")

data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
