From ce9b7240113467af1ed85b1508c02fd00c27991c Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Wed, 28 Aug 2024 15:01:28 -0700 Subject: [PATCH] feat: add the unlink command (#4) Adds the Lettuce `unlink` command, which is similar to `delete`. Because the command takes variadic arguments, we fan out the delete requests. In the future we can hook up our rate limiting to this section. --- .../lettuce/MomentoRedisReactiveClient.java | 29 ++++++++++- .../lettuce/MomentoRedisReactiveCommands.java | 2 + .../java/momento/lettuce/ItemCommandTest.java | 50 +++++++++++++++++++ .../momento/lettuce/ScalarCommandTest.java | 2 +- .../RedisCodecByteArrayConverterTest.java | 2 +- 5 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 src/test/java/momento/lettuce/ItemCommandTest.java diff --git a/src/main/java/momento/lettuce/MomentoRedisReactiveClient.java b/src/main/java/momento/lettuce/MomentoRedisReactiveClient.java index 8dd8a78..0a7386e 100644 --- a/src/main/java/momento/lettuce/MomentoRedisReactiveClient.java +++ b/src/main/java/momento/lettuce/MomentoRedisReactiveClient.java @@ -76,15 +76,18 @@ import io.netty.util.concurrent.ImmediateEventExecutor; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import momento.lettuce.utils.MomentoLettuceExceptionMapper; import momento.lettuce.utils.RedisCodecByteArrayConverter; import momento.lettuce.utils.RedisResponse; import momento.sdk.CacheClient; +import momento.sdk.responses.cache.DeleteResponse; import momento.sdk.responses.cache.GetResponse; import momento.sdk.responses.cache.SetResponse; import reactor.core.publisher.Flux; @@ -1018,7 +1021,31 @@ public Mono del(K... ks) { @Override public Mono unlink(K... ks) { - return null; + // Delete the keys from Momento + var deleteFutures = + Arrays.stream(ks) + .map(k -> client.delete(cacheName, codec.encodeKeyToBytes(k))) + .collect(Collectors.toList()); + + // Wait for all the delete commands to complete + var compositeFuture = CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); + return Mono.fromFuture(compositeFuture) + .then( + Mono.defer( + () -> { + var deletedKeys = + deleteFutures.stream() + .map(CompletableFuture::join) + .collect(Collectors.toList()); + + // If any of the delete commands was an error, then return an error. + for (var deleteResponse : deletedKeys) { + if (deleteResponse instanceof DeleteResponse.Error error) { + return Mono.error(MomentoLettuceExceptionMapper.mapException(error)); + } + } + return Mono.just((long) ks.length); + })); } @Override diff --git a/src/main/java/momento/lettuce/MomentoRedisReactiveCommands.java b/src/main/java/momento/lettuce/MomentoRedisReactiveCommands.java index a5c6342..7fefbdc 100644 --- a/src/main/java/momento/lettuce/MomentoRedisReactiveCommands.java +++ b/src/main/java/momento/lettuce/MomentoRedisReactiveCommands.java @@ -14,4 +14,6 @@ public interface MomentoRedisReactiveCommands { Mono get(K k); Mono set(K k, V v); + + Mono unlink(K... ks); } diff --git a/src/test/java/momento/lettuce/ItemCommandTest.java b/src/test/java/momento/lettuce/ItemCommandTest.java new file mode 100644 index 0000000..85f2298 --- /dev/null +++ b/src/test/java/momento/lettuce/ItemCommandTest.java @@ -0,0 +1,50 @@ +package momento.lettuce; + +import static momento.lettuce.TestUtils.randomString; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +final class ItemCommandTest extends BaseTestClass { + @Test + public void testUnlink() { + // Set 3 keys in the cache + var key1 = randomString(); + var value1 = randomString(); + var setResponse1 = client.set(key1, value1).block(); + assertEquals("OK", setResponse1); + + var key2 = randomString(); + var value2 = randomString(); + var setResponse2 = client.set(key2, value2).block(); + assertEquals("OK", setResponse2); + + var key3 = randomString(); + var value3 = randomString(); + var setResponse3 = client.set(key3, value3).block(); + assertEquals("OK", setResponse3); + + // Generate a random key for good measure + var key4 = randomString(); + + // Go unlink 2 of them and one that isn't there + var unlinkResponse = client.unlink(key1, key2, key4).block(); + + // Since Redis tells you which keys were removed, we can get the exact number. + if (isRedisTest()) { + assertEquals(2, unlinkResponse); + } else { + assertEquals(3, unlinkResponse); + } + + // Verify 2 are gone but key3 still there + var storedValue1 = client.get(key1).block(); + assertEquals(null, storedValue1); + + var storedValue2 = client.get(key2).block(); + assertEquals(null, storedValue2); + + var storedValue3 = client.get(key3).block(); + assertEquals(value3, storedValue3); + } +} diff --git a/src/test/java/momento/lettuce/ScalarCommandTest.java b/src/test/java/momento/lettuce/ScalarCommandTest.java index e95bbee..3c9c12e 100644 --- a/src/test/java/momento/lettuce/ScalarCommandTest.java +++ b/src/test/java/momento/lettuce/ScalarCommandTest.java @@ -36,6 +36,6 @@ public void testSetAndGet() { assertEquals("OK", setResponse); var storedValue = client.get(key).block(); - assertEquals(storedValue, value); + assertEquals(value, storedValue); } } diff --git a/src/test/java/momento/lettuce/utils/RedisCodecByteArrayConverterTest.java b/src/test/java/momento/lettuce/utils/RedisCodecByteArrayConverterTest.java index ef34ab9..a02a5d8 100644 --- a/src/test/java/momento/lettuce/utils/RedisCodecByteArrayConverterTest.java +++ b/src/test/java/momento/lettuce/utils/RedisCodecByteArrayConverterTest.java @@ -8,7 +8,7 @@ class RedisCodecByteArrayConverterTest { @Test - void encodeAndDecodeKey() { + void encodeAndDecodeKeyToBytes() { var converter = new RedisCodecByteArrayConverter(StringCodec.UTF8); var key = "key"; var encodedKey = converter.encodeKeyToBytes(key);