Skip to content

Commit

Permalink
Merge pull request #127 from FigureTechnologies/piercetrey-figure/sc-…
Browse files Browse the repository at this point in the history
…253150/update-to-blockstream

Update to Java 17 and blockapi blockstream
  • Loading branch information
celloman authored Sep 7, 2023
2 parents 22bcc00 + 8ecce17 commit e0e0931
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 87 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ jobs:
- name: Checkout Code
uses: actions/checkout@v3

- name: Setup JDK 11
- name: Setup JDK 17
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'adopt'
java-version: 17
distribution: zulu
server-id: github

- name: Build
run: ./gradlew clean build --refresh-dependencies --parallel
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ jobs:
# Export release version from step for use in publish artifact step
echo "::set-env name=RELEASE_VERSION::$RELEASE_VERSION"
- name: Set up JDK 11
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: 11
distribution: adopt
java-version: 17
distribution: zulu
server-id: github

- name: Build with Gradle
Expand All @@ -57,7 +57,7 @@ jobs:
echo "Publishing release for version [$RELEASE_VERSION]"
./gradlew publishToSonatype $(if [ "${{github.event.release.prerelease}}" = "true" ]; then echo 'closeSonatypeStagingRepository'; else echo 'closeAndReleaseSonatypeStagingRepository'; fi) \
-PlibraryVersion=$RELEASE_VERSION \
-Psigning.keyId=69C08EA0 -Psigning.password="${{ secrets.OSSRH_GPG_SECRET_KEY_PASSWORD }}" -Psigning.secretKeyRingFile=$GITHUB_WORKSPACE/release.gpg \
-Psigning.keyId="${{ secrets.OSSRH_GPG_SECRET_KEY_ID }}" -Psigning.password="${{ secrets.OSSRH_GPG_SECRET_KEY_PASSWORD }}" -Psigning.secretKeyRingFile=$GITHUB_WORKSPACE/release.gpg \
--info
10 changes: 5 additions & 5 deletions buildSrc/src/main/kotlin/core-config.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ repositories {
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
jvmTarget = "17"
}
}

tasks.withType<JavaCompile> {
sourceCompatibility = JavaVersion.VERSION_11.toString()
targetCompatibility = JavaVersion.VERSION_11.toString()
sourceCompatibility = JavaVersion.VERSION_17.toString()
targetCompatibility = JavaVersion.VERSION_17.toString()
}

configure<JavaPluginExtension> {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}

tasks.test {
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[versions]
asset-model = "1.1.0"
asset-specs = "1.0.0"
blockapi = "0.1.10"
blockapi = "0.2.1"
bouncycastle = "1.70"
feign = "12.3"
figure-eventstream = "0.9.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ class VerifierClient(private val config: VerifierClientConfig) {
onBlock = { blockHeight ->
// Record each block intercepted
NewBlockReceived(blockHeight).send()
// Track new block height
latestBlock = trackBlockHeight(latestBlock, blockHeight)
},
onEvent = { event -> handleEvent(event) },
onEventsProcessed = { blockHeight ->
// Track new block height after successful processing of events in block
latestBlock = trackBlockHeight(latestBlock, blockHeight)
},
onError = { e -> StreamExceptionOccurred(e).send() },
onCompletion = { t -> StreamCompleted(t).send() }
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ interface EventStreamProvider {
height: Long? = null,
onBlock: (suspend (blockHeight: Long) -> Unit),
onEvent: (suspend (event: AssetClassificationEvent) -> Unit),
onEventsProcessed: (suspend (blockHeight: Long) -> Unit),
onError: (suspend (throwable: Throwable) -> Unit),
onCompletion: (suspend (throwable: Throwable?) -> Unit)
): RecoveryStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,50 @@ package tech.figure.classification.asset.verifier.util.eventstream.providers

import io.provenance.client.protobuf.extensions.time.toOffsetDateTimeOrNull
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.isActive
import tech.figure.block.api.client.BlockAPIClient
import tech.figure.block.api.proto.BlockServiceOuterClass
import tech.figure.classification.asset.verifier.config.EventStreamProvider
import tech.figure.classification.asset.verifier.config.RecoveryStatus
import tech.figure.classification.asset.verifier.config.RetryPolicy
import tech.figure.classification.asset.verifier.provenance.AssetClassificationEvent
import tech.figure.classification.asset.verifier.provenance.WASM_EVENT_TYPE
import tech.figure.eventstream.stream.models.Event
import tech.figure.eventstream.stream.models.TxEvent
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.Duration.Companion.milliseconds

class BlockApiEventStreamProvider(
private val blockApiClient: BlockAPIClient,
private val coroutineScope: CoroutineScope,
private val retry: RetryPolicy? = null
) : EventStreamProvider {

companion object {
const val DEFAULT_BLOCK_DELAY_MS: Double = 4000.0
}

override suspend fun currentHeight(): Long =
currentHeightInternal()

override suspend fun startProcessingFromHeight(
height: Long?,
onBlock: suspend (blockHeight: Long) -> Unit,
onEvent: suspend (event: AssetClassificationEvent) -> Unit,
onEventsProcessed: suspend (blockHeight: Long) -> Unit,
onError: suspend (throwable: Throwable) -> Unit,
onCompletion: suspend (throwable: Throwable?) -> Unit
): RecoveryStatus {
val lastProcessed = AtomicLong(0)
var current = currentHeightInternal { e -> onError(e) }
var from = height ?: 1

if (from > current) throw IllegalArgumentException("Cannot fetch block greater than the current height! Requested: $height, current: $current")

try {
while (coroutineScope.isActive) {
(from..current).forEach { blockHeight ->
process(blockHeight, onBlock, onEvent, onError)
lastProcessed.set(blockHeight + 1)
blockApiClient.streamBlocks(
start = height ?: 1,
preference = BlockServiceOuterClass.PREFER.TX_EVENTS
).catch { e -> onError(e) }
.onCompletion { t -> onCompletion(t) }
.collect { block ->
val blockHeight = block.blockResult.block.height
onBlock(blockHeight)
// Map all captured block data to AssetClassificationEvents, which will remove all non-wasm events
// encountered
block.toAssetClassificationEvents().forEach { event -> onEvent(event) }
onEventsProcessed(blockHeight)
}

// We've reached the current block, so fire the completion event
onCompletion(null)

// Once we've met the current block, no need to keep spinning. Wait here for 4 seconds and process again.
delay(DEFAULT_BLOCK_DELAY_MS.milliseconds)
from = lastProcessed.get()
current = currentHeightInternal { e -> onError(e) }
}
} catch (ex: Exception) {
onError(ex)
}
Expand All @@ -79,45 +69,22 @@ class BlockApiEventStreamProvider(
throw IllegalArgumentException("Unable to get current height from block api!", ex)
}

private suspend fun process(
height: Long,
onBlock: suspend (blockHeight: Long) -> Unit,
onEvent: suspend (event: AssetClassificationEvent) -> Unit,
onError: suspend (throwable: Throwable) -> Unit
) {
runCatching {
retry?.tryAction {
getBlock(height, onBlock, onEvent)
} ?: getBlock(height, onBlock, onEvent)
}.onFailure { error ->
onError(error)
}
}

private suspend fun getBlock(
height: Long,
onBlock: suspend (blockHeight: Long) -> Unit,
onEvent: suspend (event: AssetClassificationEvent) -> Unit
) {
blockApiClient.getBlockByHeight(height, BlockServiceOuterClass.PREFER.TX_EVENTS).also {
onBlock(it.block.height)

toAssetClassificationEvent(it).forEach { classificationEvent ->
onEvent(classificationEvent)
}
}
}

private fun toAssetClassificationEvent(data: BlockServiceOuterClass.BlockResult): List<AssetClassificationEvent> =
data.block.transactionsList.flatMap { tx ->
tx.eventsList.map { event ->
private fun BlockServiceOuterClass.BlockStreamResult.toAssetClassificationEvents(): List<AssetClassificationEvent> {
val blockDateTime by lazy { blockResult.block.time.toOffsetDateTimeOrNull() }
return blockResult.block.transactionsList
.flatMap { it.eventsList }
// Only keep events of type WASM. All other event types are guaranteed to be unrelated to the
// Asset Classification smart contract. This check can happen prior to any other parsing of data inside
// the TxEvent, which will be a minor speed increase to downstream processing
.filter { it.eventType == WASM_EVENT_TYPE }
.map { event ->
AssetClassificationEvent(
TxEvent(
blockHeight = event.height,
txHash = event.txHash,
eventType = event.eventType,
attributes = event.attributesList.map { Event(it.key, it.value, it.index) },
blockDateTime = data.block.time.toOffsetDateTimeOrNull(),
blockDateTime = blockDateTime,
fee = null,
denom = null,
note = null
Expand All @@ -126,5 +93,5 @@ class BlockApiEventStreamProvider(
inputValuesEncoded = false
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tech.figure.classification.asset.verifier.util.eventstream.providers
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import okhttp3.OkHttpClient
import tech.figure.classification.asset.verifier.config.EventStreamProvider
import tech.figure.classification.asset.verifier.config.RecoveryStatus
Expand Down Expand Up @@ -38,20 +37,19 @@ class DefaultEventStreamProvider(
height: Long?,
onBlock: suspend (blockHeight: Long) -> Unit,
onEvent: suspend (event: AssetClassificationEvent) -> Unit,
onEventsProcessed: suspend (blockHeight: Long) -> Unit,
onError: suspend (throwable: Throwable) -> Unit,
onCompletion: suspend (throwable: Throwable?) -> Unit
): RecoveryStatus {
verifierBlockDataFlow(netAdapter, decoderAdapter, from = height)
.catch { e -> onError(e) }
.onCompletion { t -> onCompletion(t) }
.onEach { block ->
.collect { block ->
onBlock(block.height)
}
// Map all captured block data to AssetClassificationEvents, which will remove all non-wasm events
// encountered
.map { toAssetClassificationEvent(it) }
.collect { events ->
events.forEach { event -> onEvent(event) }
// Map all captured block data to AssetClassificationEvents, which will remove all non-wasm events
// encountered
block.toAssetClassificationEvents().forEach { event -> onEvent(event) }
onEventsProcessed(block.height)
}
// The event stream flow should execute infinitely unless some error occurs, so this line will only be reached
// on connection failures or other problems.
Expand All @@ -69,11 +67,11 @@ class DefaultEventStreamProvider(
return RecoveryStatus.RECOVERABLE
}

private fun toAssetClassificationEvent(data: BlockData) =
data.blockResult
private fun BlockData.toAssetClassificationEvents(): List<AssetClassificationEvent> =
blockResult
// Use the event stream library's excellent extension functions to grab the needed TxEvent from
// the block result, using the same strategy that their EventStream object does
.txEvents(data.block.header?.dateTime()) { index -> data.block.txData(index) }
.txEvents(block.header?.dateTime()) { index -> block.txData(index) }
// Only keep events of type WASM. All other event types are guaranteed to be unrelated to the
// Asset Classification smart contract. This check can happen prior to any other parsing of data inside
// the TxEvent, which will be a minor speed increase to downstream processing
Expand Down

0 comments on commit e0e0931

Please sign in to comment.