
Implement caching v2 (#17)
* implement caching v2

Signed-off-by: Ayman <[email protected]>

* clean up

Signed-off-by: Ayman <[email protected]>

Signed-off-by: Ayman <[email protected]>
Co-authored-by: Ayman <[email protected]>
khalifapro and enkhalifapro authored Jan 20, 2023
1 parent 3c29cff commit f8a6868
Showing 3 changed files with 197 additions and 74 deletions.
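At the core of caching v2 is content hashing: each issue and comment is tracked as an EntityCache record whose Hash field holds the sha256 of the entity's payload, so a later sync can detect edits by comparing hashes instead of re-reading full payloads. A minimal, runnable sketch of that comparison, mirroring the struct and hashing added in cmd/jira/jira.go below; the sample IDs and bodies are illustrative assumptions:

package main

import (
	"crypto/sha256"
	"fmt"
)

// EntityCache mirrors the struct this commit adds to cmd/jira/jira.go.
type EntityCache struct {
	Timestamp      string `json:"timestamp"`
	EntityID       string `json:"entity_id"`
	SourceEntityID string `json:"source_entity_id"`
	FileLocation   string `json:"file_location"`
	Hash           string `json:"hash"`
	Orphaned       bool   `json:"orphaned"`
}

// bodyHash matches the hashing applied to comment bodies in GetModelData.
func bodyHash(body string) string {
	return fmt.Sprintf("%x", sha256.Sum256([]byte(body)))
}

func main() {
	// Record persisted by the previous sync (sample values).
	cached := EntityCache{EntityID: "comment-7", Hash: bodyHash("original body")}

	// A freshly fetched body whose hash differs marks the comment as
	// edited; this is how the new code decides to emit comment_edited.
	if bodyHash("edited body") != cached.Hash {
		fmt.Println("comment", cached.EntityID, "was edited")
	}
}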
237 changes: 177 additions & 60 deletions cmd/jira/jira.go
@@ -1,8 +1,11 @@
package main

import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
@@ -76,6 +79,9 @@ const (
Success = "success"
// JiraIssue type
JiraIssue = "issue"
// IssuesCacheFile name
IssuesCacheFile = "issues-cache.csv"
commentsCacheFile = "comments-cache"
)

var (
@@ -102,11 +108,13 @@ var (
// "comment_author": "jira_comment_created",
// "comment_updateAuthor": "jira_comment_updated",
// }
cachedIssues = make(map[string]EntityCache)
cachedComments = make(map[string][]EntityCache)
)

// Publisher - for streaming data to Kinesis
type Publisher interface {
PushEvents(action, source, eventType, subEventType, env string, data []interface{}) error
PushEvents(action, source, eventType, subEventType, env string, data []interface{}) (string, error)
}
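PushEvents now returns the storage location of the pushed batch alongside the error; OutputDocs records that path as EntityCache.FileLocation so the cache remembers where each entity's payload landed. A self-contained sketch of the new contract; mockPublisher and the returned path are illustrative assumptions:

package main

import "fmt"

// Publisher matches the updated interface in the diff.
type Publisher interface {
	PushEvents(action, source, eventType, subEventType, env string, data []interface{}) (string, error)
}

type mockPublisher struct{}

// PushEvents returns an assumed storage path for the pushed batch.
func (mockPublisher) PushEvents(action, source, eventType, subEventType, env string, data []interface{}) (string, error) {
	return "insights/jira/issues/batch-0001.json", nil // assumed path format
}

func main() {
	var p Publisher = mockPublisher{}
	path, err := p.PushEvents("created", "insights", "jira", "issues", "prod", nil)
	if err != nil {
		panic(err)
	}
	// The returned path is what feeds EntityCache.FileLocation.
	fmt.Println("events stored at:", path)
}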

// DSJira - DS implementation for Jira
@@ -401,22 +409,7 @@ func (j *DSJira) GetModelData(ctx *shared.Ctx, docs []interface{}) (map[string][
}
// Comments start
uComments := make(map[string]jira.IssueComment)
commentsCacheID := fmt.Sprintf("%s-%s-comments", JiraIssue, issueID)
commentsFileData, er := j.cacheProvider.GetFileByKey(fmt.Sprintf("%s/%s", j.endpoint, JiraIssue), commentsCacheID)
if er != nil {
err = er
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("GetFileByKey get cached comments error: %v", err)
return data, err
}
oldComments := IssueComments{}
if commentsFileData != nil {
er = json.Unmarshal(commentsFileData, &oldComments)
if er != nil {
err = er
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("unmarshall old cached comments error: %v", err)
return data, err
}
}
oldComments := cachedComments[issueID]
comments, okComments := doc["issue_comments"].([]map[string]interface{})
if okComments {
for _, comment := range comments {
@@ -531,8 +524,8 @@ func (j *DSJira) GetModelData(ctx *shared.Ctx, docs []interface{}) (map[string][
}
data[key] = ary
found := false
for _, oldc := range oldComments.Comments {
if oldc.ID == issueCommentID {
for _, oldc := range oldComments {
if oldc.EntityID == issueCommentID {
found = true
break
}
@@ -551,21 +544,22 @@ func (j *DSJira) GetModelData(ctx *shared.Ctx, docs []interface{}) (map[string][
}
}
}
for _, comm := range oldComments.Comments {
for _, comm := range oldComments {
deleted := true
edited := false
for newCommID, commentVal := range uComments {
if newCommID == comm.ID {
if newCommID == comm.EntityID {
deleted = false
if commentVal.Body != comm.Body {
contentHash := fmt.Sprintf("%x", sha256.Sum256([]byte(commentVal.Body)))
if contentHash != comm.Hash {
edited = true
}
break
}
}
if deleted {
rvComm := jira.DeleteIssueComment{
ID: comm.ID,
ID: comm.EntityID,
IssueID: issueID,
}
key := "comment_deleted"
@@ -579,15 +573,15 @@ func (j *DSJira) GetModelData(ctx *shared.Ctx, docs []interface{}) (map[string][
}
if edited {
editedComment := jira.IssueComment{
ID: comm.ID,
ID: comm.EntityID,
IssueID: issueID,
Comment: insights.Comment{
Body: uComments[comm.ID].Body,
CommentURL: uComments[comm.ID].CommentURL,
CommentID: uComments[comm.ID].CommentID,
Contributor: uComments[comm.ID].Contributor,
Body: uComments[comm.EntityID].Body,
CommentURL: uComments[comm.EntityID].CommentURL,
CommentID: uComments[comm.EntityID].CommentID,
Contributor: uComments[comm.EntityID].Contributor,
SyncTimestamp: time.Now(),
SourceTimestamp: uComments[comm.ID].SourceTimestamp,
SourceTimestamp: uComments[comm.EntityID].SourceTimestamp,
},
}
key := "comment_edited"
@@ -600,25 +594,17 @@ func (j *DSJira) GetModelData(ctx *shared.Ctx, docs []interface{}) (map[string][
data[key] = ary
}
}
if len(comments) > 0 {
var updatedComments IssueComments
for _, comm := range uComments {
updatedComments.Comments = append(updatedComments.Comments, IssueComment{
ID: comm.ID,
Body: comm.Body,
})
}
b, er := json.Marshal(updatedComments)
if er != nil {
err = er
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("error marshal updated issue comments cache. comments data: %+v, error: %v", updatedComments, err)
return data, err
}
if err = j.cacheProvider.UpdateFileByKey(fmt.Sprintf("%s/%s", j.endpoint, JiraIssue), commentsCacheID, b); err != nil {
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("UpdateFileByKey error update issue comments cache. path: %s, cache id: %s, comments data: %v, error: %v", fmt.Sprintf("%s/%s/", j.endpoint, JiraIssue), commentsCacheID, b, err)
return data, err
}
updatedComments := make([]EntityCache, 0)
for _, c := range uComments {
updatedComments = append(updatedComments, EntityCache{
Timestamp: fmt.Sprintf("%v", c.SyncTimestamp.Unix()),
EntityID: c.ID,
SourceEntityID: c.CommentID,
Hash: fmt.Sprintf("%x", sha256.Sum256([]byte(c.Body))),
Orphaned: false,
})
}
cachedComments[issueID] = updatedComments
sourceTimestamp := createdOn
if updatedOn.After(createdOn) {
sourceTimestamp = updatedOn
@@ -646,8 +632,7 @@ func (j *DSJira) GetModelData(ctx *shared.Ctx, docs []interface{}) (map[string][
Orphaned: false,
},
}
cacheID := fmt.Sprintf("%s-%s", JiraIssue, issueID)
isCreated, err := j.cacheProvider.IsKeyCreated(fmt.Sprintf("%s/%s", j.endpoint, JiraIssue), cacheID)
isCreated := isKeyCreated(issueID)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "GetModelDataPullRequest"}).Errorf("error getting cache for endpoint %s/%s. error: %+v", j.endpoint, JiraIssue, err)
return data, err
@@ -695,33 +680,56 @@ func (j *DSJira) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]interf
switch k {
case "created":
ev, _ := v[0].(jira.IssueCreatedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
path, err := j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
for _, val := range v {
id := fmt.Sprintf("%s-%s", "issue", val.(jira.IssueCreatedEvent).Payload.ID)
data = append(data, map[string]interface{}{
"id": id,
"data": "",
})
payload := val.(jira.IssueCreatedEvent).Payload
b, er := json.Marshal(val.(jira.IssueCreatedEvent))
if er != nil {
j.log.WithFields(logrus.Fields{"operation": "OutputDocs"}).Errorf("error marshaling data for issue %s, error %v", payload.IssueID, er)
continue
}

tStamp := payload.SyncTimestamp.Unix()
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
cachedIssues[payload.ID] = EntityCache{
Timestamp: fmt.Sprintf("%v", tStamp),
EntityID: payload.ID,
SourceEntityID: payload.IssueID,
Hash: contentHash,
FileLocation: path,
}
}
case "updated":
ev, _ := v[0].(jira.IssueUpdatedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
case "comment_added":
ev, _ := v[0].(jira.IssueCommentAddedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
case "comment_edited":
ev, _ := v[0].(jira.IssueCommentEditedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
case "comment_deleted":
ev, _ := v[0].(jira.IssueCommentDeletedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, JiraDataSource, issuesStr, envStr, v)
default:
err = fmt.Errorf("unknown jira issue event type '%s'", k)
}
if err != nil {
break
}
}
err = j.createCacheFile([]EntityCache{}, "")
if err != nil {
return
}

comB, err := jsoniter.Marshal(cachedComments)
if err != nil {
return
}
if err = j.cacheProvider.UpdateFileByKey(j.endpoint, commentsCacheFile, comB); err != nil {
return
}
err = j.cacheProvider.Create(fmt.Sprintf("%s/%s", j.endpoint, JiraIssue), data)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "OutputDocs"}).Errorf("error creating cache for endpoint %s/%s. Error: %+v", j.endpoint, JiraIssue, err)
@@ -1816,6 +1824,10 @@ func (j *DSJira) Sync(ctx *shared.Ctx) (err error) {
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s resuming from %v (%d threads)", j.Endpoint(ctx), ctx.DateFrom, thrN)
}
}
j.getCachedIssues()
if err = j.getCachedComments(); err != nil {
return
}

if ctx.DateTo != nil {
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching till %v (%d threads)", j.Endpoint(ctx), ctx.DateTo, thrN)
@@ -2150,7 +2162,7 @@ func (j *DSJira) createStructuredLogger() {

// AddCacheProvider - adds cache provider
func (j *DSJira) AddCacheProvider() {
cacheProvider := cache.NewManager(JiraDataSource, os.Getenv("STAGE"))
cacheProvider := cache.NewManager(fmt.Sprintf("v2/%s", JiraDataSource), os.Getenv("STAGE"))
j.cacheProvider = *cacheProvider
}

@@ -2178,6 +2190,111 @@ func (j *DSJira) getProjects() ([]project, error) {
return projectsRes, nil
}

func (j *DSJira) getCachedIssues() {
comB, err := j.cacheProvider.GetFileByKey(j.endpoint, IssuesCacheFile)
if err != nil {
return
}
reader := csv.NewReader(bytes.NewBuffer(comB))
records, err := reader.ReadAll()
if err != nil {
return
}
for i, record := range records {
if i == 0 {
continue
}
orphaned, err := strconv.ParseBool(record[5])
if err != nil {
orphaned = false
}

cachedIssues[record[1]] = EntityCache{
Timestamp: record[0],
EntityID: record[1],
SourceEntityID: record[2],
FileLocation: record[3],
Hash: record[4],
Orphaned: orphaned,
}
}
}
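getCachedIssues rebuilds the in-memory cachedIssues map from a CSV file whose first row is a header and whose columns follow the EntityCache field order. A runnable sketch of that parse against assumed sample data:

package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// Sample issues-cache.csv content; all values are illustrative assumptions.
const sample = `timestamp,entity_id,source_entity_id,file_location,hash,orphaned
1674201600,issue-42,10001,insights/jira/issues/batch-0001.json,4bf3a1c2d5e6f708,false
`

func main() {
	records, err := csv.NewReader(strings.NewReader(sample)).ReadAll()
	if err != nil {
		panic(err)
	}
	for i, rec := range records {
		if i == 0 {
			continue // skip the header row, as getCachedIssues does
		}
		orphaned, err := strconv.ParseBool(rec[5])
		if err != nil {
			orphaned = false // same silent fallback as the diff
		}
		fmt.Printf("entity=%s source=%s file=%s orphaned=%v\n", rec[1], rec[2], rec[3], orphaned)
	}
}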

func (j *DSJira) getCachedComments() error {
commentsB, err := j.cacheProvider.GetFileByKey(j.endpoint, commentsCacheFile)
records := make(map[string][]EntityCache)
if commentsB != nil {
if err = json.Unmarshal(commentsB, &records); err != nil {
return err
}
}
for key, val := range records {
cachedComments[key] = val
}
return nil
}
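The comments cache uses JSON rather than CSV because it is a map from issue entity ID to a list of comment records. A self-contained sketch of the format getCachedComments expects; the sample data is an assumption:

package main

import (
	"encoding/json"
	"fmt"
)

// EntityCache mirrors the struct added in cmd/jira/jira.go.
type EntityCache struct {
	Timestamp      string `json:"timestamp"`
	EntityID       string `json:"entity_id"`
	SourceEntityID string `json:"source_entity_id"`
	FileLocation   string `json:"file_location"`
	Hash           string `json:"hash"`
	Orphaned       bool   `json:"orphaned"`
}

// Assumed comments-cache payload: issue entity ID -> its comments' records.
const sample = `{
  "issue-42": [
    {"timestamp": "1674201600", "entity_id": "comment-7", "source_entity_id": "10200",
     "file_location": "", "hash": "4bf3a1c2d5e6f708", "orphaned": false}
  ]
}`

func main() {
	cached := make(map[string][]EntityCache)
	if err := json.Unmarshal([]byte(sample), &cached); err != nil {
		panic(err)
	}
	for issueID, comments := range cached {
		fmt.Printf("issue %s has %d cached comment(s)\n", issueID, len(comments))
	}
}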

func (j *DSJira) createCacheFile(cache []EntityCache, path string) error {
for _, comm := range cache {
comm.FileLocation = path
cachedIssues[comm.EntityID] = comm
}
records := [][]string{
{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
}
for _, c := range cachedIssues {
records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned)})
}

csvFile, err := os.Create(IssuesCacheFile)
if err != nil {
return err
}

w := csv.NewWriter(csvFile)
err = w.WriteAll(records)
if err != nil {
return err
}
err = csvFile.Close()
if err != nil {
return err
}
file, err := os.ReadFile(IssuesCacheFile)
if err != nil {
return err
}
err = os.Remove(IssuesCacheFile)
if err != nil {
return err
}
err = j.cacheProvider.UpdateFileByKey(j.endpoint, IssuesCacheFile, file)
if err != nil {
return err
}

return nil
}
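createCacheFile folds the freshly synced records into cachedIssues, writes the whole map to a local CSV file, reads the bytes back, deletes the file, and uploads the bytes with UpdateFileByKey. The disk round-trip is not strictly necessary: csv.Writer accepts any io.Writer, so the same bytes could be produced in memory, as in this sketch (EntityCache fields abbreviated; the sample record is an assumption):

package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
	"strconv"
)

// Abbreviated EntityCache, field order matching the CSV columns.
type EntityCache struct {
	Timestamp, EntityID, SourceEntityID, FileLocation, Hash string
	Orphaned                                                bool
}

// marshalIssuesCache produces the same bytes createCacheFile uploads,
// without touching the filesystem.
func marshalIssuesCache(cachedIssues map[string]EntityCache) ([]byte, error) {
	records := [][]string{
		{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
	}
	for _, c := range cachedIssues {
		records = append(records, []string{
			c.Timestamp, c.EntityID, c.SourceEntityID,
			c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned),
		})
	}
	var buf bytes.Buffer
	if err := csv.NewWriter(&buf).WriteAll(records); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	b, err := marshalIssuesCache(map[string]EntityCache{
		"issue-42": {Timestamp: "1674201600", EntityID: "issue-42", SourceEntityID: "10001"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(b))
}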

func isKeyCreated(id string) bool {
_, ok := cachedIssues[id]
return ok
}

// EntityCache - single cached entity record schema
type EntityCache struct {
Timestamp string `json:"timestamp"`
EntityID string `json:"entity_id"`
SourceEntityID string `json:"source_entity_id"`
FileLocation string `json:"file_location"`
Hash string `json:"hash"`
Orphaned bool `json:"orphaned"`
}

type project struct {
ID string `json:"id"`
Key string `json:"key"`
10 changes: 4 additions & 6 deletions go.mod
@@ -4,7 +4,7 @@ go 1.17

require (
github.com/LF-Engineering/insights-datasource-shared v1.5.20
github.com/LF-Engineering/lfx-event-schema v0.1.29-0.20220814063134-d194fa6ec2ef
github.com/LF-Engineering/lfx-event-schema v0.1.36
github.com/aws/aws-lambda-go v1.28.0
github.com/aws/aws-sdk-go v1.43.4
github.com/json-iterator/go v1.1.12
@@ -44,18 +44,16 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.7.1 // indirect
github.com/tinylib/msgp v1.1.2 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
golang.org/x/tools v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
inet.af/netaddr v0.0.0-20220617031823-097006376321 // indirect
)
