Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the common etcd client in etos-iut #83

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions cmd/iut/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"time"

config "github.com/eiffel-community/etos-api/internal/configs/iut"
"github.com/eiffel-community/etos-api/internal/database/etcd"
"github.com/eiffel-community/etos-api/internal/logging"
server "github.com/eiffel-community/etos-api/internal/server"
"github.com/eiffel-community/etos-api/pkg/application"
"github.com/eiffel-community/etos-api/pkg/iut/v1alpha1"
"github.com/sirupsen/logrus"
"github.com/snowzach/rotatefilehook"
"go.elastic.co/ecslogrus"
clientv3 "go.etcd.io/etcd/client/v3"
)

// main sets up logging and starts up the webserver.
Expand Down Expand Up @@ -62,17 +62,10 @@ func main() {
"user_log": false,
})

// Database connection test
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cfg.DatabaseURI()},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.WithError(err).Fatal("failed to create etcd connection")
}

iutEtcdTreePrefix := "/iut"
db := etcd.New(cfg, logger, iutEtcdTreePrefix)
log.Info("Loading v1alpha1 routes")
v1alpha1App := v1alpha1.New(cfg, log, ctx, cli)
v1alpha1App := v1alpha1.New(cfg, log, ctx, db)
defer v1alpha1App.Close()
router := application.New(v1alpha1App)

Expand Down
42 changes: 28 additions & 14 deletions internal/database/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,21 @@ func (etcd Etcd) Open(ctx context.Context, id uuid.UUID) io.ReadWriter {
}
}

// Write writes data to etcd
// Write writes data to etcd. If data is nil, the current key will be deleted from the database.
func (etcd Etcd) Write(p []byte) (int, error) {
if etcd.ID == uuid.Nil {
return 0, errors.New("please create a new etcd client using Open")
}
key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String())

if p == nil {
_, err := etcd.client.Delete(etcd.ctx, key)
if err != nil {
return 0, fmt.Errorf("Failed to delete key %s: %s", key, err.Error())
}
return 0, nil
}
Comment on lines +78 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this added? and why in the write function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This client shall implement the io.ReadWriter interface. The ETOS API applications are designed in a way that a database client shall be easily swapped with any other client which implements this common interface.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the question is why are we adding delete in the write function. A question I also have

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If all potential database clients are supposed to conform to io.ReadWriter interface and be swappable, we have no other choice.

The io.ReadWriter interface seems to be more-suitable for a storage like a binary file, but here it requires workarounds to fit it into a key-value pair storage like etcd.

Copy link
Collaborator

@t-persson t-persson Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather we write a null value into the database rather than having this function do things that it shouldn't do.

If deletion is required then we should have a separate interface for that and in the Stop function cast the database to that interface to see if it is possible to delete.

client := h.database.Open(r.Context(), identifier)
c, canDelete := client.(database.Deleter)
if canDelete {
  err := c.Delete()
} else {
  _, err = client.Write(nil)
}


_, err := etcd.client.Put(etcd.ctx, key, string(p))
if err != nil {
return 0, err
Expand All @@ -91,35 +100,40 @@ func (etcd *Etcd) readByte() byte {
// Read reads data from etcd and returns p bytes to user
func (etcd *Etcd) Read(p []byte) (n int, err error) {
if etcd.ID == uuid.Nil {
err = errors.New("please create a new etcd client using NewWithID")
return n, err
return 0, errors.New("please create a new etcd client using NewWithID")
}

key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String())

if !etcd.hasRead {
resp, err := etcd.client.Get(etcd.ctx, key)
if err != nil {
return n, err
return 0, err
t-persson marked this conversation as resolved.
Show resolved Hide resolved
}
if len(resp.Kvs) == 0 {
return n, io.EOF
return 0, io.EOF
}
etcd.data = resp.Kvs[0].Value
etcd.hasRead = true
}

if len(etcd.data) == 0 {
return n, io.EOF
return 0, io.EOF
}
if c := cap(p); c > 0 {
for n < c {
p[n] = etcd.readByte()
n++
if len(etcd.data) == 0 {
return n, io.EOF
}
}

// Copy as much data as possible to p
// The copy function copies the minimum of len(p) and len(etcd.data) bytes from etcd.data to p
// It returns the number of bytes copied, which is stored in n
n = copy(p, etcd.data)
t-persson marked this conversation as resolved.
Show resolved Hide resolved

// Update etcd.data to remove the portion of data that has already been copied to p
// etcd.data[n:] creates a new slice that starts from the n-th byte to the end of the original slice
// This effectively removes the first n bytes from etcd.data, ensuring that subsequent reads start from the correct position
etcd.data = etcd.data[n:]

if n == 0 {
return 0, io.EOF
}

return n, nil
}
55 changes: 37 additions & 18 deletions pkg/iut/v1alpha1/v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (

eiffelevents "github.com/eiffel-community/eiffelevents-sdk-go"
config "github.com/eiffel-community/etos-api/internal/configs/iut"
"github.com/eiffel-community/etos-api/internal/database"
"github.com/eiffel-community/etos-api/pkg/application"
packageurl "github.com/package-url/packageurl-go"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/google/uuid"
"github.com/julienschmidt/httprouter"
Expand All @@ -38,14 +38,14 @@ import (
type V1Alpha1Application struct {
logger *logrus.Entry
cfg config.Config
database *clientv3.Client
database database.Opener
wg *sync.WaitGroup
}

type V1Alpha1Handler struct {
logger *logrus.Entry
cfg config.Config
database *clientv3.Client
database database.Opener
wg *sync.WaitGroup
}

Expand All @@ -72,11 +72,11 @@ func (a *V1Alpha1Application) Close() {
}

// New returns a new V1Alpha1Application object/struct
func New(cfg config.Config, log *logrus.Entry, ctx context.Context, cli *clientv3.Client) application.Application {
func New(cfg config.Config, log *logrus.Entry, ctx context.Context, db database.Opener) application.Application {
return &V1Alpha1Application{
logger: log,
cfg: cfg,
database: cli,
database: db,
wg: &sync.WaitGroup{},
}
}
Expand Down Expand Up @@ -127,19 +127,27 @@ type StopRequest struct {

// Start creates a number of IUTs and stores them in the ETCD database returning a checkout ID.
func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id"))
logger := h.logger.WithField("identifier", identifier).WithContext(r.Context())
if err != nil {
RespondWithError(w, http.StatusInternalServerError, err.Error())
}
checkOutID := uuid.New()

w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("Content-Type", "application/json")

var startReq StartRequest
if err := json.NewDecoder(r.Body).Decode(&startReq); err != nil {
logger.Errorf("Failed to decode request body: %s", r.Body)
RespondWithError(w, http.StatusBadRequest, err.Error())
return
}
defer r.Body.Close()
purl, err := packageurl.FromString(startReq.ArtifactIdentity)

if err != nil {
logger.Errorf("Failed to get purl from artifact identity: %s", startReq.ArtifactIdentity)
andmat900 marked this conversation as resolved.
Show resolved Hide resolved
RespondWithError(w, http.StatusBadRequest, err.Error())
return
}
Expand All @@ -150,11 +158,14 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro
}
iuts, err := json.Marshal(purls)
if err != nil {
logger.Errorf("Failed to marshal purls: %s", purls)
RespondWithError(w, http.StatusInternalServerError, err.Error())
return
}
_, err = h.database.Put(r.Context(), fmt.Sprintf("/iut/%s", checkOutID.String()), string(iuts))
client := h.database.Open(r.Context(), identifier)
_, err = client.Write([]byte(string(iuts)))
if err != nil {
logger.Errorf("Failed to write to database: %s", string(iuts))
RespondWithError(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -166,43 +177,50 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro

// Status creates a simple DONE Status response with IUTs.
func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
identifier := r.Header.Get("X-Etos-Id")
identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id"))
if err != nil {
RespondWithError(w, http.StatusInternalServerError, err.Error())
}
logger := h.logger.WithField("identifier", identifier).WithContext(r.Context())

id, err := uuid.Parse(r.URL.Query().Get("id"))
client := h.database.Open(r.Context(), identifier)

data := make([]byte, 4096)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 4096 always goign to be enough here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally it is around 130-150 bytes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make([]byte) should be fine here since we cannot know the size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make([]byte) will not compile. It has to be a list of bytes that is larger than the actual data.

Copy link
Collaborator

@t-persson t-persson Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I would recommend using something io.ReadAll instead, which will allocate a small size and grow it when necessary.

I.e. data, err := io.ReadAll(client)

byteCount, err := client.Read(data)
data = data[:byteCount]

key := fmt.Sprintf("/iut/%s", id)
dbResp, err := h.database.Get(r.Context(), key)
if err != nil {
logger.Errorf("Failed to look up status request id: %s", id)
RespondWithError(w, http.StatusInternalServerError, err.Error())
return
}
if len(dbResp.Kvs) == 0 {
err = fmt.Errorf("No key found: %s", key)
logger.Errorf("Failed to look up status request id: %s, %s", identifier, err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we really doing a lookup here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line 190 we do a look up by reading from the database.

RespondWithError(w, http.StatusInternalServerError, err.Error())
return
}
statusResp := StatusResponse{
Id: id,
Status: "DONE",
}
if err = json.Unmarshal(dbResp.Kvs[0].Value, &statusResp.Iuts); err != nil {
if err = json.Unmarshal(data, &statusResp.Iuts); err != nil {
logger.Errorf("Failed to unmarshal data: %s", data)
RespondWithError(w, http.StatusInternalServerError, err.Error())
return
}
response, err := json.Marshal(statusResp)
if err != nil {
logger.Errorf("Failed to marshal status response: %s", statusResp)
RespondWithError(w, http.StatusInternalServerError, err.Error())
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(response)

}

// Stop deletes the given IUTs from the database and returns an empty response.
func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
identifier := r.Header.Get("X-Etos-Id")
identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id"))
if err != nil {
RespondWithError(w, http.StatusInternalServerError, err.Error())
}
logger := h.logger.WithField("identifier", identifier).WithContext(r.Context())

var stopReq StopRequest
Expand All @@ -212,7 +230,8 @@ func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprou
RespondWithError(w, http.StatusBadRequest, err.Error())
return
}
_, err := h.database.Delete(r.Context(), fmt.Sprintf("/iut/%s", stopReq.Id))
client := h.database.Open(r.Context(), identifier)
_, err = client.Write(nil)
if err != nil {
logger.Errorf("Etcd delete failed: %s", err.Error())
RespondWithError(w, http.StatusInternalServerError, err.Error())
Expand Down
Loading