Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrote ISM Policy reconciler #846

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package requests

type Policy struct {
PolicyID string `json:"_id,omitempty"`
PrimaryTerm *int `json:"_primary_term,omitempty"`
SequenceNumber *int `json:"_seq_no,omitempty"`
Policy ISMPolicy `json:"policy"`
type ISMPolicy struct {
Policy ISMPolicySpec `json:"policy"`
}

// ISMPolicySpec is the specification for the ISM policy for OS.
type ISMPolicy struct {
type ISMPolicySpec struct {
// The default starting state for each index that uses this policy.
DefaultState string `json:"default_state"`
// A human-readable description of the policy.
Expand Down

This file was deleted.

10 changes: 10 additions & 0 deletions opensearch-operator/opensearch-gateway/responses/IsmPolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package responses

import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"

type GetISMPolicyResponse struct {
PolicyID string `json:"_id"`
PrimaryTerm int `json:"_primary_term"`
SequenceNumber int `json:"_seq_no"`
Policy requests.ISMPolicySpec
}
33 changes: 10 additions & 23 deletions opensearch-operator/opensearch-gateway/services/os_ism_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/responses"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/opensearch-project/opensearch-go/opensearchutil"
Expand All @@ -16,7 +17,7 @@ import (
var ErrNotFound = errors.New("policy not found")

// ShouldUpdateISMPolicy checks if the passed policy is same as existing or needs update
func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.Policy) (bool, error) {
func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.ISMPolicy) (bool, error) {
if cmp.Equal(newPolicy, existingPolicy, cmpopts.EquateEmpty()) {
return false, nil
}
Expand All @@ -27,34 +28,20 @@ func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy reques
return true, nil
}

// PolicyExists checks if the passed policy already exists or not
func PolicyExists(ctx context.Context, service *OsClusterClient, policyName string) (bool, error) {
resp, err := service.GetISMConfig(ctx, policyName)
if err != nil {
return false, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return false, nil
} else if resp.IsError() {
return false, fmt.Errorf("response from API is %s", resp.Status())
}
return true, nil
}

// GetPolicy fetches the passed policy
func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*requests.Policy, error) {
func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*responses.GetISMPolicyResponse, error) {
resp, err := service.GetISMConfig(ctx, policyName)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, ErrNotFound
} else if resp.IsError() {
}
if resp.IsError() {
return nil, fmt.Errorf("response from API is %s", resp.Status())
}
ismResponse := requests.Policy{}
ismResponse := responses.GetISMPolicyResponse{}
if resp != nil && resp.Body != nil {
err := json.NewDecoder(resp.Body).Decode(&ismResponse)
if err != nil {
Expand All @@ -66,7 +53,7 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string)
}

// CreateISMPolicy creates the passed policy
func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, policyId string) error {
func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, policyId string) error {
spec := opensearchutil.NewJSONReader(ismpolicy)
resp, err := service.PutISMConfig(ctx, policyId, spec)
if err != nil {
Expand All @@ -80,15 +67,15 @@ func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy re
}

// UpdateISMPolicy updates the given policy
func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, seqno, primterm *int, policyName string) error {
func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, seqno, primterm *int, policyId string) error {
spec := opensearchutil.NewJSONReader(ismpolicy)
resp, err := service.UpdateISMConfig(ctx, policyName, *seqno, *primterm, spec)
resp, err := service.UpdateISMConfig(ctx, policyId, *seqno, *primterm, spec)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to create ism policy: %s", resp.String())
return fmt.Errorf("failed to update ism policy: %s", resp.String())
}
return nil
}
Expand Down
Loading