diff --git a/common/src/main/kotlin/tech/figure/aggregate/common/Config.kt b/common/src/main/kotlin/tech/figure/aggregate/common/Config.kt index d63a9c59..920ca65b 100644 --- a/common/src/main/kotlin/tech/figure/aggregate/common/Config.kt +++ b/common/src/main/kotlin/tech/figure/aggregate/common/Config.kt @@ -14,6 +14,7 @@ data class Config ( val upload: UploadConfig = UploadConfig.empty(), val apiPort: Int, val streamPort: Int, + val batchSize: Int, val blockApi: BlockApiConfig, val dbConfig: DBConfig, val badBlockRange: List, diff --git a/src/main/kotlin/tech/figure/aggregate/service/Main.kt b/src/main/kotlin/tech/figure/aggregate/service/Main.kt index a2992eb4..ff27ef8a 100644 --- a/src/main/kotlin/tech/figure/aggregate/service/Main.kt +++ b/src/main/kotlin/tech/figure/aggregate/service/Main.kt @@ -249,6 +249,7 @@ fun main(args: Array) { dbClient, channel, config.hrp, + config.batchSize, Pair(config.badBlockRange[0], config.badBlockRange[1]), config.msgFeeHeight ) diff --git a/src/main/kotlin/tech/figure/aggregate/service/stream/consumers/EventStreamUploader.kt b/src/main/kotlin/tech/figure/aggregate/service/stream/consumers/EventStreamUploader.kt index 8bcd2df4..faf731e0 100644 --- a/src/main/kotlin/tech/figure/aggregate/service/stream/consumers/EventStreamUploader.kt +++ b/src/main/kotlin/tech/figure/aggregate/service/stream/consumers/EventStreamUploader.kt @@ -42,6 +42,7 @@ class EventStreamUploader( private val dbClient: DBClient = DBClient(), private val channel: ChannelImpl, private val hrp: String, + private val batchSize: Int, private val badBlockRange: Pair, private val msgFeeHeight: Long, private val dispatchers: DispatcherProvider = DefaultDispatcherProvider(), @@ -158,7 +159,7 @@ class EventStreamUploader( } .buffer(STREAM_BUFFER_CAPACITY, onBufferOverflow = SUSPEND) .flowOn(dispatchers.io()) - .chunked(size = 100, timeout = 10.seconds) + .chunked(size = batchSize, timeout = 10.seconds) .transform { streamBlocks: List -> log.info("collected block chunk size=${streamBlocks.size} and preparing for upload") val batch: Batch = batchBlueprint.build() diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 63c62db0..3dcb619b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,7 @@ api_port: ${API_PORT} bad_block_range: ${ONE_DOT_ELEVEN_BAD_BLOCK_RANGE} msg_fee_height: ${MSG_FEE_HEIGHT} stream_port: ${STREAM_PORT:-7777} +batch_size: ${BATCH_SIZE} dbConfig: dbHost: ${DB_HOST} @@ -29,9 +30,6 @@ event-stream: # Configuration of the Tendermint API: websocket: throttle-duration-ms: 0 - batch: - # Controls the maximum number of blocks that will be bundled into a batch for processing. - size: ${BATCH_SIZE} # Controls the timeout for a batch. If a batch is incomplete and a new block has not been received within # `timeout` milliseconds, the batch will be emitted, despite the fact it has less the `size` blocks. # timeout-ms: 10000 diff --git a/src/test/kotlin/tech/figure/aggregate/service/EventStreamUploaderTests.kt b/src/test/kotlin/tech/figure/aggregate/service/EventStreamUploaderTests.kt index 149eb112..35f6e776 100644 --- a/src/test/kotlin/tech/figure/aggregate/service/EventStreamUploaderTests.kt +++ b/src/test/kotlin/tech/figure/aggregate/service/EventStreamUploaderTests.kt @@ -84,6 +84,7 @@ class EventStreamUploaderTests { dbClient, channel, "tp", + 100, Pair(config.badBlockRange[0], config.badBlockRange[1]), config.msgFeeHeight )