diff --git a/README.md b/README.md index 7be5fa9..0b93704 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,12 @@ resource "pulsar_namespace" "test" { dispatch_byte_throttling_rate = 2048 } + subscription_dispatch_rate { + dispatch_msg_throttling_rate = 50 + rate_period_seconds = 50 + dispatch_byte_throttling_rate = 2048 + } + retention_policies { retention_minutes = "1600" retention_size_in_mb = "10000" @@ -249,17 +255,18 @@ resource "pulsar_namespace" "test" { #### Properties -| Property | Description | Required | -| ---------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -| `tenant` | Name of the Tenant managing this namespace | Yes | -| `namespace` | name of the namespace | Yes | -| `enable_deduplication` | Message deduplication state on a namespace | No | -| `namespace_config` | Configuration for your namespaces like max allowed producers to produce messages | No | -| `dispatch_rate` | Apache Pulsar throttling config | No | -| `retention_policies` | Data retention policies | No | -| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No | -| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No | -| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No | +| Property | Description | Required | +| ---------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | +| `tenant` | Name of the Tenant managing this namespace | Yes | +| `namespace` | name of the namespace | Yes | +| `enable_deduplication` | Message deduplication state on a namespace | No | +| `namespace_config` | Configuration for your namespaces like max allowed producers to produce messages | No | +| `dispatch_rate` | [Apache Pulsar throttling config for topics](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-dispatch-throttling-for-topics) | No | +| `subscription_dispatch_rate` | [Apache Pulsar throttling config for subscriptions](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-dispatch-throttling-for-subscription) | No | +| `retention_policies` | Data retention policies | No | +| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No | +| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No | +| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No | namespace_config nested schema diff --git a/docs/resources/namespace.md b/docs/resources/namespace.md index d19f28b..20778a8 100644 --- a/docs/resources/namespace.md +++ b/docs/resources/namespace.md @@ -20,8 +20,10 @@ description: |- ### Optional - `backlog_quota` (Block Set) (see [below for nested schema](#nestedblock--backlog_quota)) -- `dispatch_rate` (Block Set, Max: 1) Data transfer rate, in and out of the Pulsar Broker ( +- `dispatch_rate` (Block Set, Max: 1) Data transfer rate for all the topics under the given namespace ( see [below for nested schema](#nestedblock--dispatch_rate)) +- `subscription_dispatch_rate` (Block Set, Max: 1) Data transfer rate for all the subscriptions under the given + namespace (see [below for nested schema](#nestedblock--subscription_dispatch_rate)) - `enable_deduplication` (Boolean) - `namespace_config` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--namespace_config)) - `permission_grant` (Block Set) (see [below for nested schema](#nestedblock--permission_grant)) @@ -54,6 +56,16 @@ Required: - `dispatch_msg_throttling_rate` (Number) - `rate_period_seconds` (Number) + + +### Nested Schema for `subscription_dispatch_rate` + +Required: + +- `dispatch_byte_throttling_rate` (Number) +- `dispatch_msg_throttling_rate` (Number) +- `rate_period_seconds` (Number) + ### Nested Schema for `namespace_config` diff --git a/pulsar/provider.go b/pulsar/provider.go index 154aaa5..47aa5cd 100644 --- a/pulsar/provider.go +++ b/pulsar/provider.go @@ -57,7 +57,8 @@ func init() { "max_consumers_per_subscription": "Max number of consumers per subscription", "max_consumers_per_topic": "Max number of consumers per topic", "message_ttl_seconds": "Sets the message time to live", - "dispatch_rate": "Data transfer rate, in and out of the Pulsar Broker", + "dispatch_rate": "Data transfer rate for all the topics under the given namespace", + "subscription_dispatch_rate": "Data transfer rate for all the subscriptions under the given namespace", "persistence_policy": "Policy for the namespace for data persistence", "backlog_quota": "", "issuer_url": "The OAuth 2.0 URL of the authentication provider which allows the Pulsar client to obtain an access token", diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 54a6e83..183a68c 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -94,6 +94,29 @@ func resourcePulsarNamespace() *schema.Resource { }, Set: dispatchRateToHash, }, + "subscription_dispatch_rate": { + Type: schema.TypeSet, + Optional: true, + Description: descriptions["subscription_dispatch_rate"], + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "dispatch_msg_throttling_rate": { + Type: schema.TypeInt, + Required: true, + }, + "rate_period_seconds": { + Type: schema.TypeInt, + Required: true, + }, + "dispatch_byte_throttling_rate": { + Type: schema.TypeInt, + Required: true, + }, + }, + }, + Set: dispatchRateToHash, + }, "retention_policies": { Type: schema.TypeSet, Optional: true, @@ -426,6 +449,21 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me })) } + if subscriptionDispatchRateCfg, ok := d.GetOk("subscription_dispatch_rate"); ok && subscriptionDispatchRateCfg.(*schema.Set).Len() > 0 { //nolint:lll + sdr, err := client.GetSubscriptionDispatchRate(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSubscriptionDispatchRate: %w", err)) + } + + _ = d.Set("subscription_dispatch_rate", schema.NewSet(dispatchRateToHash, []interface{}{ + map[string]interface{}{ + "dispatch_msg_throttling_rate": sdr.DispatchThrottlingRateInMsg, + "rate_period_seconds": sdr.RatePeriodInSecond, + "dispatch_byte_throttling_rate": int(sdr.DispatchThrottlingRateInByte), + }, + })) + } + if permissionGrantCfg, ok := d.GetOk("permission_grant"); ok && len(permissionGrantCfg.(*schema.Set).List()) > 0 { grants, err := client.GetNamespacePermissions(*ns) if err != nil { @@ -465,6 +503,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set) backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set) dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set) + subscriptionDispatchRateConfig := d.Get("subscription_dispatch_rate").(*schema.Set) persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set) permissionGrantConfig := d.Get("permission_grant").(*schema.Set) topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set) @@ -560,6 +599,13 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, } } + if subscriptionDispatchRateConfig.Len() > 0 { + subscriptionDispatchRate := unmarshalDispatchRate(subscriptionDispatchRateConfig) + if err = client.SetSubscriptionDispatchRate(*nsName, *subscriptionDispatchRate); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSubscriptionDispatchRate: %w", err)) + } + } + if persistencePoliciesConfig.Len() > 0 { persistencePolicies := unmarshalPersistencePolicies(persistencePoliciesConfig) if err = client.SetPersistence(nsName.String(), *persistencePolicies); err != nil { @@ -646,6 +692,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData, _ = d.Set("retention_policies", nil) _ = d.Set("backlog_quota", nil) _ = d.Set("dispatch_rate", nil) + _ = d.Set("subscription_dispatch_rate", nil) _ = d.Set("persistence_policies", nil) _ = d.Set("permission_grant", nil) _ = d.Set("topic_auto_creation", nil) diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index 9260157..a0eb450 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -110,6 +110,7 @@ func TestNamespaceWithUpdate(t *testing.T) { Check: resource.ComposeTestCheckFunc( testPulsarNamespaceExists(resourceName), resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"), + resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"), resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"), resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "0"), resource.TestCheckNoResourceAttr(resourceName, "enable_deduplication"), @@ -121,6 +122,7 @@ func TestNamespaceWithUpdate(t *testing.T) { Check: resource.ComposeTestCheckFunc( testPulsarNamespaceExists(resourceName), resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "1"), + resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "1"), resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "1"), resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "enable_deduplication", "true"), @@ -162,6 +164,7 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) { Check: resource.ComposeTestCheckFunc( testPulsarNamespaceExists(resourceName), resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"), + resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"), resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"), resource.TestCheckResourceAttr(resourceName, "backlog_quota.#", "0"), resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "0"), @@ -174,6 +177,7 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) { Check: resource.ComposeTestCheckFunc( testPulsarNamespaceExists(resourceName), resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"), + resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"), resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"), resource.TestCheckResourceAttr(resourceName, "backlog_quota.#", "0"), resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "1"), @@ -395,8 +399,8 @@ func testNamespaceImported() resource.ImportStateCheckFunc { return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s) } - if len(s[0].Attributes) != 11 { - return fmt.Errorf("expected %d attrs, got %d: %#v", 11, len(s[0].Attributes), s[0].Attributes) + if len(s[0].Attributes) != 12 { + return fmt.Errorf("expected %d attrs, got %d: %#v", 12, len(s[0].Attributes), s[0].Attributes) } return nil @@ -507,6 +511,12 @@ resource "pulsar_namespace" "test" { dispatch_byte_throttling_rate = 2048 } + subscription_dispatch_rate { + dispatch_msg_throttling_rate = 50 + rate_period_seconds = 50 + dispatch_byte_throttling_rate = 2048 + } + retention_policies { retention_minutes = "1600" retention_size_in_mb = "10000"