Skip to content

Commit

Permalink
fix: fix populating results from channel (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
pratik151192 authored Aug 31, 2023
1 parent 41396e7 commit b862c91
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
34 changes: 17 additions & 17 deletions batchutils/batch_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ func getWorker(
client momento.CacheClient,
cacheName string,
keyChan chan momento.Key,
errChan chan *errKeyVal,
getChan chan *getKeyResp,
resultChan chan *getResultOrError,
) {
for {
myKey := <-keyChan
Expand All @@ -33,17 +32,15 @@ func getWorker(
Key: myKey,
})
if err != nil {
getChan <- nil
errChan <- &errKeyVal{
resultChan <- &getResultOrError{err: &errKeyVal{
key: myKey,
error: err,
}
}}
} else {
errChan <- nil
getChan <- &getKeyResp{
resultChan <- &getResultOrError{result: &getKeyResp{
key: myKey,
resp: getResponse,
}
}}
}
}
}
Expand All @@ -63,6 +60,11 @@ type BatchGetError struct {
errors map[momento.Value]error
}

type getResultOrError struct {
result *getKeyResp
err *errKeyVal
}

func (e *BatchGetError) Error() string {
return "errors occurred during batch delete"
}
Expand Down Expand Up @@ -95,15 +97,14 @@ func BatchGet(ctx context.Context, props *BatchGetRequest) (*BatchGetResponse, *
props.MaxConcurrentGets = len(props.Keys)
}
keyChan := make(chan momento.Key, props.MaxConcurrentGets)
errChan := make(chan *errKeyVal, len(props.Keys))
getChan := make(chan *getKeyResp, len(props.Keys))
resultChan := make(chan *getResultOrError, len(props.Keys))

for i := 0; i < props.MaxConcurrentGets; i++ {
wg.Add(1)

go func() {
defer wg.Done()
getWorker(ctx, props.Client, props.CacheName, keyChan, errChan, getChan)
getWorker(ctx, props.Client, props.CacheName, keyChan, resultChan)
}()
}

Expand All @@ -115,12 +116,11 @@ func BatchGet(ctx context.Context, props *BatchGetRequest) (*BatchGetResponse, *
var errors = make(map[momento.Value]error, 0)
var results = make(map[momento.Value]responses.GetResponse, 0)
for i := 0; i < len(props.Keys); i++ {
res := <-getChan
err := <-errChan
if res != nil {
results[res.key] = res.resp
} else if err != nil {
errors[err.key] = err.error
resOrError := <-resultChan
if resOrError.result != nil {
results[resOrError.result.key] = resOrError.result.resp
} else if resOrError.err != nil {
errors[resOrError.err.key] = resOrError.err.error
}
}

Expand Down
36 changes: 19 additions & 17 deletions batchutils/batch_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func setWorker(
client momento.CacheClient,
cacheName string,
itemChan chan BatchSetItem,
errChan chan *errKeyVal,
setChan chan *setKeyResp,
resultChan chan *setResultOrError,
) {
for {
item := <-itemChan
Expand All @@ -35,18 +34,17 @@ func setWorker(
Value: item.Value,
Ttl: item.Ttl,
})

if err != nil {
setChan <- nil
errChan <- &errKeyVal{
resultChan <- &setResultOrError{err: &errKeyVal{
key: item.Key,
error: err,
}
}}
} else {
errChan <- nil
setChan <- &setKeyResp{
resultChan <- &setResultOrError{result: &setKeyResp{
key: item.Key,
resp: setResponse,
}
}}
}
}
}
Expand All @@ -64,6 +62,11 @@ type BatchSetItem struct {
Ttl time.Duration
}

type setResultOrError struct {
result *setKeyResp
err *errKeyVal
}

// BatchSetError contains a map associating failing cache keys with their specific errors.
// It may be necessary to use a type assertion to access the errors:
//
Expand Down Expand Up @@ -120,15 +123,14 @@ func BatchSet(ctx context.Context, props *BatchSetRequest) (*BatchSetResponse, *
props.MaxConcurrentSets = len(props.Items)
}
itemChan := make(chan BatchSetItem, props.MaxConcurrentSets)
errChan := make(chan *errKeyVal, len(props.Items))
setChan := make(chan *setKeyResp, len(props.Items))
resultChan := make(chan *setResultOrError, len(props.Items))

for i := 0; i < props.MaxConcurrentSets; i++ {
wg.Add(1)

go func() {
defer wg.Done()
setWorker(ctx, props.Client, props.CacheName, itemChan, errChan, setChan)
setWorker(ctx, props.Client, props.CacheName, itemChan, resultChan)
}()
}

Expand All @@ -139,13 +141,13 @@ func BatchSet(ctx context.Context, props *BatchSetRequest) (*BatchSetResponse, *

var errors = make(map[momento.Value]error, 0)
var results = make(map[momento.Value]responses.SetResponse, 0)

for i := 0; i < len(props.Items); i++ {
res := <-setChan
err := <-errChan
if res != nil {
results[res.key] = res.resp
} else if err != nil {
errors[err.key] = err.error
resOrErr := <-resultChan
if resOrErr.result != nil {
results[resOrErr.result.key] = resOrErr.result.resp
} else if resOrErr.err != nil {
errors[resOrErr.err.key] = resOrErr.err.error
}
}

Expand Down
1 change: 1 addition & 0 deletions batchutils/batch_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ var _ = Describe("Batch set operations", func() {
Items: items,
})

Expect(len(setBatch.Responses())).To(Equal(len(batchSetSuccessfulKeys)))
// Assuming errors is an instance of *BatchSetError
Expect(len(errors.Errors())).To(Equal(len(batchSetErrorKeys)))
for v, e := range errors.Errors() {
Expand Down

0 comments on commit b862c91

Please sign in to comment.