From ab2a157f97e27de3cde0731d0757de76614c709d Mon Sep 17 00:00:00 2001 From: Kairo Araujo Date: Tue, 17 Sep 2024 16:56:47 +0200 Subject: [PATCH] External messaging (#377) Add a publisher store to enable Archivista to publish information using different protocols and integrations * feat: add dapr publisher this publisher allows users to publish messages (gitoid/dsse payload) to a dapr HTTP pub/sub. * feat: add RSTUF publisher this publisher allows users to integrate Archivista with Repository Service for TUF, in order to secure the Archivista repository using TUF metadata signatures. --------- Signed-off-by: John Kjell Signed-off-by: Kairo Araujo Co-authored-by: John Kjell --- README.md | 8 ++ pkg/config/config.go | 8 ++ pkg/publisherstore/dapr/http.go | 92 ++++++++++++++++++++ pkg/publisherstore/publisherstore.go | 47 ++++++++++ pkg/publisherstore/rstuf/rstuf.go | 124 +++++++++++++++++++++++++++ pkg/publisherstore/rstuf/structs.go | 51 +++++++++++ pkg/server/server.go | 27 ++++-- pkg/server/services.go | 14 ++- 8 files changed, 362 insertions(+), 9 deletions(-) create mode 100644 pkg/publisherstore/dapr/http.go create mode 100644 pkg/publisherstore/publisherstore.go create mode 100644 pkg/publisherstore/rstuf/rstuf.go create mode 100644 pkg/publisherstore/rstuf/structs.go diff --git a/README.md b/README.md index 74896e84..97259b9c 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,14 @@ Archivista is configured through environment variables currently. | ARCHIVISTA_GRAPHQL_WEB_CLIENT_ENABLE | TRUE | Enable GraphiQL, the GraphQL web client | | ARCHIVISTA_ENABLE_ARTIFACT_STORE | FALSE | Enable Artifact Store Endpoints | | ARCHIVISTA_ARTIFACT_STORE_CONFIG | /tmp/artifacts/config.yaml | Location of the config describing available artifacts | +| ARCHIVISTA_PUBLISHER | "" | Publisher to use. Options are DAPR, RSTUF. Supports multiple, Comma-separated list of String | +| ARCHIVISTA_PUBLISHER_DAPR_HOST | localhost | Dapr host | +| ARCHIVISTA_PUBLISHER_DAPR_PORT | 3500 | Dapr port | +| ARCHIVISTA_PUBLISHER_DAPR_COMPONENT_NAME | "archivista" | Dapr pubsub component name | +| ARCHIVISTA_PUBLISHER_DAPR_TOPIC | "attestations" | Dapr pubsub topic | +| ARCHIVISTA_PUBLISHER_DAPR_URL | | Dapr full URL | +| ARCHIVISTA_PUBLISHER_RSTUF_HOST | | RSTUF URL | + ## Using Archivista diff --git a/pkg/config/config.go b/pkg/config/config.go index 69d581eb..a2b91a74 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -56,6 +56,14 @@ type Config struct { EnableArtifactStore bool `default:"FALSE" desc:"*** Enable Artifact Store Endpoints" split_words:"true"` ArtifactStoreConfig string `default:"/tmp/artifacts/config.yaml" desc:"Location of the config describing available artifacts" split_words:"true"` + + Publisher []string `default:"" desc:"Publisher to use. Options are DAPR, RSTUF or empty string for disabled." split_words:"true"` + PublisherDaprHost string `default:"http://127.0.0.1" desc:"Host for Dapr" split_words:"true"` + PublisherDaprPort string `default:"3500" desc:"Port for Dapr" split_words:"true"` + PublisherDaprURL string `default:"" desc:"URL for Dapr" split_words:"true"` + PublisherDaprComponentName string `default:"archivista" desc:"Dapr pubsub component name" split_words:"true"` + PublisherDaprTopic string `default:"attestations" desc:"Dapr pubsub topic" split_words:"true"` + PublisherRstufHost string `default:"http://127.0.0.1" desc:"Host for RSTUF" split_words:"true"` } // Process reads config from env diff --git a/pkg/publisherstore/dapr/http.go b/pkg/publisherstore/dapr/http.go new file mode 100644 index 00000000..342dc520 --- /dev/null +++ b/pkg/publisherstore/dapr/http.go @@ -0,0 +1,92 @@ +// Copyright 2024 The Archivista Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package dapr + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/in-toto/archivista/pkg/config" + "github.com/sirupsen/logrus" +) + +type DaprHttp struct { + Client *http.Client + Host string + HttpPort string + PubsubComponentName string + PubsubTopic string + Url string +} + +type daprPayload struct { + Gitoid string + Payload []byte +} + +type Publisher interface { + Publish(ctx context.Context, gitoid string, payload []byte) error +} + +func (d *DaprHttp) Publish(ctx context.Context, gitoid string, payload []byte) error { + if d.Client == nil { + d.Client = &http.Client{ + Timeout: 15 * time.Second, + } + } + + if d.Url == "" { + d.Url = d.Host + ":" + d.HttpPort + + "/v1.0/publish/" + d.PubsubComponentName + "/" + d.PubsubTopic + } + + dp := daprPayload{ + Gitoid: gitoid, + Payload: payload, + } + // Marshal the message to JSON + msgBytes, err := json.Marshal(dp) + if err != nil { + logrus.Error(err.Error()) + return err + } + + res, err := d.Client.Post(d.Url, "application/json", bytes.NewReader(msgBytes)) + if err != nil { + logrus.Error(err.Error()) + return err + } + if res.StatusCode != http.StatusNoContent { + logrus.Printf("failed to publish message: %s", res.Body) + return fmt.Errorf("failed to publish message: %s", res.Body) + } + defer res.Body.Close() + + return nil +} + +func NewPublisher(config *config.Config) Publisher { + daprPublisher := &DaprHttp{ + Host: config.PublisherDaprHost, + HttpPort: config.PublisherDaprPort, + PubsubComponentName: config.PublisherDaprComponentName, + PubsubTopic: config.PublisherDaprTopic, + Url: config.PublisherDaprURL, + } + return daprPublisher +} diff --git a/pkg/publisherstore/publisherstore.go b/pkg/publisherstore/publisherstore.go new file mode 100644 index 00000000..77ace7b4 --- /dev/null +++ b/pkg/publisherstore/publisherstore.go @@ -0,0 +1,47 @@ +// Copyright 2024 The Archivista Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package publisherstore + +import ( + "context" + "strings" + + "github.com/in-toto/archivista/pkg/config" + "github.com/in-toto/archivista/pkg/publisherstore/dapr" + "github.com/in-toto/archivista/pkg/publisherstore/rstuf" + "github.com/sirupsen/logrus" +) + +type Publisher interface { + Publish(ctx context.Context, gitoid string, payload []byte) error +} + +func New(config *config.Config) []Publisher { + var publisherStore []Publisher + for _, pubType := range config.Publisher { + pubType = strings.ToUpper(pubType) // Normalize the input + switch pubType { + case "DAPR": + publisherStore = append(publisherStore, dapr.NewPublisher(config)) + logrus.Info("Using publisher: DAPR") + + case "RSTUF": + publisherStore = append(publisherStore, rstuf.NewPublisher(config)) + logrus.Info("Using publisher: RSTUF") + default: + logrus.Errorf("unsupported publisher type: %s", pubType) + } + } + return publisherStore +} diff --git a/pkg/publisherstore/rstuf/rstuf.go b/pkg/publisherstore/rstuf/rstuf.go new file mode 100644 index 00000000..b0b9abf3 --- /dev/null +++ b/pkg/publisherstore/rstuf/rstuf.go @@ -0,0 +1,124 @@ +// Copyright 2024 The Archivista Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rstuf + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httputil" + + "github.com/in-toto/archivista/pkg/config" + "github.com/sirupsen/logrus" +) + +type RSTUF struct { + Host string +} + +type Publisher interface { + Publish(ctx context.Context, gitoid string, payload []byte) error +} + +func (r *RSTUF) parseRSTUFPayload(gitoid string, payload []byte) ([]byte, error) { + objHash := sha256.Sum256(payload) + // custom := make(map[string]any) + // custom["gitoid"] = gitoid + artifacts := []Artifact{ + { + Path: gitoid, + Info: ArtifactInfo{ + Length: len(payload), + Hashes: Hashes{ + Sha256: hex.EncodeToString(objHash[:]), + }, + // Custom: custom, + }, + }, + } + + artifactPayload := ArtifactPayload{ + Artifacts: artifacts, + AddTaskIDToCustom: false, + PublishTargets: true, + } + + payloadBytes, err := json.Marshal(artifactPayload) + if err != nil { + return nil, fmt.Errorf("error marshaling payload: %v", err) + } + return payloadBytes, nil +} + +func (r *RSTUF) Publish(ctx context.Context, gitoid string, payload []byte) error { + // this publisher allows integration with the RSTUF project to store + // the attestation and policy in the TUF metadata. + // this TUF metadata can be used to build truste when distributing the + // attestations and policies. + // Convert payload to JSON + url := r.Host + "/api/v1/artifacts" + + payloadBytes, err := r.parseRSTUFPayload(gitoid, payload) + if err != nil { + return fmt.Errorf("error parsing payload: %v", err) + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes)) + if err != nil { + return fmt.Errorf("error creating request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + // Add any additional headers or authentication if needed + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + logrus.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + logb, _ := httputil.DumpResponse(resp, true) + logrus.Errorf("error body from RSTUF: %v", string(logb)) + return fmt.Errorf("error response from RSTUF: %v", err) + } + + // Handle the response as needed + body, err := io.ReadAll(resp.Body) + if err != nil { + logrus.Errorf("error reading response body: %v", err) + } + + response := Response{} + err = json.Unmarshal(body, &response) + if err != nil { + logrus.Errorf("error unmarshaling response: %v", err) + } + logrus.Debugf("RSTUF task id: %v", response.Data.TaskId) + // TODO: monitor RSTUF task id for completion + return nil +} + +func NewPublisher(config *config.Config) Publisher { + return &RSTUF{ + Host: config.PublisherRstufHost, + } +} diff --git a/pkg/publisherstore/rstuf/structs.go b/pkg/publisherstore/rstuf/structs.go new file mode 100644 index 00000000..7bdb953d --- /dev/null +++ b/pkg/publisherstore/rstuf/structs.go @@ -0,0 +1,51 @@ +// Copyright 2024 The Archivista Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rstuf + +// Hashes represents the Hashes structure +type Hashes struct { + Sha256 string `json:"sha256"` +} + +// ArtifactInfo represents the ArtifactInfo structure +type ArtifactInfo struct { + Length int `json:"length"` + Hashes Hashes `json:"hashes"` + Custom map[string]any `json:"custom,omitempty"` +} + +// Artifact represents the Artifact structure +type Artifact struct { + Path string `json:"path"` + Info ArtifactInfo `json:"info"` +} + +// ArtifactPayload represents the payload structure +type ArtifactPayload struct { + Artifacts []Artifact `json:"artifacts"` + AddTaskIDToCustom bool `json:"add_task_id_to_custom"` + PublishTargets bool `json:"publish_targets"` +} + +type ArtifactsResponse struct { + Artifacts []string `json:"artifacts"` + TaskId string `json:"task_id"` + LastUpdate string `json:"last_update"` + Message string `json:"message"` +} + +type Response struct { + Data ArtifactsResponse `json:"data"` +} diff --git a/pkg/server/server.go b/pkg/server/server.go index ee117856..f992e3df 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -38,16 +38,18 @@ import ( "github.com/in-toto/archivista/pkg/api" "github.com/in-toto/archivista/pkg/artifactstore" "github.com/in-toto/archivista/pkg/config" + "github.com/in-toto/archivista/pkg/publisherstore" "github.com/sirupsen/logrus" httpSwagger "github.com/swaggo/http-swagger/v2" ) type Server struct { - metadataStore Storer - objectStore StorerGetter - artifactStore artifactstore.Store - router *mux.Router - sqlClient *ent.Client + metadataStore Storer + objectStore StorerGetter + artifactStore artifactstore.Store + router *mux.Router + sqlClient *ent.Client + publisherStore []publisherstore.Publisher } type Storer interface { @@ -89,6 +91,12 @@ func WithArtifactStore(wds artifactstore.Store) Option { } } +func WithPublishers(pub []publisherstore.Publisher) Option { + return func(s *Server) { + s.publisherStore = pub + } +} + func New(cfg *config.Config, opts ...Option) (Server, error) { r := mux.NewRouter() s := Server{ @@ -168,6 +176,15 @@ func (s *Server) Upload(ctx context.Context, r io.Reader) (api.UploadResponse, e return api.UploadResponse{}, err } + if s.publisherStore != nil { + for _, publisher := range s.publisherStore { + // TODO: Make publish asynchrouns and use goroutine + if err := publisher.Publish(ctx, gid.String(), payload); err != nil { + logrus.Errorf("received error from publisher: %+v", err) + } + } + } + return api.UploadResponse{Gitoid: gid.String()}, nil } diff --git a/pkg/server/services.go b/pkg/server/services.go index 5255864f..9587d4ee 100644 --- a/pkg/server/services.go +++ b/pkg/server/services.go @@ -30,6 +30,7 @@ import ( "github.com/in-toto/archivista/pkg/metadatastorage/sqlstore" "github.com/in-toto/archivista/pkg/objectstorage/blobstore" "github.com/in-toto/archivista/pkg/objectstorage/filestore" + "github.com/in-toto/archivista/pkg/publisherstore" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/sirupsen/logrus" ) @@ -53,10 +54,11 @@ type ArchivistaService struct { // Setup Archivista Service func (a *ArchivistaService) Setup() (*Server, error) { var ( - level logrus.Level - err error - sqlStore *sqlstore.Store - fileStore StorerGetter + level logrus.Level + err error + sqlStore *sqlstore.Store + fileStore StorerGetter + publisherStore []publisherstore.Publisher ) serverOpts := make([]Option, 0) @@ -128,6 +130,10 @@ func (a *ArchivistaService) Setup() (*Server, error) { serverOpts = append(serverOpts, WithArtifactStore(wds)) } + if a.Cfg.Publisher != nil { + publisherStore = publisherstore.New(a.Cfg) + serverOpts = append(serverOpts, WithPublishers(publisherStore)) + } // Create the Archivista server with all options server, err := New(a.Cfg, serverOpts...) if err != nil {