From 0afe4c9aa52e43aaf49ebfd8faa1c7c54a82de4f Mon Sep 17 00:00:00 2001 From: piercetrey-figure Date: Fri, 1 Sep 2023 14:33:12 -0600 Subject: [PATCH 1/3] Update to Java 17 and blockapi blockstream --- .../src/main/kotlin/core-config.gradle.kts | 10 +-- gradle/libs.versions.toml | 2 +- .../asset/verifier/client/VerifierClient.kt | 6 +- .../verifier/config/EventStreamProvider.kt | 1 + .../providers/BlockApiEventStreamProvider.kt | 89 ++++++------------- .../providers/DefaultEventStreamProvider.kt | 20 ++--- 6 files changed, 48 insertions(+), 80 deletions(-) diff --git a/buildSrc/src/main/kotlin/core-config.gradle.kts b/buildSrc/src/main/kotlin/core-config.gradle.kts index b88db6b..62cca3d 100644 --- a/buildSrc/src/main/kotlin/core-config.gradle.kts +++ b/buildSrc/src/main/kotlin/core-config.gradle.kts @@ -17,18 +17,18 @@ repositories { tasks.withType { kotlinOptions { freeCompilerArgs = listOf("-Xjsr305=strict") - jvmTarget = "11" + jvmTarget = "17" } } tasks.withType { - sourceCompatibility = JavaVersion.VERSION_11.toString() - targetCompatibility = JavaVersion.VERSION_11.toString() + sourceCompatibility = JavaVersion.VERSION_17.toString() + targetCompatibility = JavaVersion.VERSION_17.toString() } configure { - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 } tasks.test { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2784268..4d0f8db 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,7 +1,7 @@ [versions] asset-model = "1.1.0" asset-specs = "1.0.0" -blockapi = "0.1.10" +blockapi = "0.2.1" bouncycastle = "1.70" feign = "12.3" figure-eventstream = "0.9.0" diff --git a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/client/VerifierClient.kt b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/client/VerifierClient.kt index fb753d0..a9a7f6c 100644 --- a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/client/VerifierClient.kt +++ b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/client/VerifierClient.kt @@ -96,10 +96,12 @@ class VerifierClient(private val config: VerifierClientConfig) { onBlock = { blockHeight -> // Record each block intercepted NewBlockReceived(blockHeight).send() - // Track new block height - latestBlock = trackBlockHeight(latestBlock, blockHeight) }, onEvent = { event -> handleEvent(event) }, + onEventsProcessed = { blockHeight -> + // Track new block height after successful processing of events in block + latestBlock = trackBlockHeight(latestBlock, blockHeight) + }, onError = { e -> StreamExceptionOccurred(e).send() }, onCompletion = { t -> StreamCompleted(t).send() } ) diff --git a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/config/EventStreamProvider.kt b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/config/EventStreamProvider.kt index 8691fd9..c3b869e 100644 --- a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/config/EventStreamProvider.kt +++ b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/config/EventStreamProvider.kt @@ -8,6 +8,7 @@ interface EventStreamProvider { height: Long? = null, onBlock: (suspend (blockHeight: Long) -> Unit), onEvent: (suspend (event: AssetClassificationEvent) -> Unit), + onEventsProcessed: (suspend (blockHeight: Long) -> Unit), onError: (suspend (throwable: Throwable) -> Unit), onCompletion: (suspend (throwable: Throwable?) -> Unit) ): RecoveryStatus diff --git a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/BlockApiEventStreamProvider.kt b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/BlockApiEventStreamProvider.kt index 07558c4..05885cb 100644 --- a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/BlockApiEventStreamProvider.kt +++ b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/BlockApiEventStreamProvider.kt @@ -2,7 +2,9 @@ package tech.figure.classification.asset.verifier.util.eventstream.providers import io.provenance.client.protobuf.extensions.time.toOffsetDateTimeOrNull import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.isActive import tech.figure.block.api.client.BlockAPIClient import tech.figure.block.api.proto.BlockServiceOuterClass @@ -10,21 +12,15 @@ import tech.figure.classification.asset.verifier.config.EventStreamProvider import tech.figure.classification.asset.verifier.config.RecoveryStatus import tech.figure.classification.asset.verifier.config.RetryPolicy import tech.figure.classification.asset.verifier.provenance.AssetClassificationEvent +import tech.figure.classification.asset.verifier.provenance.WASM_EVENT_TYPE import tech.figure.eventstream.stream.models.Event import tech.figure.eventstream.stream.models.TxEvent -import java.util.concurrent.atomic.AtomicLong -import kotlin.time.Duration.Companion.milliseconds class BlockApiEventStreamProvider( private val blockApiClient: BlockAPIClient, private val coroutineScope: CoroutineScope, private val retry: RetryPolicy? = null ) : EventStreamProvider { - - companion object { - const val DEFAULT_BLOCK_DELAY_MS: Double = 4000.0 - } - override suspend fun currentHeight(): Long = currentHeightInternal() @@ -32,30 +28,24 @@ class BlockApiEventStreamProvider( height: Long?, onBlock: suspend (blockHeight: Long) -> Unit, onEvent: suspend (event: AssetClassificationEvent) -> Unit, + onEventsProcessed: suspend (blockHeight: Long) -> Unit, onError: suspend (throwable: Throwable) -> Unit, onCompletion: suspend (throwable: Throwable?) -> Unit ): RecoveryStatus { - val lastProcessed = AtomicLong(0) - var current = currentHeightInternal { e -> onError(e) } - var from = height ?: 1 - - if (from > current) throw IllegalArgumentException("Cannot fetch block greater than the current height! Requested: $height, current: $current") - try { - while (coroutineScope.isActive) { - (from..current).forEach { blockHeight -> - process(blockHeight, onBlock, onEvent, onError) - lastProcessed.set(blockHeight + 1) + blockApiClient.streamBlocks( + start = height ?: 1, + preference = BlockServiceOuterClass.PREFER.TX_EVENTS + ).catch { e -> onError(e) } + .onCompletion { t -> onCompletion(t) } + .collect { block -> + val blockHeight = block.blockResult.block.height + onBlock(blockHeight) + // Map all captured block data to AssetClassificationEvents, which will remove all non-wasm events + // encountered + block.toAssetClassificationEvents().forEach { event -> onEvent(event) } + onEventsProcessed(blockHeight) } - - // We've reached the current block, so fire the completion event - onCompletion(null) - - // Once we've met the current block, no need to keep spinning. Wait here for 4 seconds and process again. - delay(DEFAULT_BLOCK_DELAY_MS.milliseconds) - from = lastProcessed.get() - current = currentHeightInternal { e -> onError(e) } - } } catch (ex: Exception) { onError(ex) } @@ -79,45 +69,22 @@ class BlockApiEventStreamProvider( throw IllegalArgumentException("Unable to get current height from block api!", ex) } - private suspend fun process( - height: Long, - onBlock: suspend (blockHeight: Long) -> Unit, - onEvent: suspend (event: AssetClassificationEvent) -> Unit, - onError: suspend (throwable: Throwable) -> Unit - ) { - runCatching { - retry?.tryAction { - getBlock(height, onBlock, onEvent) - } ?: getBlock(height, onBlock, onEvent) - }.onFailure { error -> - onError(error) - } - } - - private suspend fun getBlock( - height: Long, - onBlock: suspend (blockHeight: Long) -> Unit, - onEvent: suspend (event: AssetClassificationEvent) -> Unit - ) { - blockApiClient.getBlockByHeight(height, BlockServiceOuterClass.PREFER.TX_EVENTS).also { - onBlock(it.block.height) - - toAssetClassificationEvent(it).forEach { classificationEvent -> - onEvent(classificationEvent) - } - } - } - - private fun toAssetClassificationEvent(data: BlockServiceOuterClass.BlockResult): List = - data.block.transactionsList.flatMap { tx -> - tx.eventsList.map { event -> + private fun BlockServiceOuterClass.BlockStreamResult.toAssetClassificationEvents(): List { + val blockDateTime by lazy { blockResult.block.time.toOffsetDateTimeOrNull() } + return blockResult.block.transactionsList + .flatMap { it.eventsList } + // Only keep events of type WASM. All other event types are guaranteed to be unrelated to the + // Asset Classification smart contract. This check can happen prior to any other parsing of data inside + // the TxEvent, which will be a minor speed increase to downstream processing + .filter { it.eventType == WASM_EVENT_TYPE } + .map { event -> AssetClassificationEvent( TxEvent( blockHeight = event.height, txHash = event.txHash, eventType = event.eventType, attributes = event.attributesList.map { Event(it.key, it.value, it.index) }, - blockDateTime = data.block.time.toOffsetDateTimeOrNull(), + blockDateTime = blockDateTime, fee = null, denom = null, note = null @@ -126,5 +93,5 @@ class BlockApiEventStreamProvider( inputValuesEncoded = false ) } - } + } } diff --git a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/DefaultEventStreamProvider.kt b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/DefaultEventStreamProvider.kt index a53e360..cd7081f 100644 --- a/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/DefaultEventStreamProvider.kt +++ b/verifier/src/main/kotlin/tech/figure/classification/asset/verifier/util/eventstream/providers/DefaultEventStreamProvider.kt @@ -3,7 +3,6 @@ package tech.figure.classification.asset.verifier.util.eventstream.providers import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.flow.onEach import okhttp3.OkHttpClient import tech.figure.classification.asset.verifier.config.EventStreamProvider import tech.figure.classification.asset.verifier.config.RecoveryStatus @@ -38,20 +37,19 @@ class DefaultEventStreamProvider( height: Long?, onBlock: suspend (blockHeight: Long) -> Unit, onEvent: suspend (event: AssetClassificationEvent) -> Unit, + onEventsProcessed: suspend (blockHeight: Long) -> Unit, onError: suspend (throwable: Throwable) -> Unit, onCompletion: suspend (throwable: Throwable?) -> Unit ): RecoveryStatus { verifierBlockDataFlow(netAdapter, decoderAdapter, from = height) .catch { e -> onError(e) } .onCompletion { t -> onCompletion(t) } - .onEach { block -> + .collect { block -> onBlock(block.height) - } - // Map all captured block data to AssetClassificationEvents, which will remove all non-wasm events - // encountered - .map { toAssetClassificationEvent(it) } - .collect { events -> - events.forEach { event -> onEvent(event) } + // Map all captured block data to AssetClassificationEvents, which will remove all non-wasm events + // encountered + block.toAssetClassificationEvents().forEach { event -> onEvent(event) } + onEventsProcessed(block.height) } // The event stream flow should execute infinitely unless some error occurs, so this line will only be reached // on connection failures or other problems. @@ -69,11 +67,11 @@ class DefaultEventStreamProvider( return RecoveryStatus.RECOVERABLE } - private fun toAssetClassificationEvent(data: BlockData) = - data.blockResult + private fun BlockData.toAssetClassificationEvents(): List = + blockResult // Use the event stream library's excellent extension functions to grab the needed TxEvent from // the block result, using the same strategy that their EventStream object does - .txEvents(data.block.header?.dateTime()) { index -> data.block.txData(index) } + .txEvents(block.header?.dateTime()) { index -> block.txData(index) } // Only keep events of type WASM. All other event types are guaranteed to be unrelated to the // Asset Classification smart contract. This check can happen prior to any other parsing of data inside // the TxEvent, which will be a minor speed increase to downstream processing From db97135cf7e521f4a655c20c1b56b9e98d0cfc79 Mon Sep 17 00:00:00 2001 From: piercetrey-figure Date: Fri, 1 Sep 2023 15:48:29 -0600 Subject: [PATCH 2/3] java 17 for GHA --- .github/workflows/build.yaml | 7 ++++--- .github/workflows/publish.yaml | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 1bd7042..ab9146a 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -12,11 +12,12 @@ jobs: - name: Checkout Code uses: actions/checkout@v3 - - name: Setup JDK 11 + - name: Setup JDK 17 uses: actions/setup-java@v3 with: - java-version: '11' - distribution: 'adopt' + java-version: 17 + distribution: zulu + server-id: github - name: Build run: ./gradlew clean build --refresh-dependencies --parallel diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 1b211b2..c74b1cb 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -29,11 +29,11 @@ jobs: # Export release version from step for use in publish artifact step echo "::set-env name=RELEASE_VERSION::$RELEASE_VERSION" - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v3 with: - java-version: 11 - distribution: adopt + java-version: 17 + distribution: zulu server-id: github - name: Build with Gradle From 8ecce176dac85c094c9dbda6db276576f7c02988 Mon Sep 17 00:00:00 2001 From: piercetrey-figure Date: Thu, 7 Sep 2023 13:41:39 -0600 Subject: [PATCH 3/3] use env var for maven signing keyId --- .github/workflows/publish.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index c74b1cb..ded3c75 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -57,7 +57,7 @@ jobs: echo "Publishing release for version [$RELEASE_VERSION]" ./gradlew publishToSonatype $(if [ "${{github.event.release.prerelease}}" = "true" ]; then echo 'closeSonatypeStagingRepository'; else echo 'closeAndReleaseSonatypeStagingRepository'; fi) \ -PlibraryVersion=$RELEASE_VERSION \ - -Psigning.keyId=69C08EA0 -Psigning.password="${{ secrets.OSSRH_GPG_SECRET_KEY_PASSWORD }}" -Psigning.secretKeyRingFile=$GITHUB_WORKSPACE/release.gpg \ + -Psigning.keyId="${{ secrets.OSSRH_GPG_SECRET_KEY_ID }}" -Psigning.password="${{ secrets.OSSRH_GPG_SECRET_KEY_PASSWORD }}" -Psigning.secretKeyRingFile=$GITHUB_WORKSPACE/release.gpg \ --info