Skip to content

Commit

Permalink
dq redis transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
smallfish committed Nov 11, 2021
1 parent b5f742f commit 3682a1a
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions xdelay_queue/delay_queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package xdelay_queue

import (
"encoding/json"
"github.com/go-redis/redis"
"github.com/pkg/errors"
"github.com/smallfish-root/common-pkg/xredis"
Expand All @@ -25,22 +26,42 @@ func GetDelayQueue(alias string) *DelayQueue {
return delayQueue[alias]
}

//func (dq *DelayQueue) AddJob(job *Job) error {
// if job.Id == "" || job.Topic == "" || job.Delay < 0 || job.TTR <= 0 {
// return errors.New("invalid job")
// }
//
// err := dq.addJob(job.Id, job)
// if err != nil {
// return errors.WithStack(err)
// }
//
// err = dq.addJobToBucketZ(job.Delay, job.Id)
// if err != nil {
// return errors.WithStack(err)
// }
//
// return nil
//}

// AddJob transaction
func (dq *DelayQueue) AddJob(job *Job) error {
if job.Id == "" || job.Topic == "" || job.Delay < 0 || job.TTR <= 0 {
return errors.New("invalid job")
}

err := dq.addJob(job.Id, job)
value, err := json.Marshal(job)
if err != nil {
return errors.WithStack(err)
}

err = dq.addJobToBucketZ(job.Delay, job.Id)
if err != nil {
return errors.WithStack(err)
}

return nil
return redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[2])
`).Run(dq.Client, []string{job.Id, <- dq.bucketNameChan}, value, redis.Z{
Score: float64(job.Delay),
Member: job.Id,
}).Err()
}

func (dq *DelayQueue) GetJob(topics []string) (*Job, error) {
Expand Down

0 comments on commit 3682a1a

Please sign in to comment.