From 9d3a38d71099fdd37a787e51eeaffb0baaf6e981 Mon Sep 17 00:00:00 2001 From: Phil Story Date: Fri, 11 Nov 2022 12:59:24 -0700 Subject: [PATCH] Retry bugfixes (#35) * Update retry store to separate out update and add and remove mutator. Leave the retry attempt increment up to the flow processor. --- .../coroutines/retry/flow/KafkaFlowRetry.kt | 12 +--- .../store/InMemoryConsumerRecordStore.kt | 32 ++++------ .../retry/flow/ObjectStoreRetryFlow.kt | 59 +++++++++++++++++++ .../figure/coroutines/retry/flow/RetryFlow.kt | 13 ++-- .../retry/store/RetryRecordStore.kt | 5 +- 5 files changed, 81 insertions(+), 40 deletions(-) create mode 100644 ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/ObjectStoreRetryFlow.kt diff --git a/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry.kt b/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry.kt index 4b9cebe..bdfb387 100644 --- a/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry.kt +++ b/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry.kt @@ -41,11 +41,7 @@ open class KafkaFlowRetry( e: Throwable ) { log.debug { "adding record to retry queue key:${item.key()} source:${item.topic()}-${item.partition()}" } - store.putOne(item, e) { - attempt = 0 - lastAttempted = OffsetDateTime.now() - lastException = e.localizedMessage - } + store.insert(item, e) } override suspend fun onSuccess( @@ -57,11 +53,7 @@ open class KafkaFlowRetry( override suspend fun onFailure(item: RetryRecord>, e: Throwable) { log.debug { "failed reprocess attempt:${item.attempt} Error: ${item.lastException} key:${item.data.key()} source:${item.data.topic()}-${item.data.partition()}" } - store.putOne(item.data, e) { - attempt.inc() - lastAttempted = OffsetDateTime.now() - lastException = e.localizedMessage - } + store.update(item.data, e) } override suspend fun process( diff --git a/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt b/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt index b53a7cf..597c674 100644 --- a/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt +++ b/ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt @@ -1,16 +1,13 @@ package tech.figure.kafka.coroutines.retry.store -import tech.figure.coroutines.retry.store.RetryRecord -import tech.figure.coroutines.retry.store.RetryRecordStore import java.time.OffsetDateTime -import mu.KotlinLogging import org.apache.kafka.clients.consumer.ConsumerRecord +import tech.figure.coroutines.retry.store.RetryRecord +import tech.figure.coroutines.retry.store.RetryRecordStore fun inMemoryConsumerRecordStore( data: MutableList>> = mutableListOf() ) = object : RetryRecordStore> { - val log = KotlinLogging.logger {} - override suspend fun isEmpty(): Boolean = data.isEmpty() @@ -25,27 +22,22 @@ fun inMemoryConsumerRecordStore( .take(limit) } - override suspend fun getOne( + override suspend fun get( item: ConsumerRecord ): RetryRecord>? { return data.firstOrNull(recordMatches(item)) } - override suspend fun putOne( - item: ConsumerRecord, - lastException: Throwable?, - mutator: RetryRecord>.() -> Unit - ) { - val record = getOne(item) - if (record == null) { - data += RetryRecord(item, 0, OffsetDateTime.now(), lastException?.message.orEmpty()).also { - log.debug { "putting new entry for ${item.key()}" } - } - return - } + override suspend fun insert(item: ConsumerRecord, e: Throwable?) { + data += RetryRecord(item, lastException = e?.localizedMessage.orEmpty()) + } + + override suspend fun update(item: ConsumerRecord, e: Throwable?) { + val record = get(item) ?: error("record not found") - data[data.indexOf(record)].mutator().also { - log.debug { "incrementing attempt for $record" } + // Find and update the record in the data set. + data[data.indexOf(record)].also { + it.data = item } } diff --git a/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/ObjectStoreRetryFlow.kt b/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/ObjectStoreRetryFlow.kt new file mode 100644 index 0000000..59462ef --- /dev/null +++ b/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/ObjectStoreRetryFlow.kt @@ -0,0 +1,59 @@ +package tech.figure.coroutines.retry.flow + +import java.time.OffsetDateTime +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import tech.figure.coroutines.retry.store.RetryRecord +import tech.figure.coroutines.retry.store.RetryRecordStore + +/** + * Retry a flow of objects using the backing [store]. + * + * @param handler The handler to reprocess with. + * @param store [RetryRecordStore] to save and retrieve data object of type [T] from. + * @param groupSize Process a max of this many elements each poll loop (aka: limit). + */ +fun recordStoreFlowRetry( + handler: suspend (T) -> Unit, + store: RetryRecordStore, + groupSize: Int = 40, +): FlowRetry = RecordStoreFlowRetry(handler, store, groupSize) + +/** + * Retry a flow of objects using the backing [store]. + * + * @param handler The handler to reprocess with. + * @param store [RetryRecordStore] to save and retrieve data object of type [T] from. + * @param groupSize Process a max of this many elements each poll loop (aka: limit). + */ +internal open class RecordStoreFlowRetry( + private val handler: suspend (T) -> Unit, + private val store: RetryRecordStore, + private val groupSize: Int = 40, +) : FlowRetry { + override suspend fun produceNext( + attemptRange: IntRange, + olderThan: OffsetDateTime, + limit: Int + ): Flow> = + store + .select(attemptRange, olderThan, limit) + .sortedByDescending { it.lastAttempted } + .take(groupSize) + .asFlow() + + override suspend fun send(item: T, e: Throwable) = + store.update(item, e) + + override suspend fun onSuccess(item: RetryRecord) = + store.remove(item.data) + + override suspend fun onFailure(item: RetryRecord, e: Throwable) = + store.update(item.data, e) + + override suspend fun process(item: T, attempt: Int) = + handler(item) + + override suspend fun hasNext() = + !store.isEmpty() +} diff --git a/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/RetryFlow.kt b/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/RetryFlow.kt index 6175476..6d0cb44 100644 --- a/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/RetryFlow.kt +++ b/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/RetryFlow.kt @@ -7,7 +7,6 @@ import kotlin.time.toJavaDuration import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import mu.KotlinLogging import tech.figure.coroutines.retry.RetryStrategy @@ -37,28 +36,26 @@ fun retryFlow( val log = KotlinLogging.logger {} val strategies = retryStrategies.invert() + log.info { "initializing polling retry flow ${flowRetry.javaClass.name}" } return pollingFlow(retryInterval) { if (!flowRetry.hasNext()) { return@pollingFlow } for (strategy in strategies) { - val lastAttempted = OffsetDateTime.now().minus(strategy.value.lastAttempted.toJavaDuration()) + val attemptedBefore = OffsetDateTime.now().minus(strategy.value.lastAttempted.toJavaDuration()) val onFailure: suspend (RetryRecord, Throwable) -> Unit = { rec, it -> strategy.value.onFailure("", it) flowRetry.onFailure(rec, it) } - flowRetry.produceNext(strategy.key, lastAttempted, batchSize) + flowRetry.produceNext(strategy.key, attemptedBefore, batchSize) .onStart { - log.trace { "${strategy.value.name} --> Retrying records in group:${strategy.key} lastAttempted:$lastAttempted" } - } - .map { - it.attempt = it.attempt.inc() - it + log.trace { "${strategy.value.name} --> Retrying records in group:${strategy.key} lastAttempted:$attemptedBefore" } } .tryMap(onFailure) { + log.debug { "retry processing attempt:${it.attempt} rec:${it.data}" } flowRetry.process(it.data, it.attempt) log.debug { "retry succeeded on attempt:${it.attempt} rec:${it.data}" } diff --git a/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/store/RetryRecordStore.kt b/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/store/RetryRecordStore.kt index db23731..e9b8f14 100644 --- a/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/store/RetryRecordStore.kt +++ b/ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/store/RetryRecordStore.kt @@ -6,7 +6,8 @@ import java.time.OffsetDateTime interface RetryRecordStore { suspend fun isEmpty(): Boolean suspend fun select(attemptRange: IntRange, lastAttempted: OffsetDateTime, limit: Int = DEFAULT_FETCH_LIMIT): List> - suspend fun getOne(item: T): RetryRecord? - suspend fun putOne(item: T, lastException: Throwable? = null, mutator: RetryRecord.() -> Unit) + suspend fun get(item: T): RetryRecord? + suspend fun insert(item: T, e: Throwable? = null) + suspend fun update(item: T, e: Throwable? = null) suspend fun remove(item: T) }