Skip to content

Commit

Permalink
issue-488, saving default kafka connect credentials was implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Jul 24, 2023
1 parent 836b5ec commit 9845330
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 0 deletions.
22 changes: 22 additions & 0 deletions apis/clusters/v1beta1/kafkaconnect_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -687,3 +688,24 @@ func (tc *TargetCluster) ManagedClustersToInstAPI() (iClusters []*models.Managed
}
return
}

func (k *KafkaConnect) NewDefaultUserSecret(username, password string) *v1.Secret {
return &v1.Secret{
TypeMeta: metav1.TypeMeta{
Kind: models.SecretKind,
APIVersion: models.K8sAPIVersionV1,
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(models.DefaultUserSecretNameTemplate, models.DefaultUserSecretPrefix, k.Name),
Namespace: k.Namespace,
Labels: map[string]string{
models.ControlledByLabel: k.Name,
models.DefaultSecretLabel: "true",
},
},
StringData: map[string]string{
models.Username: username,
models.Password: password,
},
}
}
44 changes: 44 additions & 0 deletions controllers/clusters/kafkaconnect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1
"Cluster status check job is started",
)

err = r.createDefaultSecret(ctx, kc, l)
if err != nil {
l.Error(err, "Cannot create default secret for Kafka Connect",
"cluster name", kc.Spec.Name,
"clusterID", kc.Status.ID,
)
r.EventRecorder.Eventf(
kc, models.Warning, models.CreationFailed,
"Default user secret creation on the Instaclustr is failed. Reason: %v",
err,
)

return models.ReconcileRequeue
}

return models.ExitReconcile
}

Expand Down Expand Up @@ -386,6 +401,35 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1
return models.ExitReconcile
}

func (r *KafkaConnectReconciler) createDefaultSecret(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) error {
username, password, err := r.API.GetDefaultCredentialsV1(kc.Status.ID)
if err != nil {
l.Error(err, "Cannot get default user creds for Kafka Connect cluster from the Instaclustr API",
"cluster ID", kc.Status.ID,
)
r.EventRecorder.Eventf(kc, models.Warning, models.FetchFailed,
"Default user password fetch from the Instaclustr API is failed. Reason: %v", err,
)

return err
}

secret := kc.NewDefaultUserSecret(username, password)
err = r.Create(ctx, secret)
if err != nil {
l.Error(err, "Cannot create secret with default user credentials",
"cluster ID", kc.Status.ID,
)
r.EventRecorder.Eventf(kc, models.Warning, models.CreationFailed,
"Creating secret with default user credentials is failed. Reason: %v", err,
)

return err
}

return nil
}

func (r *KafkaConnectReconciler) startClusterStatusJob(kc *v1beta1.KafkaConnect) error {
job := r.newWatchStatusJob(kc)

Expand Down
32 changes: 32 additions & 0 deletions pkg/instaclustr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2184,3 +2184,35 @@ func (c *Client) DeleteUser(username, clusterID, app string) error {

return nil
}

func (c *Client) GetDefaultCredentialsV1(clusterID string) (string, string, error) {
url := c.serverHostname + ClustersEndpointV1 + clusterID

resp, err := c.DoRequest(url, http.MethodGet, nil)
if err != nil {
return "", "", err
}
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return "", "", err
}

if resp.StatusCode != http.StatusAccepted {
return "", "", fmt.Errorf("status code: %d, message: %s", resp.StatusCode, b)
}

type credentials struct {
Username string `json:"username"`
InstaclustrUserPassword string `json:"instaclustrUserPassword"`
}

var creds credentials
err = json.Unmarshal(b, &creds)
if err != nil {
return "", "", err
}

return creds.Username, creds.InstaclustrUserPassword, nil
}
1 change: 1 addition & 0 deletions pkg/instaclustr/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@ type API interface {
CreateUser(userSpec any, clusterID, app string) error
DeleteUser(username, clusterID, app string) error
ListAppVersions(app string) ([]*models.AppVersions, error)
GetDefaultCredentialsV1(clusterID string) (string, string, error)
}
4 changes: 4 additions & 0 deletions pkg/instaclustr/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,7 @@ func (c *mockClient) CreateUser(userSpec any, clusterID, app string) error {
func (c *mockClient) DeleteUser(username, clusterID, app string) error {
panic("DeleteUser: is not implemented")
}

func (c *mockClient) GetDefaultCredentialsV1(clusterID string) (string, string, error) {
panic("GetDefaultCredentialsV1: is not implemented")
}

0 comments on commit 9845330

Please sign in to comment.