Skip to content

Commit

Permalink
listen to blocks via blockapi instead of event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
celloman committed Aug 16, 2023
1 parent 748ba23 commit deffc73
Show file tree
Hide file tree
Showing 19 changed files with 188 additions and 218 deletions.
8 changes: 2 additions & 6 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
asset-model = "0.1.2"
bouncycastle = "1.70"
exposed = "0.37.3"
figure-eventstream = "0.8.1"
flyway = "8.0.2"
grpc = "1.45.1" # If updating this value, ensure to update the doc reference in the ClientConfig class
grpc-kotlin = "1.3.0"
Expand All @@ -21,6 +20,7 @@ mockk = "1.12.3"
okhttp = "4.9.1"
postgres = "42.3.3"
protobuf = "3.23.2"
provenance-blockapi = "0.2.1"
provenance-client = "1.3.0"
provenance-hdwallet = "0.1.15"
provenance-proto = "1.13.0"
Expand All @@ -40,10 +40,6 @@ coroutines-slf4j = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-slf4j",
exposed-core = { module = "org.jetbrains.exposed:exposed-core", version.ref = "exposed" }
exposed-dao = { module = "org.jetbrains.exposed:exposed-dao", version.ref = "exposed" }
exposed-jdbc = { module = "org.jetbrains.exposed:exposed-jdbc", version.ref = "exposed" }
figure-eventstream-api = { module = "tech.figure.eventstream:es-api", version.ref = "figure-eventstream" }
figure-eventstream-api-model = { module = "tech.figure.eventstream:es-api-model", version.ref = "figure-eventstream" }
figure-eventstream-cli = { module = "tech.figure.eventstream:es-cli", version.ref = "figure-eventstream" }
figure-eventstream-core = { module = "tech.figure.eventstream:es-core", version.ref = "figure-eventstream" }
flyway = { module = "org.flywaydb:flyway-core", version.ref = "flyway" }
grpc-protobuf = { module = "io.grpc:grpc-protobuf", version.ref = "grpc" }
grpc-stub = { module = "io.grpc:grpc-stub", version.ref = "grpc" }
Expand All @@ -63,6 +59,7 @@ okhttp3 = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
postgres = { module = "org.postgresql:postgresql", version.ref = "postgres" }
protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "protobuf" }
protobuf-java-util = { module = "com.google.protobuf:protobuf-java-util", version.ref = "protobuf" }
provenance-blockapi-client = { module = "tech.figure.block:api-client", version.ref = "provenance-blockapi" }
provenance-client = { module = "io.provenance.client:pb-grpc-client-kotlin", version.ref = "provenance-client" }
provenance-hdwallet = { module = "io.provenance.hdwallet:hdwallet", version.ref = "provenance-hdwallet" }
provenance-proto = { module = "io.provenance:proto-kotlin", version.ref = "provenance-proto" }
Expand Down Expand Up @@ -91,7 +88,6 @@ testcontainers-postgres = { module = "org.testcontainers:postgresql", version.re

[bundles]
database = ["exposed-core", "exposed-dao", "exposed-jdbc", "flyway", "hikari", "postgres", "sqlite"]
eventstream = ["figure-eventstream-api", "figure-eventstream-api-model", "figure-eventstream-cli", "figure-eventstream-core"]
grpc = ["grpc-netty", "grpc-netty-shaded", "grpc-protobuf", "grpc-stub", "grpc-springboot-starter", "grpc-kotlin-stub"]
jackson = ["jackson-module-kotlin", "jackson-module-protobuf"]
kotlin = ["coroutines-core-jvm", "coroutines-reactor", "coroutines-jdk8", "coroutines-slf4j", "kotlin-allopen", "kotlin-reflect", "kotlin-stdlib-jdk8"]
Expand Down
2 changes: 1 addition & 1 deletion server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ dependencies {
libs.bouncycastle,
libs.java.jwt,
libs.okhttp3,
libs.provenance.blockapi.client,
libs.bundles.database,
libs.bundles.eventstream,
libs.bundles.grpc,
libs.bundles.jackson,
libs.bundles.kotlin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling
import tech.figure.objectstore.gateway.configuration.BatchProperties
import tech.figure.objectstore.gateway.configuration.BlockapiProperties
import tech.figure.objectstore.gateway.configuration.DatabaseProperties
import tech.figure.objectstore.gateway.configuration.EventStreamProperties
import tech.figure.objectstore.gateway.configuration.ObjectStoreProperties
import tech.figure.objectstore.gateway.configuration.ProvenanceProperties

@SpringBootApplication
@EnableConfigurationProperties(
value = [
BatchProperties::class,
EventStreamProperties::class,
BlockapiProperties::class,
ObjectStoreProperties::class,
ProvenanceProperties::class,
DatabaseProperties::class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package tech.figure.objectstore.gateway.components

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KLogging
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.actuate.health.Health
import org.springframework.boot.actuate.health.HealthIndicator
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.ApplicationListener
import org.springframework.stereotype.Component
import tech.figure.block.api.client.BlockAPIClient
import tech.figure.block.api.proto.BlockServiceOuterClass
import tech.figure.objectstore.gateway.configuration.BeanQualifiers
import tech.figure.objectstore.gateway.configuration.BlockapiProperties
import tech.figure.objectstore.gateway.repository.BlockHeightRepository
import tech.figure.objectstore.gateway.service.TxEventHandlerService
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime

@Component
class BlockStreamConsumer(
private val txEventHandlerService: TxEventHandlerService,
private val blockHeightRepository: BlockHeightRepository,
private val blockapiProperties: BlockapiProperties,
private val blockAPIClient: BlockAPIClient,
@Qualifier(BeanQualifiers.BLOCK_STREAM_COROUTINE_SCOPE_QUALIFIER) private val blockStreamScope: CoroutineScope,
) : ApplicationListener<ApplicationReadyEvent>, HealthIndicator {
private companion object : KLogging() {
private var blockStreamRunning = AtomicBoolean(false)
private val TX_EVENTS = setOf("wasm")
}

override fun onApplicationEvent(event: ApplicationReadyEvent) {
if (!blockapiProperties.enabled) {
logger.warn("Event stream has been manually disabled! Use a value of `true` for EVENT_STREAM_ENABLED to re-enable it")
return
}
tryStartBlockStream {
blockStreamLoop()
}
}

override fun health(): Health = if (blockStreamRunning.get()) {
Health.up().build()
} else {
Health.down().build()
}

private suspend fun blockStreamLoop() {
val lastBlockProcessed = blockHeightRepository.getLastProcessedBlockHeight()
blockAPIClient.streamBlocks(
start = lastBlockProcessed,
preference = BlockServiceOuterClass.PREFER.TX_EVENTS
).collect { block ->
val lastProcessedHeight = blockHeightRepository.getLastProcessedBlockHeight()

block.blockResult.block.transactionsList.flatMap { it.eventsList }
.filter { TX_EVENTS.contains(it.eventType) } // these are the blocks you are looking for
.forEach {
logger.info { "got event $it" }
try {
txEventHandlerService.handleEvent(it)
} catch (e: Exception) {
// If exceptions are simply thrown without any additional logging, the errors appear to be
// event stream related, which would not be the case if they occur in this block
logger.error("Failed to process event with hash ${it.txHash} at height ${it.height}", e)
throw e
}
}

block.blockResult.block.height.let { blockHeight ->
if (blockHeight < lastProcessedHeight) {
logger.warn("Received lower block height than last processed ($blockHeight vs. $lastProcessedHeight)")
} else {
blockHeightRepository.setLastProcessedBlockHeight(blockHeight)
}
}
}
}

@OptIn(ExperimentalTime::class)
private fun tryStartBlockStream(
eventStreamFn: suspend CoroutineScope.() -> Unit
) {
logger.info("BLOCKSTREAM INIT")
blockStreamScope.launch(Dispatchers.IO) {
while (true) {
try {
logger.info("BLOCKSTREAM START")
eventStreamFn()
logger.info("BLOCKSTREAM END/SUCCESS")
blockStreamRunning.set(true)
} catch (e: Exception) {
logger.error("BLOCKSTREAM END/FAILURE {}", e.message)
blockStreamRunning.set(false)
logger.info("Waiting ${blockapiProperties.restartDelaySeconds} seconds before reconnecting to block stream")
delay(blockapiProperties.restartDelaySeconds.seconds)
}
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ data class BatchProperties(
)

@ConstructorBinding
@ConfigurationProperties(prefix = "event.stream")
@ConfigurationProperties(prefix = "blockapi")
@Validated
data class EventStreamProperties(
val websocketUri: URI,
data class BlockapiProperties(
val uri: URI,
val epochHeight: Long,
val enabled: Boolean,
val blockHeightTrackingUuid: UUID,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.figure.objectstore.gateway.configuration

object BeanQualifiers {
const val EVENT_STREAM_COROUTINE_SCOPE_QUALIFIER = "eventStreamCoroutineScopeBean"
const val BLOCK_STREAM_COROUTINE_SCOPE_QUALIFIER = "eventStreamCoroutineScopeBean"
const val OBJECTSTORE_ENCRYPTION_KEYS: String = "objectStoreEncryptionKeys"
const val OBJECTSTORE_PRIVATE_KEYS: String = "objectStorePrivateKeys"
const val OBJECTSTORE_MASTER_KEY: String = "objectStoreMasterKey"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package tech.figure.objectstore.gateway.configuration

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import tech.figure.block.api.client.BlockAPIClient
import tech.figure.block.api.client.Protocol
import tech.figure.block.api.client.withProtocol
import tech.figure.block.api.proto.BlockServiceOuterClass

@Configuration
class BlockapiConfig {
@Bean
fun blockapiClient(blockapiProperties: BlockapiProperties): BlockAPIClient = BlockAPIClient(
host = blockapiProperties.uri.host,
port = blockapiProperties.uri.port,
withProtocol(if (blockapiProperties.uri.scheme.endsWith('s')) Protocol.TLS else Protocol.PLAINTEXT),
defaultTxVerbosity = BlockServiceOuterClass.PREFER.TX_EVENTS,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import tech.figure.objectstore.gateway.util.CoroutineUtil

@Configuration
class CoroutineConfig {
@Bean(BeanQualifiers.EVENT_STREAM_COROUTINE_SCOPE_QUALIFIER)
fun eventStreamScope(eventStreamProperties: EventStreamProperties): CoroutineScope = CoroutineUtil.newSingletonScope(
scopeName = CoroutineScopeNames.EVENT_STREAM_SCOPE,
threadCount = eventStreamProperties.threadCount
@Bean(BeanQualifiers.BLOCK_STREAM_COROUTINE_SCOPE_QUALIFIER)
fun blockStreamScope(blockapiProperties: BlockapiProperties): CoroutineScope = CoroutineUtil.newSingletonScope(
scopeName = CoroutineScopeNames.BLOCK_STREAM_SCOPE,
threadCount = blockapiProperties.threadCount
)
}

object CoroutineScopeNames {
const val BATCH_PROCESS_SCOPE = "batchProcessCoroutineScope"
const val EVENT_STREAM_SCOPE = "eventStreamCoroutineScope"
const val BLOCK_STREAM_SCOPE = "blockStreamCoroutineScope"
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.figure.objectstore.gateway.eventstream

import mu.KotlinLogging
import tech.figure.eventstream.stream.models.TxEvent
import tech.figure.block.api.proto.BlockOuterClass.TxEvent

/**
* Wrapper for a wasm event that contains the expected object store gateway attributes to trigger an access grant.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package tech.figure.objectstore.gateway.eventstream

import mu.KotlinLogging
import tech.figure.eventstream.stream.models.TxEvent
import tech.figure.block.api.proto.BlockOuterClass.Attribute
import tech.figure.block.api.proto.BlockOuterClass.TxEvent

/**
* A base adapter class for a given TxEvent, consumed via the event stream libraries. This allows an event to be
Expand All @@ -15,7 +16,7 @@ abstract class GatewayEventAdapter(val sourceEvent: TxEvent) {
/**
* Lazily-parsed tx events, accessible for event value fetching.
*/
private val attributeMap: Map<String, TxAttribute> by lazy { TxAttribute.parseTxEventMap(sourceEvent) }
private val attributeMap: Map<String, Attribute> by lazy { sourceEvent.attributesList.associateBy { it.key } }

/**
* On-demand access to the source event's txHash value for simpler syntax throughout the application.
Expand Down
Loading

0 comments on commit deffc73

Please sign in to comment.