From ec525738d8c42a3c622be1fd78977b0753bd3e69 Mon Sep 17 00:00:00 2001 From: rxg8255 <105524324+rxg8255@users.noreply.github.com> Date: Wed, 26 Jun 2024 02:19:26 -0500 Subject: [PATCH] Add support for cassandra tls (#5803) Signed-off-by: Ranjith Gopal Co-authored-by: Ranjith Gopal --- CHANGELOG.md | 1 + pkg/scalers/cassandra_scaler.go | 118 ++++++++++++++++++++++++++- pkg/scalers/cassandra_scaler_test.go | 72 ++++++++++++++++ 3 files changed, 189 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e83ddf2bad1..dd14e97d968 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features: ### Improvements +- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) diff --git a/pkg/scalers/cassandra_scaler.go b/pkg/scalers/cassandra_scaler.go index b224f99be72..6e8705d2d8d 100644 --- a/pkg/scalers/cassandra_scaler.go +++ b/pkg/scalers/cassandra_scaler.go @@ -2,8 +2,10 @@ package scalers import ( "context" + "errors" "fmt" "net" + "os" "strconv" "strings" @@ -28,6 +30,10 @@ type cassandraScaler struct { type CassandraMetadata struct { username string password string + enableTLS bool + cert string + key string + ca string clusterIPAddress string port int consistency gocql.Consistency @@ -39,6 +45,11 @@ type CassandraMetadata struct { triggerIndex int } +const ( + tlsEnable = "enable" + tlsDisable = "disable" +) + // NewCassandraScaler creates a new Cassandra scaler. func NewCassandraScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) @@ -68,7 +79,8 @@ func NewCassandraScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { // parseCassandraMetadata parses the metadata and returns a CassandraMetadata or an error if the ScalerConfig is invalid. func parseCassandraMetadata(config *scalersconfig.ScalerConfig) (*CassandraMetadata, error) { - meta := CassandraMetadata{} + meta := &CassandraMetadata{} + var err error if val, ok := config.TriggerMetadata["query"]; ok { meta.query = val @@ -157,9 +169,86 @@ func parseCassandraMetadata(config *scalersconfig.ScalerConfig) (*CassandraMetad return nil, fmt.Errorf("no password given") } + if err = parseCassandraTLS(config, meta); err != nil { + return meta, err + } + meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil +} + +func createTempFile(prefix string, content string) (string, error) { + tempCassandraDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "cassandra") + err := os.MkdirAll(tempCassandraDir, 0700) + if err != nil { + return "", fmt.Errorf(`error creating temporary directory: %s. Error: %w + Note, when running in a container a writable /tmp/cassandra emptyDir must be mounted. Refer to documentation`, tempCassandraDir, err) + } + + f, err := os.CreateTemp(tempCassandraDir, prefix+"-*.pem") + if err != nil { + return "", err + } + defer f.Close() + + _, err = f.WriteString(content) + if err != nil { + return "", err + } + + return f.Name(), nil +} + +func parseCassandraTLS(config *scalersconfig.ScalerConfig, meta *CassandraMetadata) error { + meta.enableTLS = false + if val, ok := config.AuthParams["tls"]; ok { + val = strings.TrimSpace(val) + if val == tlsEnable { + certGiven := config.AuthParams["cert"] != "" + keyGiven := config.AuthParams["key"] != "" + caCertGiven := config.AuthParams["ca"] != "" + if certGiven && !keyGiven { + return errors.New("no key given") + } + if keyGiven && !certGiven { + return errors.New("no cert given") + } + if !keyGiven && !certGiven { + return errors.New("no cert/key given") + } + + certFilePath, err := createTempFile("cert", config.AuthParams["cert"]) + if err != nil { + // handle error + return errors.New("Error creating cert file: " + err.Error()) + } + + keyFilePath, err := createTempFile("key", config.AuthParams["key"]) + if err != nil { + // handle error + return errors.New("Error creating key file: " + err.Error()) + } + + meta.cert = certFilePath + meta.key = keyFilePath + meta.ca = config.AuthParams["ca"] + if !caCertGiven { + meta.ca = "" + } else { + caCertFilePath, err := createTempFile("caCert", config.AuthParams["ca"]) + meta.ca = caCertFilePath + if err != nil { + // handle error + return errors.New("Error creating ca file: " + err.Error()) + } + } + meta.enableTLS = true + } else if val != tlsDisable { + return fmt.Errorf("err incorrect value for TLS given: %s", val) + } + } + return nil } // newCassandraSession returns a new Cassandra session for the provided CassandraMetadata. @@ -172,6 +261,14 @@ func newCassandraSession(meta *CassandraMetadata, logger logr.Logger) (*gocql.Se Password: meta.password, } + if meta.enableTLS { + cluster.SslOpts = &gocql.SslOptions{ + CertPath: meta.cert, + KeyPath: meta.key, + CaPath: meta.ca, + } + } + session, err := cluster.CreateSession() if err != nil { logger.Error(err, "found error creating session") @@ -223,6 +320,23 @@ func (s *cassandraScaler) GetQueryResult(ctx context.Context) (int64, error) { // Close closes the Cassandra session connection. func (s *cassandraScaler) Close(_ context.Context) error { + // clean up any temporary files + if strings.TrimSpace(s.metadata.cert) != "" { + if err := os.Remove(s.metadata.cert); err != nil { + return err + } + } + if strings.TrimSpace(s.metadata.key) != "" { + if err := os.Remove(s.metadata.key); err != nil { + return err + } + } + if strings.TrimSpace(s.metadata.ca) != "" { + if err := os.Remove(s.metadata.ca); err != nil { + return err + } + } + if s.session != nil { s.session.Close() } diff --git a/pkg/scalers/cassandra_scaler_test.go b/pkg/scalers/cassandra_scaler_test.go index 730055e2148..39930946a56 100644 --- a/pkg/scalers/cassandra_scaler_test.go +++ b/pkg/scalers/cassandra_scaler_test.go @@ -2,6 +2,8 @@ package scalers import ( "context" + "fmt" + "os" "testing" "github.com/go-logr/logr" @@ -16,6 +18,12 @@ type parseCassandraMetadataTestData struct { authParams map[string]string } +type parseCassandraTLSTestData struct { + authParams map[string]string + isError bool + enableTLS bool +} + type cassandraMetricIdentifier struct { metadataTestData *parseCassandraMetadataTestData triggerIndex int @@ -47,6 +55,17 @@ var testCassandraMetadata = []parseCassandraMetadataTestData{ {map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "port": "9042", "clusterIPAddress": "https://cassandra.test", "keyspace": "test_keyspace", "TriggerIndex": "0"}, false, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}}, } +var tlsAuthParamsTestData = []parseCassandraTLSTestData{ + // success, TLS cert/key + {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "password": "Y2Fzc2FuZHJhCg=="}, false, true}, + // failure, TLS missing cert + {map[string]string{"tls": "enable", "key": "keey", "password": "Y2Fzc2FuZHJhCg=="}, true, false}, + // failure, TLS missing key + {map[string]string{"tls": "enable", "cert": "ceert", "password": "Y2Fzc2FuZHJhCg=="}, true, false}, + // failure, TLS invalid + {map[string]string{"tls": "yes", "cert": "ceert", "key": "keeey", "password": "Y2Fzc2FuZHJhCg=="}, true, false}, +} + var cassandraMetricIdentifiers = []cassandraMetricIdentifier{ {&testCassandraMetadata[1], 0, "s0-cassandra-test_keyspace"}, {&testCassandraMetadata[2], 1, "s1-cassandra-test_keyspace"}, @@ -83,3 +102,56 @@ func TestCassandraGetMetricSpecForScaling(t *testing.T) { } } } + +func assertCertContents(testData parseCassandraTLSTestData, meta *CassandraMetadata, prop string) error { + if testData.authParams[prop] != "" { + var path string + switch prop { + case "cert": + path = meta.cert + case "key": + path = meta.key + } + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("expected to find '%v' file at %v", prop, path) + } + contents := string(data) + if contents != testData.authParams[prop] { + return fmt.Errorf("expected value: '%v' but got '%v'", testData.authParams[prop], contents) + } + } + return nil +} + +var successMetaData = map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "TriggerIndex": "0"} + +func TestParseCassandraTLS(t *testing.T) { + for _, testData := range tlsAuthParamsTestData { + meta, err := parseCassandraMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: successMetaData, AuthParams: testData.authParams}) + + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if meta.enableTLS != testData.enableTLS { + t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS) + } + if meta.enableTLS { + if meta.cert != testData.authParams["cert"] { + err := assertCertContents(testData, meta, "cert") + if err != nil { + t.Errorf(err.Error()) + } + } + if meta.key != testData.authParams["key"] { + err := assertCertContents(testData, meta, "key") + if err != nil { + t.Errorf(err.Error()) + } + } + } + } +}