Skip to content

Commit

Permalink
feat: add subscription dispatch rate to namespace resource (#118)
Browse files Browse the repository at this point in the history
* feat: add subscription dispatch rate to namespace resource

* fix golangci-ling error

* update tests to cater for new namespace attribute
  • Loading branch information
efcasado authored Apr 20, 2024
1 parent 5fd42a9 commit 969d5ef
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 15 deletions.
29 changes: 18 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ resource "pulsar_namespace" "test" {
dispatch_byte_throttling_rate = 2048
}
subscription_dispatch_rate {
dispatch_msg_throttling_rate = 50
rate_period_seconds = 50
dispatch_byte_throttling_rate = 2048
}
retention_policies {
retention_minutes = "1600"
retention_size_in_mb = "10000"
Expand Down Expand Up @@ -249,17 +255,18 @@ resource "pulsar_namespace" "test" {

#### Properties

| Property | Description | Required |
| ---------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- |
| `tenant` | Name of the Tenant managing this namespace | Yes |
| `namespace` | name of the namespace | Yes |
| `enable_deduplication` | Message deduplication state on a namespace | No |
| `namespace_config` | Configuration for your namespaces like max allowed producers to produce messages | No |
| `dispatch_rate` | Apache Pulsar throttling config | No |
| `retention_policies` | Data retention policies | No |
| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No |
| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No |
| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No |
| Property | Description | Required |
| ---------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- |
| `tenant` | Name of the Tenant managing this namespace | Yes |
| `namespace` | name of the namespace | Yes |
| `enable_deduplication` | Message deduplication state on a namespace | No |
| `namespace_config` | Configuration for your namespaces like max allowed producers to produce messages | No |
| `dispatch_rate` | [Apache Pulsar throttling config for topics](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-dispatch-throttling-for-topics) | No |
| `subscription_dispatch_rate` | [Apache Pulsar throttling config for subscriptions](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-dispatch-throttling-for-subscription) | No |
| `retention_policies` | Data retention policies | No |
| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No |
| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No |
| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No |

namespace_config nested schema

Expand Down
14 changes: 13 additions & 1 deletion docs/resources/namespace.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ description: |-
### Optional

- `backlog_quota` (Block Set) (see [below for nested schema](#nestedblock--backlog_quota))
- `dispatch_rate` (Block Set, Max: 1) Data transfer rate, in and out of the Pulsar Broker (
- `dispatch_rate` (Block Set, Max: 1) Data transfer rate for all the topics under the given namespace (
see [below for nested schema](#nestedblock--dispatch_rate))
- `subscription_dispatch_rate` (Block Set, Max: 1) Data transfer rate for all the subscriptions under the given
namespace (see [below for nested schema](#nestedblock--subscription_dispatch_rate))
- `enable_deduplication` (Boolean)
- `namespace_config` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--namespace_config))
- `permission_grant` (Block Set) (see [below for nested schema](#nestedblock--permission_grant))
Expand Down Expand Up @@ -54,6 +56,16 @@ Required:
- `dispatch_msg_throttling_rate` (Number)
- `rate_period_seconds` (Number)

<a id="nestedblock--subscription_dispatch_rate"></a>

### Nested Schema for `subscription_dispatch_rate`

Required:

- `dispatch_byte_throttling_rate` (Number)
- `dispatch_msg_throttling_rate` (Number)
- `rate_period_seconds` (Number)

<a id="nestedblock--namespace_config"></a>

### Nested Schema for `namespace_config`
Expand Down
3 changes: 2 additions & 1 deletion pulsar/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func init() {
"max_consumers_per_subscription": "Max number of consumers per subscription",
"max_consumers_per_topic": "Max number of consumers per topic",
"message_ttl_seconds": "Sets the message time to live",
"dispatch_rate": "Data transfer rate, in and out of the Pulsar Broker",
"dispatch_rate": "Data transfer rate for all the topics under the given namespace",
"subscription_dispatch_rate": "Data transfer rate for all the subscriptions under the given namespace",
"persistence_policy": "Policy for the namespace for data persistence",
"backlog_quota": "",
"issuer_url": "The OAuth 2.0 URL of the authentication provider which allows the Pulsar client to obtain an access token",
Expand Down
47 changes: 47 additions & 0 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,29 @@ func resourcePulsarNamespace() *schema.Resource {
},
Set: dispatchRateToHash,
},
"subscription_dispatch_rate": {
Type: schema.TypeSet,
Optional: true,
Description: descriptions["subscription_dispatch_rate"],
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"dispatch_msg_throttling_rate": {
Type: schema.TypeInt,
Required: true,
},
"rate_period_seconds": {
Type: schema.TypeInt,
Required: true,
},
"dispatch_byte_throttling_rate": {
Type: schema.TypeInt,
Required: true,
},
},
},
Set: dispatchRateToHash,
},
"retention_policies": {
Type: schema.TypeSet,
Optional: true,
Expand Down Expand Up @@ -426,6 +449,21 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me
}))
}

if subscriptionDispatchRateCfg, ok := d.GetOk("subscription_dispatch_rate"); ok && subscriptionDispatchRateCfg.(*schema.Set).Len() > 0 { //nolint:lll
sdr, err := client.GetSubscriptionDispatchRate(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSubscriptionDispatchRate: %w", err))
}

_ = d.Set("subscription_dispatch_rate", schema.NewSet(dispatchRateToHash, []interface{}{
map[string]interface{}{
"dispatch_msg_throttling_rate": sdr.DispatchThrottlingRateInMsg,
"rate_period_seconds": sdr.RatePeriodInSecond,
"dispatch_byte_throttling_rate": int(sdr.DispatchThrottlingRateInByte),
},
}))
}

if permissionGrantCfg, ok := d.GetOk("permission_grant"); ok && len(permissionGrantCfg.(*schema.Set).List()) > 0 {
grants, err := client.GetNamespacePermissions(*ns)
if err != nil {
Expand Down Expand Up @@ -465,6 +503,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set)
backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set)
dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set)
subscriptionDispatchRateConfig := d.Get("subscription_dispatch_rate").(*schema.Set)
persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set)
permissionGrantConfig := d.Get("permission_grant").(*schema.Set)
topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set)
Expand Down Expand Up @@ -560,6 +599,13 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
}
}

if subscriptionDispatchRateConfig.Len() > 0 {
subscriptionDispatchRate := unmarshalDispatchRate(subscriptionDispatchRateConfig)
if err = client.SetSubscriptionDispatchRate(*nsName, *subscriptionDispatchRate); err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetSubscriptionDispatchRate: %w", err))
}
}

if persistencePoliciesConfig.Len() > 0 {
persistencePolicies := unmarshalPersistencePolicies(persistencePoliciesConfig)
if err = client.SetPersistence(nsName.String(), *persistencePolicies); err != nil {
Expand Down Expand Up @@ -646,6 +692,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData,
_ = d.Set("retention_policies", nil)
_ = d.Set("backlog_quota", nil)
_ = d.Set("dispatch_rate", nil)
_ = d.Set("subscription_dispatch_rate", nil)
_ = d.Set("persistence_policies", nil)
_ = d.Set("permission_grant", nil)
_ = d.Set("topic_auto_creation", nil)
Expand Down
14 changes: 12 additions & 2 deletions pulsar/resource_pulsar_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestNamespaceWithUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "0"),
resource.TestCheckNoResourceAttr(resourceName, "enable_deduplication"),
Expand All @@ -121,6 +122,7 @@ func TestNamespaceWithUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "1"),
resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "1"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "1"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "enable_deduplication", "true"),
Expand Down Expand Up @@ -162,6 +164,7 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"),
resource.TestCheckResourceAttr(resourceName, "backlog_quota.#", "0"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "0"),
Expand All @@ -174,6 +177,7 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"),
resource.TestCheckResourceAttr(resourceName, "backlog_quota.#", "0"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "1"),
Expand Down Expand Up @@ -395,8 +399,8 @@ func testNamespaceImported() resource.ImportStateCheckFunc {
return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s)
}

if len(s[0].Attributes) != 11 {
return fmt.Errorf("expected %d attrs, got %d: %#v", 11, len(s[0].Attributes), s[0].Attributes)
if len(s[0].Attributes) != 12 {
return fmt.Errorf("expected %d attrs, got %d: %#v", 12, len(s[0].Attributes), s[0].Attributes)
}

return nil
Expand Down Expand Up @@ -507,6 +511,12 @@ resource "pulsar_namespace" "test" {
dispatch_byte_throttling_rate = 2048
}
subscription_dispatch_rate {
dispatch_msg_throttling_rate = 50
rate_period_seconds = 50
dispatch_byte_throttling_rate = 2048
}
retention_policies {
retention_minutes = "1600"
retention_size_in_mb = "10000"
Expand Down

0 comments on commit 969d5ef

Please sign in to comment.