Skip to content

Commit

Permalink
Retry bugfixes (#35)
Browse files Browse the repository at this point in the history
* Update the retry store to split `putOne` into separate `insert` and `update` operations and remove the mutator callback. Leave the retry attempt increment up to the flow processor.
  • Loading branch information
mtps authored Nov 11, 2022
1 parent 80953cd commit 9d3a38d
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ open class KafkaFlowRetry<K, V>(
e: Throwable
) {
log.debug { "adding record to retry queue key:${item.key()} source:${item.topic()}-${item.partition()}" }
store.putOne(item, e) {
attempt = 0
lastAttempted = OffsetDateTime.now()
lastException = e.localizedMessage
}
store.insert(item, e)
}

override suspend fun onSuccess(
Expand All @@ -57,11 +53,7 @@ open class KafkaFlowRetry<K, V>(

/**
 * Called when reprocessing [item] failed with [e]: logs the failed attempt at debug level and
 * records the failure against `item.data` in the store.
 *
 * NOTE(review): this view appears to interleave the old and new versions of the body — the
 * `store.putOne { ... }` call and the `store.update(...)` call are presumably alternatives rather
 * than both being executed. Confirm against the actual file.
 */
override suspend fun onFailure(item: RetryRecord<ConsumerRecord<K, V>>, e: Throwable) {
// The log line reports the exception message already stored on the record, not the new [e].
log.debug { "failed reprocess attempt:${item.attempt} Error: ${item.lastException} key:${item.data.key()} source:${item.data.topic()}-${item.data.partition()}" }
store.putOne(item.data, e) {
// NOTE(review): `inc()` returns the incremented value and the result is not assigned here,
// so this statement looks like it leaves `attempt` unchanged — confirm.
attempt.inc()
lastAttempted = OffsetDateTime.now()
lastException = e.localizedMessage
}
store.update(item.data, e)
}

override suspend fun process(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package tech.figure.kafka.coroutines.retry.store

import tech.figure.coroutines.retry.store.RetryRecord
import tech.figure.coroutines.retry.store.RetryRecordStore
import java.time.OffsetDateTime
import mu.KotlinLogging
import org.apache.kafka.clients.consumer.ConsumerRecord
import tech.figure.coroutines.retry.store.RetryRecord
import tech.figure.coroutines.retry.store.RetryRecordStore

fun <K, V> inMemoryConsumerRecordStore(
data: MutableList<RetryRecord<ConsumerRecord<K, V>>> = mutableListOf()
) = object : RetryRecordStore<ConsumerRecord<K, V>> {
val log = KotlinLogging.logger {}

override suspend fun isEmpty(): Boolean =
data.isEmpty()

Expand All @@ -25,27 +22,22 @@ fun <K, V> inMemoryConsumerRecordStore(
.take(limit)
}

override suspend fun getOne(
override suspend fun get(
item: ConsumerRecord<K, V>
): RetryRecord<ConsumerRecord<K, V>>? {
return data.firstOrNull(recordMatches(item))
}

override suspend fun putOne(
item: ConsumerRecord<K, V>,
lastException: Throwable?,
mutator: RetryRecord<ConsumerRecord<K, V>>.() -> Unit
) {
val record = getOne(item)
if (record == null) {
data += RetryRecord(item, 0, OffsetDateTime.now(), lastException?.message.orEmpty()).also {
log.debug { "putting new entry for ${item.key()}" }
}
return
}
override suspend fun insert(item: ConsumerRecord<K, V>, e: Throwable?) {
data += RetryRecord(item, lastException = e?.localizedMessage.orEmpty())
}

override suspend fun update(item: ConsumerRecord<K, V>, e: Throwable?) {
val record = get(item) ?: error("record not found")

data[data.indexOf(record)].mutator().also {
log.debug { "incrementing attempt for $record" }
// Find and update the record in the data set.
data[data.indexOf(record)].also {
it.data = item
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tech.figure.coroutines.retry.flow

import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import tech.figure.coroutines.retry.store.RetryRecord
import tech.figure.coroutines.retry.store.RetryRecordStore

/**
 * Create a [FlowRetry] whose retry state is kept in the supplied [store].
 *
 * @param handler Suspended callback invoked to reprocess each item of type [T].
 * @param store [RetryRecordStore] used to persist and look up items of type [T].
 * @param groupSize Upper bound on the number of elements handled per poll loop (aka: limit).
 * @return A [FlowRetry] backed by a [RecordStoreFlowRetry] built from the given arguments.
 */
fun <T> recordStoreFlowRetry(
    handler: suspend (T) -> Unit,
    store: RetryRecordStore<T>,
    groupSize: Int = 40,
): FlowRetry<T> {
    return RecordStoreFlowRetry(
        handler = handler,
        store = store,
        groupSize = groupSize,
    )
}

/**
 * [FlowRetry] implementation that delegates all retry bookkeeping to a [RetryRecordStore].
 *
 * @param handler The handler to reprocess with.
 * @param store [RetryRecordStore] to save and retrieve data object of type [T] from.
 * @param groupSize Process a max of this many elements each poll loop (aka: limit).
 */
internal open class RecordStoreFlowRetry<T>(
    private val handler: suspend (T) -> Unit,
    private val store: RetryRecordStore<T>,
    private val groupSize: Int = 40,
) : FlowRetry<T> {
    /**
     * Fetch the next group of records to retry.
     *
     * Passes [attemptRange], [olderThan] and [limit] straight to [RetryRecordStore.select], orders the
     * result by `lastAttempted` descending, and emits at most [groupSize] of those records.
     */
    override suspend fun produceNext(
        attemptRange: IntRange,
        olderThan: OffsetDateTime,
        limit: Int
    ): Flow<RetryRecord<T>> =
        store
            .select(attemptRange, olderThan, limit)
            .sortedByDescending { it.lastAttempted }
            .take(groupSize)
            .asFlow()

    /**
     * Queue [item] for retry after it failed with [e].
     *
     * Uses [RetryRecordStore.insert] rather than [RetryRecordStore.update]: an item being sent to the
     * retry flow presumably has no stored record yet, and a store's `update` may reject an unknown
     * item. This matches how the Kafka retry flow adds records to its retry queue.
     */
    override suspend fun send(item: T, e: Throwable) =
        store.insert(item, e)

    /** Drop the record for a successfully reprocessed [item] from the store. */
    override suspend fun onSuccess(item: RetryRecord<T>) =
        store.remove(item.data)

    /** Record a failed reprocess attempt for [item] by updating its stored record with [e]. */
    override suspend fun onFailure(item: RetryRecord<T>, e: Throwable) =
        store.update(item.data, e)

    /** Reprocess [item] with the configured handler; [attempt] is not used by this implementation. */
    override suspend fun process(item: T, attempt: Int) =
        handler(item)

    /** `true` while the store still reports at least one record. */
    override suspend fun hasNext() =
        !store.isEmpty()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import kotlin.time.toJavaDuration
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import mu.KotlinLogging
import tech.figure.coroutines.retry.RetryStrategy
Expand Down Expand Up @@ -37,28 +36,26 @@ fun <T> retryFlow(
val log = KotlinLogging.logger {}
val strategies = retryStrategies.invert()

log.info { "initializing polling retry flow ${flowRetry.javaClass.name}" }
return pollingFlow(retryInterval) {
if (!flowRetry.hasNext()) {
return@pollingFlow
}

for (strategy in strategies) {
val lastAttempted = OffsetDateTime.now().minus(strategy.value.lastAttempted.toJavaDuration())
val attemptedBefore = OffsetDateTime.now().minus(strategy.value.lastAttempted.toJavaDuration())

val onFailure: suspend (RetryRecord<T>, Throwable) -> Unit = { rec, it ->
strategy.value.onFailure("", it)
flowRetry.onFailure(rec, it)
}

flowRetry.produceNext(strategy.key, lastAttempted, batchSize)
flowRetry.produceNext(strategy.key, attemptedBefore, batchSize)
.onStart {
log.trace { "${strategy.value.name} --> Retrying records in group:${strategy.key} lastAttempted:$lastAttempted" }
}
.map {
it.attempt = it.attempt.inc()
it
log.trace { "${strategy.value.name} --> Retrying records in group:${strategy.key} lastAttempted:$attemptedBefore" }
}
.tryMap(onFailure) {
log.debug { "retry processing attempt:${it.attempt} rec:${it.data}" }
flowRetry.process(it.data, it.attempt)

log.debug { "retry succeeded on attempt:${it.attempt} rec:${it.data}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import java.time.OffsetDateTime
/**
 * Storage abstraction for [RetryRecord] entries wrapping items of type [T].
 *
 * NOTE(review): this view appears to interleave the old and new member sets — `getOne`/`putOne`
 * look like the predecessors of `get`/`insert`/`update`. Confirm which members the actual
 * interface declares.
 */
interface RetryRecordStore<T> {
/** Presumably reports whether the store currently holds no records — confirm with implementations. */
suspend fun isEmpty(): Boolean
/**
 * Returns records selected by [attemptRange] and [lastAttempted], with [limit] defaulting to
 * `DEFAULT_FETCH_LIMIT`. The exact filter semantics are implementation-defined — TODO confirm.
 */
suspend fun select(attemptRange: IntRange, lastAttempted: OffsetDateTime, limit: Int = DEFAULT_FETCH_LIMIT): List<RetryRecord<T>>
/** Looks up the record for [item]; the nullable return type allows for no match. */
suspend fun getOne(item: T): RetryRecord<T>?
/** Stores [item] with an optional [lastException], applying [mutator] to a [RetryRecord] receiver. */
suspend fun putOne(item: T, lastException: Throwable? = null, mutator: RetryRecord<T>.() -> Unit)
/** Looks up the record for [item]; the nullable return type allows for no match. */
suspend fun get(item: T): RetryRecord<T>?
/** Presumably adds a new record for [item] with the optional failure [e] — confirm. */
suspend fun insert(item: T, e: Throwable? = null)
/** Presumably updates the existing record for [item] with the optional failure [e] — confirm. */
suspend fun update(item: T, e: Throwable? = null)
/** Presumably removes the record for [item] — confirm. */
suspend fun remove(item: T)
}

0 comments on commit 9d3a38d

Please sign in to comment.