Skip to content

Commit

Permalink
fix(redis): hydrating all values not just first 100 (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
emjburns authored Nov 13, 2018
1 parent f88120b commit 0a90e17
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,8 @@ abstract class AbstractResourceTypeHandler<T : Resource>(
return
}

log.info("Processing ${markedResources.size} for notification in ${javaClass.simpleName}")

val maxItemsToProcess = Math.min(markedResources.size, workConfiguration.maxItemsProcessedPerCycle)
markedResources.subList(0, maxItemsToProcess)
.filter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ open class InMemoryCacheStatus(
shutdown()
}
} catch (e: Exception) {
log.error("Failed while checking the caches in ${javaClass.simpleName}.")
log.error("Failed while checking the caches in ${javaClass.simpleName}.", e)
}
}, 0, 5, TimeUnit.SECONDS)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ abstract class ScheduledAgent(
startupExecutorService.shutdown()
}
} catch (e: Exception) {
log.error("Failed while waiting for cache to start in ${javaClass.simpleName}.")
log.error("Failed while waiting for cache to start in ${javaClass.simpleName}.", e)
}
}, 0, 5, TimeUnit.SECONDS)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,16 @@ class RedisResourceStateRepository(
if (set.isEmpty()) {
emptyList()
} else {
set.chunked(REDIS_CHUNK_SIZE).map { sublist ->
this.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_STATE_KEY, *sublist.map { it }.toTypedArray()).toSet()
val state = mutableSetOf<ResourceState>()
set.chunked(REDIS_CHUNK_SIZE).forEach { subset ->
val partialState = this.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_STATE_KEY, *subset.map { it }.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<ResourceState>(json)
}
}.flatten()
state.addAll(partialState)
}
state.toList()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,17 @@ class RedisResourceTrackingRepository(

private fun hydrateMarkedResources(resourseIds: Set<String>): List<MarkedResource> {
if (resourseIds.isEmpty()) return emptyList()
return resourseIds.chunked(REDIS_CHUNK_SIZE).map { sublist ->
return redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_RESOURCES_KEY, *sublist.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<MarkedResource>(json)
}
}.flatten()

val hydratedResources = mutableListOf<MarkedResource>()
resourseIds.chunked(REDIS_CHUNK_SIZE).forEach { sublist ->
val hydrated = redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_RESOURCES_KEY, *sublist.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<MarkedResource>(json)
}
hydratedResources.addAll(hydrated)
}
return hydratedResources
}

override fun upsert(markedResource: MarkedResource, deleteScore: Long, softDeleteScore: Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,17 @@ class RedisResourceUseTrackingRepository(

private fun hydrateLastSeen(keys: Set<String>): List<LastSeenInfo> {
if (keys.isEmpty()) return emptyList()
return keys.chunked(REDIS_CHUNK_SIZE).map { sublist ->
redisClientDelegate.withCommandsClient<Set<String>> { client ->

val hydratedLastSeen = mutableListOf<LastSeenInfo>()
keys.chunked(REDIS_CHUNK_SIZE).forEach { sublist ->
val hydrated = redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.hmget(LAST_SEEN, *sublist.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<LastSeenInfo>(json)
}
}.flatten()
hydratedLastSeen.addAll(hydrated)
}
return hydratedLastSeen
}

override fun isUnused(resourceIdentifier: String): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ object RedisResourceTrackingRepositoryTest {
RedisClientSelector(listOf(JedisClientDelegate("primaryDefault", jedisPool))), objectMapper, clock
)

private val defaultMarkedResource = MarkedResource(
resource = TestResource(resourceId = "test"),
summaries = listOf(Summary("invalid resourceHash 1", "rule 1")),
namespace = "namespace",
projectedDeletionStamp = 1,
projectedSoftDeletionStamp = 1,
notificationInfo = NotificationInfo(
recipient = "yolo@netflixcom",
notificationType = "Email",
notificationStamp = clock.instant().toEpochMilli()
)
)

@BeforeEach
fun setup() {
jedisPool.resource.use {
Expand Down Expand Up @@ -216,4 +229,14 @@ object RedisResourceTrackingRepositoryTest {

resourceRepository.getNumMarkedResources() shouldMatch equalTo(1L)
}

@Test
fun `scanning should work`() {
for(i in 1..200) {
resourceRepository.upsert(defaultMarkedResource.copy(resource = TestResource(resourceId = "$i")))
}

resourceRepository.getNumMarkedResources() shouldMatch equalTo(200L)
resourceRepository.getMarkedResources().size shouldMatch equalTo(200)
}
}

0 comments on commit 0a90e17

Please sign in to comment.