Skip to content

Commit

Permalink
Merge pull request #154 from FigureTechnologies/batch_size_configurable
Browse files Browse the repository at this point in the history
Make batch size configurable (again)
  • Loading branch information
rchaing-figure authored Jul 25, 2023
2 parents 126cdba + ccc15d2 commit e515381
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ data class Config (
val upload: UploadConfig = UploadConfig.empty(),
val apiPort: Int,
val streamPort: Int,
val batchSize: Int,
val blockApi: BlockApiConfig,
val dbConfig: DBConfig,
val badBlockRange: List<Long>,
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/tech/figure/aggregate/service/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ fun main(args: Array<String>) {
dbClient,
channel,
config.hrp,
config.batchSize,
Pair(config.badBlockRange[0], config.badBlockRange[1]),
config.msgFeeHeight
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class EventStreamUploader(
private val dbClient: DBClient = DBClient(),
private val channel: ChannelImpl<StreamTypeImpl>,
private val hrp: String,
private val batchSize: Int,
private val badBlockRange: Pair<Long, Long>,
private val msgFeeHeight: Long,
private val dispatchers: DispatcherProvider = DefaultDispatcherProvider(),
Expand Down Expand Up @@ -158,7 +159,7 @@ class EventStreamUploader(
}
.buffer(STREAM_BUFFER_CAPACITY, onBufferOverflow = SUSPEND)
.flowOn(dispatchers.io())
.chunked(size = 100, timeout = 10.seconds)
.chunked(size = batchSize, timeout = 10.seconds)
.transform { streamBlocks: List<StreamBlock> ->
log.info("collected block chunk size=${streamBlocks.size} and preparing for upload")
val batch: Batch = batchBlueprint.build()
Expand Down
4 changes: 1 addition & 3 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ api_port: ${API_PORT}
bad_block_range: ${ONE_DOT_ELEVEN_BAD_BLOCK_RANGE}
msg_fee_height: ${MSG_FEE_HEIGHT}
stream_port: ${STREAM_PORT:-7777}
batch_size: ${BATCH_SIZE}

dbConfig:
dbHost: ${DB_HOST}
Expand All @@ -29,9 +30,6 @@ event-stream:
# Configuration of the Tendermint API:
websocket:
throttle-duration-ms: 0
batch:
# Controls the maximum number of blocks that will be bundled into a batch for processing.
size: ${BATCH_SIZE}
# Controls the timeout for a batch. If a batch is incomplete and a new block has not been received within
# `timeout` milliseconds, the batch will be emitted, despite the fact it has less the `size` blocks.
# timeout-ms: 10000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class EventStreamUploaderTests {
dbClient,
channel,
"tp",
100,
Pair(config.badBlockRange[0], config.badBlockRange[1]),
config.msgFeeHeight
)
Expand Down

0 comments on commit e515381

Please sign in to comment.