Skip to content

Commit

Permalink
fix(redis): read in smaller chunks (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
emjburns authored Nov 5, 2018
1 parent 5cc721e commit 56e82d0
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ import org.springframework.context.annotation.Import
@Import(JedisClientConfiguration::class, DynomiteClientConfiguration::class)
open class RedisConfiguration


const val REDIS_CHUNK_SIZE = 100
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package com.netflix.spinnaker.swabbie.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.spinnaker.config.REDIS_CHUNK_SIZE
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.swabbie.repository.ResourceStateRepository
import com.netflix.spinnaker.swabbie.model.ResourceState
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import redis.clients.jedis.ScanParams

@Component
class RedisResourceStateRepository(
Expand All @@ -50,20 +52,36 @@ class RedisResourceStateRepository(
}

override fun getAll(): List<ResourceState> {
ALL_STATES_KEY.let { key ->
return redisClientDelegate.run {
this.withCommandsClient<Set<String>> { client ->
client.smembers(key)
}.let { set ->
if (set.isEmpty()) emptyList()
else this.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_STATE_KEY, *set.map { it }.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<ResourceState>(json)
}
val set = this.withCommandsClient<Set<String>> { client ->
val results = mutableListOf<String>()
val scanParams: ScanParams = ScanParams().count(REDIS_CHUNK_SIZE)
var cursor = "0"
var shouldContinue = true

while (shouldContinue) {
val scanResult = client.sscan(ALL_STATES_KEY, cursor, scanParams)
results.addAll(scanResult.result)
cursor = scanResult.stringCursor
if ("0" == cursor) {
shouldContinue = false
}
}
results.toSet()
}

if (set.isEmpty()) {
emptyList()
} else {
set.chunked(REDIS_CHUNK_SIZE).map { sublist ->
this.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_STATE_KEY, *sublist.map { it }.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<ResourceState>(json)
}
}.flatten()
}
}
}
}

override fun get(resourceId: String, namespace: String): ResourceState? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package com.netflix.spinnaker.swabbie.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.spinnaker.config.REDIS_CHUNK_SIZE
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.swabbie.repository.ResourceTrackingRepository
import com.netflix.spinnaker.swabbie.model.MarkedResource
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Component
import redis.clients.jedis.ScanParams
import redis.clients.jedis.ScanResult
import redis.clients.jedis.Tuple
import java.time.Clock
import java.time.Instant

Expand Down Expand Up @@ -87,26 +90,42 @@ class RedisResourceTrackingRepository(
* If true, includes all ids in the sorted set.
*/
private fun getAllIds(key: String, includeFutureIds: Boolean): Set<String> {
return redisClientDelegate.run {
this.withCommandsClient<Set<String>> { client ->
if (includeFutureIds) {
client.zrangeByScore(key, "-inf", "+inf")
} else {
client.zrangeByScore(key, 0.0, clock.instant().toEpochMilli().toDouble())
val results : MutableList<Tuple> = redisClientDelegate.withCommandsClient<MutableList<Tuple>> { client ->
val results = mutableListOf<Tuple>()
val scanParams: ScanParams = ScanParams().count(REDIS_CHUNK_SIZE)
var cursor = "0"
var shouldContinue = true

while (shouldContinue) {
val scanResult: ScanResult<Tuple> = client.zscan(key, cursor, scanParams)
results.addAll(scanResult.result)
cursor = scanResult.stringCursor
if ("0" == cursor) {
shouldContinue = false
}
}
results
}

return if (includeFutureIds) {
results.map { it.element }.toSet()
} else {
results
.filter { it.score >= 0.0 && it.score <= clock.instant().toEpochMilli().toDouble() }
.map { it.element }
.toSet()
}
}

private fun hydrateMarkedResources(resourseIds: Set<String>): List<MarkedResource> {
if (resourseIds.isEmpty()) return emptyList()
return redisClientDelegate.run {
this.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_RESOURCES_KEY, *resourseIds.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<MarkedResource>(json)
}
}
return resourseIds.chunked(REDIS_CHUNK_SIZE).map { sublist ->
return redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.hmget(SINGLE_RESOURCES_KEY, *sublist.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<MarkedResource>(json)
}
}.flatten()
}

override fun upsert(markedResource: MarkedResource, deleteScore: Long, softDeleteScore: Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ package com.netflix.spinnaker.swabbie.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.spinnaker.config.REDIS_CHUNK_SIZE
import com.netflix.spinnaker.config.SwabbieProperties
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.swabbie.repository.LastSeenInfo
import com.netflix.spinnaker.swabbie.repository.ResourceUseTrackingRepository
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import redis.clients.jedis.ScanParams
import redis.clients.jedis.ScanResult
import redis.clients.jedis.Tuple
import java.time.Clock
import java.time.Duration
import java.time.temporal.ChronoUnit
Expand Down Expand Up @@ -80,16 +84,36 @@ class RedisResourceUseTrackingRepository(
}

override fun getUnused(): List<LastSeenInfo> {
val keys = redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.zrangeByScore(LAST_SEEN_INDEX, 0.0, minusXdays(outOfUseThresholdDays).toDouble())
}
val keys = getWithParams(LAST_SEEN_INDEX, 0.0, minusXdays(outOfUseThresholdDays).toDouble())
return hydrateLastSeen(keys)
}

override fun getUsed(): Set<String> {
return redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.zrangeByScore(LAST_SEEN_INDEX, minusXdays(outOfUseThresholdDays).toDouble(), Double.MAX_VALUE)
return getWithParams(LAST_SEEN_INDEX, minusXdays(outOfUseThresholdDays).toDouble(), Double.MAX_VALUE)
}

private fun getWithParams(key: String, min: Double, max: Double): Set<String> {
val results : MutableList<Tuple> = redisClientDelegate.withCommandsClient<MutableList<Tuple>> { client ->
val results = mutableListOf<Tuple>()
val scanParams: ScanParams = ScanParams().count(REDIS_CHUNK_SIZE)
var cursor = "0"
var shouldContinue = true

while (shouldContinue) {
val scanResult: ScanResult<Tuple> = client.zscan(key, cursor, scanParams)
results.addAll(scanResult.result)
cursor = scanResult.stringCursor
if ("0" == cursor) {
shouldContinue = false
}
}
results
}

return results
.filter { it.score in min..max }
.map { it.element }
.toSet()
}

override fun getLastSeenInfo(resourceIdentifier: String): LastSeenInfo? {
Expand All @@ -102,11 +126,13 @@ class RedisResourceUseTrackingRepository(

private fun hydrateLastSeen(keys: Set<String>): List<LastSeenInfo> {
if (keys.isEmpty()) return emptyList()
return redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.hmget(LAST_SEEN, *keys.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<LastSeenInfo>(json)
}
return keys.chunked(REDIS_CHUNK_SIZE).map { sublist ->
redisClientDelegate.withCommandsClient<Set<String>> { client ->
client.hmget(LAST_SEEN, *sublist.toTypedArray()).toSet()
}.map { json ->
objectMapper.readValue<LastSeenInfo>(json)
}
}.flatten()
}

override fun isUnused(resourceIdentifier: String): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
package com.netflix.spinnaker.swabbie.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.config.REDIS_CHUNK_SIZE
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.swabbie.repository.TaskCompleteEventInfo
import com.netflix.spinnaker.swabbie.repository.TaskState
import com.netflix.spinnaker.swabbie.repository.TaskTrackingRepository
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Component
import redis.clients.jedis.ScanParams
import redis.clients.jedis.ScanResult
import java.time.Clock
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -99,10 +101,23 @@ class RedisTaskTrackingRepository(
}

private fun getAll(key: String): Map<String, String> {
return redisClientDelegate.run {
this.withCommandsClient<Map<String, String>> { client ->
client.hgetAll(key)
return redisClientDelegate.withCommandsClient<Map<String,String>> { client ->
val results = mutableMapOf<String,String>()
val scanParams: ScanParams = ScanParams().count(REDIS_CHUNK_SIZE)
var cursor = "0"
var shouldContinue = true

while (shouldContinue) {
val scanResult: ScanResult<Map.Entry<String, String>> = client.hscan(key, cursor, scanParams)
scanResult.result.forEach { entry ->
results[entry.key] = entry.value
}
cursor = scanResult.stringCursor
if ("0" == cursor) {
shouldContinue = false
}
}
results
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ object RedisResourceTrackingRepositoryTest {

listOf(
MarkedResource(
resource = TestResource("marked resourceHash due for deletion now"),
resource = TestResource(resourceId = "1", name = "marked resourceHash due for deletion now"),
summaries = listOf(Summary("invalid resourceHash 1", "rule 1")),
namespace = configuration.namespace,
projectedDeletionStamp = 0,
Expand All @@ -151,14 +151,14 @@ object RedisResourceTrackingRepositoryTest {
)
),
MarkedResource(
resource = TestResource("marked resourceHash not due for deletion 2 seconds later"),
resource = TestResource(resourceId = "2", name = "marked resourceHash not due for deletion 2 seconds later"),
summaries = listOf(Summary("invalid resourceHash 2", "rule 2")),
namespace = configuration.namespace,
projectedDeletionStamp = twoDaysFromNow.toEpochMilli(),
projectedSoftDeletionStamp = 0
),
MarkedResource(
resource = TestResource("random"),
resource = TestResource(resourceId = "3", name = "random"),
summaries = listOf(Summary("invalid resourceHash 3", "rule 3")),
namespace = configuration.namespace,
projectedDeletionStamp = twoDaysFromNow.toEpochMilli(),
Expand Down

0 comments on commit 56e82d0

Please sign in to comment.