Merge pull request #95 from FigureTechnologies/docs_cleanup
Update documentation, cleanup and remove any leftover aws dependencies
rchaing-figure authored Feb 9, 2023
2 parents aa9b0ec + 393abe1 commit 0da937a
Showing 19 changed files with 87 additions and 294 deletions.
16 changes: 2 additions & 14 deletions Makefile
@@ -1,12 +1,11 @@
.PHONY: build-dist clean clean-test localstack-start localstack-stop localstack-status run-local test
.PHONY: build-dist clean clean-test run-local test

# Make all environment variables available to child processes
.EXPORT_ALL_VARIABLES:

NAME := aggregate-service
BUILD := $(PWD)/build
GRADLEW := ./gradlew
LOCALSTACK_PORT := 4566

all: run-local

@@ -16,22 +15,11 @@ clean:
clean-test:
$(GRADLEW) cleanTest

localstack-start:
@echo "- Starting localstack"
@docker-compose -f docker-compose.local.yml up --abort-on-container-exit

localstack-stop:
@echo "- Stopping localstack"
@docker-compose -f docker-compose.local.yml down --remove-orphans --volumes

localstack-status:
@curl -s http://localhost:$(LOCALSTACK_PORT)/health | jq -e .services

build-dist:
$(GRADLEW) installDist

run-local: build-dist
AWS_REGION=us-east-1 ENVIRONMENT=local $(BUILD)/install/$(NAME)/bin/$(NAME) $(ARGS)
ENVIRONMENT=local $(BUILD)/install/$(NAME)/bin/$(NAME) $(ARGS)

test: clean-test
$(GRADLEW) test -i
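With the LocalStack targets removed, a local run now needs only the remaining targets. A minimal sketch; the `ARGS` value below is hypothetical and depends on which arguments the service binary accepts:

```bash
# run-local depends on build-dist, so this builds the distribution and
# launches it with the local environment profile
$ make run-local

# Pass arguments through to the service binary (hypothetical example)
$ make run-local ARGS="--help"
```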
130 changes: 11 additions & 119 deletions README.md
@@ -16,121 +16,23 @@
---

The purpose of this service is to retrieve block data over time from the Provenance
blockchain so that we can compute aggregated data at a reasonable rate to perform
Blockchain so that we can compute aggregated data at a reasonable rate to perform
business decision queries.

The aggregator service makes use of the [Provenance Event Stream Library](https://github.com/provenance-io/event-stream)
The aggregator service makes use of the [Event Stream Library](https://github.com/FigureTechnologies/event-stream)
to stream in ordered blocks and transform the block data into the required business data.

## Local Setup

---

To run the service outside of a container:

### 1. Install AWS CLI tool

You will need to install the AWS CLI, which can be downloaded from https://aws.amazon.com/cli, or
installed via homebrew:

```bash
$ brew install awscli
```

Once installed, run `aws configure` to begin token and region configurations.

### 2. Environment setup

Prior to running Localstack (see below), set the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to `test`, otherwise you can set up a specific profile:

```bash
$ aws configure --profile <profile_name>
```

if you are permissioned to an AWS instance.

### 3. Running LocalStack

When developing locally, we utilize LocalStack, a tool that mocks AWS services like S3 and DynamoDB.

#### Starting

```bash
$ make localstack-start
```

#### Status

Check if LocalStack is running:

```bash
$ make localstack-status

> {
    "dynamodbstreams": "running",
    "kinesis": "running",
    "s3": "running",
    "dynamodb": "running"
  }
```
### 1. Database Setup

#### Stopping
Run a postgres docker image:

```bash
$ make localstack-stop
$ docker run --name postgresdb -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=password1 -e POSTGRES_DB=aggregate -d postgres
```
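A quick way to confirm the database container is accepting connections before starting the service (assumes the container name, user, and database from the command above):

```bash
# Should report "accepting connections" once Postgres is ready
$ docker exec postgresdb pg_isready -U postgres -d aggregate
```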
---
### 4.A Use the node public IP

In the [local.env.properties](https://github.com/provenance-io/aggregate-service/blob/a0257f85f203cc65f3a63eeee3d3332dedec133c/src/main/resources/local.env.properties#L8) you can set the public IP address of the
query node that you wish to stream from.

### 4.B Port-forwarding for Provenance/Tendermint API

Port `26657` is expected to be open for RPC and web-socket traffic, which will
require port-forwarding to the Provenance Kubernetes cluster.

#### Using `figcli`

This can be accomplished easily using the internal Figure command line tool, `figcli`:

1. If it does not already exist, add a `[port_forward]` entry to your local `figcli`
configuration file:

```
[port_forward]
context = "gke_provenance-io-test_us-east1-b_p-io-cluster"
```

This example will use the Provenance test cluster.

2. Create a pod in the cluster to perform port-forwarding on a specific remote host and port:

```bash
$ figcli port-forward 26657:$REMOTE_HOST:26657
```
where `$REMOTE_HOST` is a private IP within the cluster network, e.g. `192.168.xxx.xxx`

3. If successful, you should see the pod listed in the output of `kubectl get pods`:

```bash
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
datadog-agent-29nvm 1/1 Running 2 26d
datadog-agent-44rdl 1/1 Running 1 26d
datadog-agent-xcj48 1/1 Running 1 26d
...
datadog-agent-xfkmw 1/1 Running 1 26d
figcli-temp-port-forward-fi8dlktx 1/1 Running 0 15m <<<
nginx-test 1/1 Running 0 154d
```

Traffic on `localhost:26657` will now be forwarded to the Provenance cluster.

---

### 5. Block Caching
### 2. Block Caching

The aggregate service can also cache block data in a local NoSQL database if a cloud data warehouse is not desired.

@@ -140,12 +42,11 @@ To run RavenDB locally:
```bash
$ docker run -p 8080:8080 ravendb/ravendb:ubuntu-latest
```
Once RavenDB is running, you can access its GUI at http://localhost:8080 to set up the database, then edit [local.env.properties](https://github.com/provenance-io/aggregate-service/blob/main/src/main/resources/local.env.properties) to set the desired configuration.

Once RavenDB is running, you can access its GUI at http://localhost:8080 to set up the database, then edit [local.env.properties](https://github.com/FigureTechnologies/aggregate-service/blob/main/src/main/resources/local.env.properties) to set the desired configuration.
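A quick sanity check that RavenDB is reachable before wiring it into the configuration (assumes the port mapping used above):

```bash
# Any HTTP status code back from port 8080 indicates the RavenDB studio is up
$ curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080
```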

---

### 6. Running
### 3. Running

The service can be run locally:

@@ -165,24 +66,15 @@ To run the containerized service:

- pull the image to get the latest version:
```
$ docker pull ghcr.io/provenance-io/aggregate-service:latest
$ docker pull figuretechnologies/aggregate-service:latest
```
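- the pulled image can then be started like any other container. A hypothetical invocation; the environment variables (and any ports) the container actually expects depend on your configuration:
```
$ docker run -e ENVIRONMENT=local figuretechnologies/aggregate-service:latest
```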
---

### 6. Deployment
### 4. Deployment

#### Github Actions

The aggregator CI/CD process uses Github Actions to build the docker container and publish the image to [GHCR](https://github.com/features/packages) (Github Container Registry), from where it can be pulled by any deployment environment.
The aggregator CI/CD process uses Github Actions to build the docker container and publish the image to [Docker Hub](https://hub.docker.com/r/figuretechnologies/aggregate-service), from where it can be pulled by any deployment environment.

#### Serverless

We utilize `serverless` for our AWS infrastructure deployment via Github Actions: https://serverless.com. A sample serverless yaml can be found [here](https://github.com/provenance-io/aggregate-service/blob/main/serverless_example.yml).

Install the serverless package to test and develop with new configurations:

```bash
$ npm install -g serverless
```


8 changes: 3 additions & 5 deletions augment/build.gradle.kts
@@ -34,9 +34,6 @@ dependencies {
implementation(libs.hoplite.core)
implementation(libs.hoplite.yaml)
implementation(libs.json)
implementation(platform(libs.aws.bom))
implementation(libs.bundles.aws)
implementation(libs.localstack)
implementation(libs.grpc.protobuf)
implementation(libs.grpc.stub)
implementation(libs.logback.classic)
@@ -45,10 +42,11 @@ dependencies {
implementation(libs.kotlin.guava)
implementation(libs.kotlin.jdk8coroutines)
implementation(libs.commons.dbutils)
implementation(platform(libs.aws.bom))
implementation(libs.aws.s3)
implementation(libs.exposed.core)

runtimeOnly(libs.grpc.netty)
testImplementation(libs.kotlin.test)

}

tasks.compileTestKotlin {
6 changes: 5 additions & 1 deletion augment/src/main/kotlin/tech/figure/augment/Main.kt
@@ -43,7 +43,10 @@ fun main(args: Array<String>) {
put("networkTimeout", "30")
put("queryTimeout", "30")
}
val dbConnection = DriverManager.getConnection("jdbc:snowflake://${unwrapEnvOrError("DW_HOST")}.snowflakecomputing.com", properties)
val dbConnection = DriverManager.getConnection(
"jdbc:postgresql://${unwrapEnvOrError("DB_HOST")}:${unwrapEnvOrError("DB_PORT")}/${unwrapEnvOrError("DB_NAME")}",
properties
)

val provenanceUri = URI(unwrapEnvOrError("PROVENANCE_GRPC_URL"))
val channel = ManagedChannelBuilder
@@ -56,6 +59,7 @@ }
}
}
.build()

val semaphore = Semaphore(System.getenv("GRPC_CONCURRENCY")?.toInt() ?: Const.DEFAULT_GRPC_CONCURRENCY)
val provenanceClient = ProvenanceClient(channel, semaphore)
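The connection string above is assembled from environment variables. A hypothetical local setup matching the Postgres container from the README; credentials and any other required settings are supplied elsewhere in the configuration:

```bash
# Point the augment job at the local Postgres container (hypothetical values)
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=aggregate
```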

132 changes: 56 additions & 76 deletions augment/src/main/kotlin/tech/figure/augment/Output.kt
@@ -1,76 +1,56 @@
/**
* COMMENT OUT FOR NOW, WE MAY NEED THIS FOR THE SECONDARY, RIGHT NOW NOT BEING USED.
*/
//package tech.figure.augment
//
//import tech.figure.aggregate.common.AwsConfig
//import tech.figure.aggregate.common.S3Config
//import tech.figure.aggregate.common.aws.AwsClient
//import tech.figure.aggregate.common.aws.s3.S3Bucket
//import tech.figure.aggregate.common.db.Key
//import tech.figure.aggregate.common.aws.s3.StreamableObject
//import tech.figure.aggregate.common.writer.csv.ApacheCommonsCSVRecordWriter
//import io.provenance.eventstream.config.Environment
//import org.apache.commons.csv.CSVFormat
//import org.slf4j.Logger
//import software.amazon.awssdk.core.async.AsyncRequestBody
//import tech.figure.augment.dsl.Data
//import tech.figure.augment.dsl.LoggingOutput
//import tech.figure.augment.dsl.Output
//import tech.figure.augment.dsl.S3Output
//import java.io.BufferedOutputStream
//import java.io.OutputStreamWriter
//import java.nio.file.Files
//import java.nio.file.StandardOpenOption
//import java.time.OffsetDateTime
//import java.util.*
//
//fun Data.filterColumns(output: Output): Data {
// val columns = when (output) {
// is LoggingOutput -> output.columns
// is S3Output -> output.columns
// }.toSortedSet()
//
// return this.map { row ->
// row.filterKeys(columns::contains)
// }
//}
//
//suspend fun Data.output(environment: Environment, jobName: String, output: Output, log: Logger): Unit = when (output) {
// is LoggingOutput -> log.info("LoggingOutput result = $this")
// is S3Output -> {
// val config = AwsConfig(
// region = System.getenv("AWS_REGION"),
// S3Config(S3Bucket(output.bucket))
// )
// val client = AwsClient.create(config.s3)
// val outputFile = Files.createTempFile("", "staging_file.csv")
// val outputStream = BufferedOutputStream(Files.newOutputStream(outputFile, StandardOpenOption.APPEND, StandardOpenOption.WRITE))
// val writer = ApacheCommonsCSVRecordWriter.Builder()
// .format(
// CSVFormat.Builder.create()
// .apply { setHeader(*output.columns.toTypedArray()) }
// .build()
// )
// .output(OutputStreamWriter(outputStream))
// .build()
//
// writer.use {
// this.forEach { row ->
// it.writeRecord(*output.columns.map(row::getValue).toTypedArray())
// }
// }
//
// val key = Key.create(OffsetDateTime.now(), "cron", UUID.randomUUID().toString(), "${output.tableName}.csv")
// client.s3().streamObject(object : StreamableObject {
// override val key: Key get() = key
// override val body: AsyncRequestBody get() = AsyncRequestBody.fromFile(outputFile)
// override val metadata: Map<String, String> get() = mapOf(
// "JobName" to jobName,
// "TableName" to output.tableName,
// )
// })
//
// log.info("$jobName output file written to ${key.value}")
// }
//}
package tech.figure.augment

import tech.figure.aggregate.common.writer.csv.ApacheCommonsCSVRecordWriter
import io.provenance.eventstream.config.Environment
import org.apache.commons.csv.CSVFormat
import org.jetbrains.exposed.sql.transactions.transaction
import org.slf4j.Logger
import tech.figure.aggregate.common.db.DBClient
import tech.figure.augment.dsl.Data
import tech.figure.augment.dsl.LoggingOutput
import tech.figure.augment.dsl.Output
import tech.figure.augment.dsl.ResultOutput
import java.io.BufferedOutputStream
import java.io.OutputStreamWriter
import java.nio.file.Files
import java.nio.file.StandardOpenOption

fun Data.filterColumns(output: Output): Data {
val columns = when (output) {
is LoggingOutput -> output.columns
is ResultOutput -> output.columns
}.toSortedSet()

return this.map { row ->
row.filterKeys(columns::contains)
}
}

suspend fun Data.output(environment: Environment, jobName: String, output: Output, log: Logger): Unit = when (output) {
is LoggingOutput -> log.info("LoggingOutput result = $this")
is ResultOutput -> {
val dbClient = DBClient()
val outputFile = Files.createTempFile("", "staging_file.csv")
val outputStream = BufferedOutputStream(Files.newOutputStream(outputFile, StandardOpenOption.APPEND, StandardOpenOption.WRITE))
val writer = ApacheCommonsCSVRecordWriter.Builder()
.format(
CSVFormat.Builder.create()
.apply { setHeader(*output.columns.toTypedArray()) }
.build()
)
.output(OutputStreamWriter(outputStream))
.build()

writer.use {
this.forEach { row ->
it.writeRecord(*output.columns.map(row::getValue).toTypedArray())
}
}

transaction {
dbClient.handleInsert("nycb_usdf_balances", outputFile.toFile())
}

log.info("$jobName output file written")
}
}