diff --git a/cmd/root.go b/cmd/root.go index 82d3e88..d594b54 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -297,10 +297,12 @@ func mustInitializeQueueBackend() { queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: cmdOpts.Backend, Redis: &backendconfig.RedisConfig{ - KeyPrefix: cmdOpts.Redis.KeyPrefix, - Client: cmdOpts.Redis.NewClient(), - Backoff: cmdOpts.Redis.Backoff, - ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + KeyPrefix: cmdOpts.Redis.KeyPrefix, + Client: cmdOpts.Redis.NewClient(), + Backoff: cmdOpts.Redis.Backoff, + ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete, + WithoutTransaction: cmdOpts.Redis.WithoutTransaction, }, }) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 1d148ff..74b9bb0 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -30,10 +30,12 @@ type Config struct { } type RedisConfig struct { - KeyPrefix string - Client *redis.Client - Backoff BackoffConfig - ChunkSizeInGet int + KeyPrefix string + Client *redis.Client + Backoff BackoffConfig + ChunkSizeInGet int + ChunkSizeInDelete int + WithoutTransaction bool } // TODO: support UniversalOptions @@ -52,7 +54,9 @@ type RedisClientConfig struct { IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"` IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` - ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInDelete" default:"1000"` + WithoutTransaction bool `json:"withoutTransaction" yaml:"withoutTransaction" default:"false"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/iface/backend.go b/pkg/backend/iface/backend.go index 69b9a6f..aacabb0 100644 --- a/pkg/backend/iface/backend.go +++ b/pkg/backend/iface/backend.go @@ -32,6 +32,7 @@ var ( TaskQueueNotFound = errors.New("Queue not found") TaskQueueExisted = errors.New("Queue already exists") TaskQueueEmptyError = errors.New("Queue is empty") + TaskQueueIsTooLarge = errors.New("Queue have many tasks so we need --without_transaction option") TaskSuspendedError = errors.New("Queue is suspended") WorkerNotFound = errors.New("Worker not found") WorkerExitedError = errors.New("Worker already exists") diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..90ec6a4 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -204,7 +204,7 @@ func (b *Backend) UpdateQueue(ctx context.Context, queueSpec taskqueue.TaskQueue return queue, nil } -func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { +func (b *Backend) deleteQueueWithTransaction(ctx context.Context, queueName string) error { if err := taskqueue.ValidateQueueName(queueName); err != nil { return err } @@ -212,6 +212,16 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { if err != nil { return err } + + numOfTasks, err := b.Client.SCard(b.tasksKey(queue.UID.String())).Result() + if err != nil { + return err + } + + if int64(b.ChunkSizeInDelete) <= numOfTasks { + return iface.TaskQueueIsTooLarge + } + // WATCH {all_queues_key} {queue_key} // .. worker_keys = collect worker keys // WATCH worker_keys @@ -252,13 +262,108 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { b.Logger.With(). Str("queueName", queueName). Str("queueUID", queue.UID.String()). - Str("operation", "DeleteQueue"). + Str("operation", "DeleteQueueWithTransaction"). Logger(), txf, b.allQueuesKey(), b.queueKey(queue.UID.String()), ) } +func (b *Backend) deleteQueueWithoutTransaction(ctx context.Context, queueName string) error { + if err := taskqueue.ValidateQueueName(queueName); err != nil { + return err + } + queue, err := b.ensureQueueExistsByName(b.Client, queueName) + if err != nil { + return err + } + // + // .. worker_keys = collect worker keys + // .. task_keys = collect task keys + // Use chunk to divide and loop until UNLINK all keys + // ---loop start--- + // WATCH {all_queues_key} {queue_key} + // UNLINK chulk_keys + // ---loop end--- + // WATCH {all_queues_key} {queue_key} + // MULTI + // DEL {queue_key} worker_keys task_keys + // HDEL {all_queues_key} {queueName} + // EXEC + keysToDelete := []string{} + + workerKeysToDelete, err := b.allWorkersKeysForDeleteQueue(b.Client, queue.UID.String()) + if err != nil { + return err + } + keysToDelete = append(keysToDelete, workerKeysToDelete...) + + taskKeysToDelete, err := b.allTasksKeysForDeleteQueue(b.Client, queue.UID.String()) + if err != nil { + return err + } + keysToDelete = append(keysToDelete, taskKeysToDelete...) + + // chunk delete + for cursor := 0; cursor < len(keysToDelete); cursor += b.ChunkSizeInDelete { + time.Sleep(100 * time.Millisecond) + end := cursor + b.ChunkSizeInDelete + if len(keysToDelete) <= end { + end = len(keysToDelete) + } + + deleteKeys := keysToDelete[cursor:end] + if len(deleteKeys) == 0 { + continue + } + err = b.runTxWithBackOff( + ctx, + b.Logger.With(). + Str("queueName", queueName). + Str("queueUID", queue.UID.String()). + Str("operation", "DeleteQueueWithTransaction"). + Logger(), + func(tx *redis.Tx) error { + _, err = tx.Unlink(deleteKeys...).Result() + return err + }, + b.allQueuesKey(), b.queueKey(queue.UID.String()), + ) + if err != nil { + return err + } + } + + err = b.runTxWithBackOff( + ctx, + b.Logger.With(). + Str("queueName", queueName). + Str("queueUID", queue.UID.String()). + Str("operation", "DeleteQueueWithTransaction"). + Logger(), + func(tx *redis.Tx) error { + _, err = tx.Unlink(b.queueKey(queue.UID.String())).Result() + if err != nil { + return err + } + + _, err = tx.HDel(b.allQueuesKey(), queue.Spec.Name).Result() + return err + }, + b.allQueuesKey(), b.queueKey(queue.UID.String()), + ) + + return err +} + +func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { + if b.WithoutTransaction { + return b.deleteQueueWithoutTransaction(ctx, queueName) + } else { + return b.deleteQueueWithTransaction(ctx, queueName) + } +} + func (b *Backend) ensureQueueExistsByName(rds redis.Cmdable, queueName string) (*taskqueue.TaskQueue, error) { uid, err := b.lookupQueueUID(rds, queueName) if err != nil { diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index 36ce2c5..c8e864a 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -107,10 +107,11 @@ var _ = Describe("Backend", func() { ibackend, err := NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - KeyPrefix: "test", - Client: client, - Backoff: backoffConfig, - ChunkSizeInGet: 1000, + KeyPrefix: "test", + Client: client, + Backoff: backoffConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred()) @@ -277,6 +278,20 @@ var _ = Describe("Backend", func() { Expect(err).To(Equal(iface.TaskQueueNotFound)) }) }) + When("the large queue exists", func() { + It("can delete the queue", func() { + testutil.MustCreateQueue(backend, SampleQueueSpec) + // numOfTasks % chunkSize != 0 && numOfTasks > chunkSize + numOfTasks := 1000 // numOfTasks < ChunkSizeInDelete + for i := 0; i < numOfTasks; i++ { + _, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec) + Expect(err).NotTo(HaveOccurred()) + } + + err := backend.DeleteQueue(context.Background(), SampleQueueSpec.Name) + Expect(err).To(Equal(iface.TaskQueueIsTooLarge)) + }) + }) }) }) @@ -1032,3 +1047,61 @@ var _ = Describe("Backend", func() { }) }) }) + +var _ = Describe("BackendWihoutTransaction", func() { + var backend *Backend + BeforeEach(func() { + var err error + backoffConfig := backendconfig.DefaultBackoffConfig() + backoffConfig.MaxRetry = 0 + ibackend, err := NewBackend(logger, backendconfig.Config{ + BackendType: "redis", + Redis: &backendconfig.RedisConfig{ + KeyPrefix: "test", + Client: client, + Backoff: backoffConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, + WithoutTransaction: true, + }, + }) + Expect(err).NotTo(HaveOccurred()) + backend, _ = ibackend.(*Backend) + }) + + AfterEach(func() { + keys, err := client.Keys("*").Result() + Expect(err).NotTo(HaveOccurred()) + if len(keys) > 0 { + num, err := client.Del(keys...).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(num).To(Equal(int64(len(keys)))) + } + }) + + Context("Queue Operation", func() { + Context("DeleteQueue", func() { + When("the large queue exists", func() { + It("can delete the queue", func() { + queue := testutil.MustCreateQueue(backend, SampleQueueSpec) + // numOfTasks % chunkSize != 0 && numOfTasks > chunkSize + // numOfTaksk <= chunSize + numOfTasks := 1000 + for i := 0; i < numOfTasks; i++ { + _, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec) + Expect(err).NotTo(HaveOccurred()) + } + + Expect(backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)).NotTo(HaveOccurred()) + + queuesHash, err := client.HGetAll(backend.allQueuesKey()).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(queuesHash)).To(Equal(0)) + keys, err := client.Keys(backend.queueKey(queue.UID.String()) + "*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(keys)).To(Equal(0)) + }) + }) + }) + }) +}) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index dec1c6d..e90a6a2 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -162,10 +162,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func } func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) { - taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []*task.Task{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID) if err != nil { return nil, err } @@ -938,10 +935,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) b.deadletterQueueKey(queueUID), b.pendingTaskQueueKey(queueUID), } - taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []string{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID) if err != nil { return []string{}, err } @@ -950,3 +944,24 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) } return keysToDelete, nil } + +func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) { + var chunkSize = int64(b.ChunkSizeInGet) + var cursor uint64 + var taskUIDs []string + for { + keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", chunkSize).Result() + if err == redis.Nil { + return []string{}, nil + } + if err != nil { + return []string{}, err + } + taskUIDs = append(taskUIDs, keys...) + cursor = nextCursor + if cursor == 0 { + break + } + } + return taskUIDs, nil +} diff --git a/pkg/backend/redis/worker.go b/pkg/backend/redis/worker.go index 79afea5..c205cae 100644 --- a/pkg/backend/redis/worker.go +++ b/pkg/backend/redis/worker.go @@ -483,7 +483,7 @@ func (b *Backend) ensureWorkerExistsByUID(rds redis.Cmdable, queue *taskqueue.Ta } func (b *Backend) allWorkersKeysForDeleteQueue(rds redis.Cmdable, queueUID string) ([]string, error) { - keysToDelete := []string{b.workersKey(queueUID)} + keysToDelete := []string{} workerUIDs, err := rds.SMembers(b.workersKey(queueUID)).Result() if err == redis.Nil { return []string{}, nil @@ -497,5 +497,8 @@ func (b *Backend) allWorkersKeysForDeleteQueue(rds redis.Cmdable, queueUID strin b.workerPendingTaskQueueKey(queueUID, workerUID), b.workerTasksKey(queueUID, workerUID)) } + + // If you are not using transactions, you need to delete the wokers key at the end. + keysToDelete = append(keysToDelete, b.workersKey(queueUID)) return keysToDelete, nil } diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index e3b67fc..5f32855 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -74,9 +74,10 @@ var _ = Describe("Worker", func() { bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - Client: client, - Backoff: backendConfig, - ChunkSizeInGet: 1000, + Client: client, + Backoff: backendConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred())