Skip to content

Commit

Permalink
Rewrote ISM Policy reconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
rkthtrifork committed Aug 21, 2024
1 parent 610222a commit 3cca2d0
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 305 deletions.
9 changes: 3 additions & 6 deletions opensearch-operator/opensearch-gateway/requests/IsmPolicy.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package requests

type Policy struct {
PolicyID string `json:"_id,omitempty"`
PrimaryTerm *int `json:"_primary_term,omitempty"`
SequenceNumber *int `json:"_seq_no,omitempty"`
Policy ISMPolicy `json:"policy"`
type ISMPolicy struct {
Policy ISMPolicySpec `json:"policy"`
}

// ISMPolicySpec is the specification for the ISM policy for OS.
type ISMPolicy struct {
type ISMPolicySpec struct {
// The default starting state for each index that uses this policy.
DefaultState string `json:"default_state"`
// A human-readable description of the policy.
Expand Down

This file was deleted.

10 changes: 10 additions & 0 deletions opensearch-operator/opensearch-gateway/responses/IsmPolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package responses

import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"

type GetISMPolicyResponse struct {
PolicyID string `json:"_id"`
PrimaryTerm int `json:"_primary_term"`
SequenceNumber int `json:"_seq_no"`
Policy requests.ISMPolicySpec
}
33 changes: 10 additions & 23 deletions opensearch-operator/opensearch-gateway/services/os_ism_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/responses"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/opensearch-project/opensearch-go/opensearchutil"
Expand All @@ -16,7 +17,7 @@ import (
var ErrNotFound = errors.New("policy not found")

// ShouldUpdateISMPolicy checks if the passed policy is same as existing or needs update
func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.Policy) (bool, error) {
func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.ISMPolicy) (bool, error) {
if cmp.Equal(newPolicy, existingPolicy, cmpopts.EquateEmpty()) {
return false, nil
}
Expand All @@ -27,34 +28,20 @@ func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy reques
return true, nil
}

// PolicyExists checks if the passed policy already exists or not
func PolicyExists(ctx context.Context, service *OsClusterClient, policyName string) (bool, error) {
resp, err := service.GetISMConfig(ctx, policyName)
if err != nil {
return false, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return false, nil
} else if resp.IsError() {
return false, fmt.Errorf("response from API is %s", resp.Status())
}
return true, nil
}

// GetPolicy fetches the passed policy
func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*requests.Policy, error) {
func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*responses.GetISMPolicyResponse, error) {
resp, err := service.GetISMConfig(ctx, policyName)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, ErrNotFound
} else if resp.IsError() {
}
if resp.IsError() {
return nil, fmt.Errorf("response from API is %s", resp.Status())
}
ismResponse := requests.Policy{}
ismResponse := responses.GetISMPolicyResponse{}
if resp != nil && resp.Body != nil {
err := json.NewDecoder(resp.Body).Decode(&ismResponse)
if err != nil {
Expand All @@ -66,7 +53,7 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string)
}

// CreateISMPolicy creates the passed policy
func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, policyId string) error {
func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, policyId string) error {
spec := opensearchutil.NewJSONReader(ismpolicy)
resp, err := service.PutISMConfig(ctx, policyId, spec)
if err != nil {
Expand All @@ -80,15 +67,15 @@ func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy re
}

// UpdateISMPolicy updates the given policy
func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, seqno, primterm *int, policyName string) error {
func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, seqno, primterm *int, policyId string) error {
spec := opensearchutil.NewJSONReader(ismpolicy)
resp, err := service.UpdateISMConfig(ctx, policyName, *seqno, *primterm, spec)
resp, err := service.UpdateISMConfig(ctx, policyId, *seqno, *primterm, spec)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to create ism policy: %s", resp.String())
return fmt.Errorf("failed to update ism policy: %s", resp.String())
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions opensearch-operator/pkg/reconcilers/indextemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (r *IndexTemplateReconciler) Reconcile() (result ctrl.Result, err error) {
})
if err != nil {
reason = fmt.Sprintf("failed to update status: %s", err)
// r.logger.Error(retErr, reason) should this be added?
r.recorder.Event(r.instance, "Warning", statusError, reason)
return
}
Expand Down
Loading

0 comments on commit 3cca2d0

Please sign in to comment.