Skip to content

Commit

Permalink
Merge pull request #2 from MinaFoundation/s3-fallback
Browse files Browse the repository at this point in the history
PM-1343 S3 fallback
  • Loading branch information
piotr-iohk authored Mar 27, 2024
2 parents 42de27a + 80f73fd commit a49e8d7
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 107 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ $ make

## Configuration

**Runtime Configuration**:
**1. Runtime Configuration**:

- `DELEGATION_VERIFY_BIN_PATH` - path to [Stateless verifier tool](https://github.com/MinaProtocol/mina/tree/develop/src/app/delegation_verify) binary.

**AWS Keyspaces/Cassandra Configuration**:
**2. AWS Keyspaces/Cassandra Configuration**:

**Mandatory/common env vars:**
- `AWS_KEYSPACE` - Your Keyspace name.
Expand All @@ -39,6 +39,13 @@ $ make
- `AWS_ACCESS_KEY_ID` - Your AWS Access Key ID. No need to set if `AWS_WEB_IDENTITY_TOKEN_FILE`, `AWS_ROLE_SESSION_NAME` and `AWS_ROLE_ARN` are set.
- `AWS_SECRET_ACCESS_KEY` - Your AWS Secret Access Key. No need to set if `AWS_WEB_IDENTITY_TOKEN_FILE`, `AWS_ROLE_SESSION_NAME` and `AWS_ROLE_ARN` are set.

**3. AWS S3 Configuration**:

- `AWS_S3_BUCKET` - AWS S3 Bucket where blocks and submissions are stored.
- `NETWORK_NAME` - Network name (in case block does not exist in Cassandra we attempt to download it from AWS S3 from `AWS_S3_BUCKET`\\`NETWORK_NAME`\blocks)
- `AWS_REGION` - The AWS region where your S3 bucket is located. While this is automatically retrieved, it can also be explicitly set through environment variables or AWS configuration files.
- `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` - Your AWS credentials. These are automatically retrieved from your environment or AWS configuration files but should be securely stored and accessible in your deployment environment.

## Run

```
Expand Down
4 changes: 2 additions & 2 deletions shell.nix
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
with import <nixpkgs> { };
with import (fetchTarball "https://nixos.org/channels/nixos-unstable/nixexprs.tar.xz") { };
{
devEnv = stdenv.mkDerivation {
name = "dev";
buildInputs = [ stdenv go_1_20 glibc ];
buildInputs = [ stdenv go_1_21 glibc ];
shellHook = ''
return
'';
Expand Down
88 changes: 31 additions & 57 deletions src/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,23 @@ import (
func LoadEnv(log logging.EventLogger) AppConfig {
var config AppConfig

// networkName := getEnvChecked("NETWORK_NAME", log)
// config.NetworkName = networkName

// // AWS configurations
// if bucketName := os.Getenv("AWS_BUCKET"); bucketName != "" {
// // accessKeyId, secretAccessKey are not mandatory for production set up
// accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID")
// secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
// awsRegion := getEnvChecked("AWS_REGION", log)
// bucketName = getEnvChecked("AWS_BUCKET", log)

// config.Aws = &AwsConfig{
// BucketName: bucketName,
// Region: awsRegion,
// AccessKeyId: accessKeyId,
// SecretAccessKey: secretAccessKey,
// }
// }

// delegation_verify bin path
delegationVerifyBinPath := getEnvChecked("DELEGATION_VERIFY_BIN_PATH", log)
networkName := getEnvChecked("NETWORK_NAME", log)

// AWS configurations
bucketName := getEnvChecked("AWS_S3_BUCKET", log)
awsRegion := os.Getenv("AWS_REGION")

// if webIdentityTokenFile, roleSessionName and roleArn are set,
// we are using AWS STS to assume a role and get temporary credentials
// if they are not set, we are using AWS IAM user credentials
webIdentityTokenFile := os.Getenv("AWS_WEB_IDENTITY_TOKEN_FILE")
roleSessionName := os.Getenv("AWS_ROLE_SESSION_NAME")
roleArn := os.Getenv("AWS_ROLE_ARN")
// accessKeyId, secretAccessKey are not mandatory for production set up
accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID")
secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

// AWSKeyspace/Cassandra configurations
awsKeyspace := getEnvChecked("AWS_KEYSPACE", log)
Expand All @@ -46,19 +42,7 @@ func LoadEnv(log logging.EventLogger) AppConfig {
cassandraUsername := os.Getenv("CASSANDRA_USERNAME")
cassandraPassword := os.Getenv("CASSANDRA_PASSWORD")

//aws keyspaces connection
awsRegion := os.Getenv("AWS_REGION")

// if webIdentityTokenFile, roleSessionName and roleArn are set,
// we are using AWS STS to assume a role and get temporary credentials
// if they are not set, we are using AWS IAM user credentials
webIdentityTokenFile := os.Getenv("AWS_WEB_IDENTITY_TOKEN_FILE")
roleSessionName := os.Getenv("AWS_ROLE_SESSION_NAME")
roleArn := os.Getenv("AWS_ROLE_ARN")
// accessKeyId, secretAccessKey are not mandatory for production set up
accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID")
secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

config.NetworkName = networkName
config.DelegationVerifyBinPath = delegationVerifyBinPath
config.CassandraConfig = &CassandraConfig{
Keyspace: awsKeyspace,
Expand All @@ -74,6 +58,12 @@ func LoadEnv(log logging.EventLogger) AppConfig {
RoleArn: roleArn,
SSLCertificatePath: sslCertificatePath,
}
config.AwsConfig = &AwsConfig{
BucketName: bucketName,
Region: awsRegion,
AccessKeyId: accessKeyId,
SecretAccessKey: secretAccessKey,
}

return config
}
Expand All @@ -86,28 +76,12 @@ func getEnvChecked(variable string, log logging.EventLogger) string {
return value
}

// func boolEnvChecked(variable string, log logging.EventLogger) bool {
// value := os.Getenv(variable)
// switch value {
// case "1":
// return true
// case "0":
// return false
// case "":
// return false
// default:
// log.Fatalf("%s, if set, should be either 0 or 1!", variable)
// return false
// }
// }

// type AwsConfig struct {
// AccountId string `json:"account_id"`
// BucketName string `json:"bucket_name_suffix"`
// Region string `json:"region"`
// AccessKeyId string `json:"access_key_id"`
// SecretAccessKey string `json:"secret_access_key"`
// }
type AwsConfig struct {
BucketName string `json:"bucket_name"`
Region string `json:"region"`
AccessKeyId string `json:"access_key_id"`
SecretAccessKey string `json:"secret_access_key"`
}

type CassandraConfig struct {
Keyspace string `json:"keyspace"`
Expand All @@ -125,8 +99,8 @@ type CassandraConfig struct {
}

type AppConfig struct {
// NetworkName string `json:"network_name"`
// Aws *AwsConfig `json:"aws,omitempty"`
NetworkName string `json:"network_name"`
DelegationVerifyBinPath string `json:"delegation_verify_bin_path"`
CassandraConfig *CassandraConfig `json:"cassandra_config,omitempty"`
AwsConfig *AwsConfig `json:"aws"`
CassandraConfig *CassandraConfig `json:"cassandra_config"`
}
35 changes: 35 additions & 0 deletions src/app_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"context"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/gocql/gocql"
logging "github.com/ipfs/go-log/v2"
)

// AppContext holds shared resources and configurations.
type AppContext struct {
CassandraSession *gocql.Session
S3Session *s3.Client
Log *logging.ZapEventLogger
}

// NewAppContext creates a new context with the necessary components.
func NewAppContext(ctx context.Context, cassandraConfig *CassandraConfig, awsConfig *AwsConfig, log *logging.ZapEventLogger) (*AppContext, error) {
cassandraSession, err := InitializeCassandraSession(cassandraConfig)
if err != nil {
return nil, err
}

s3Session, err := InitializeS3Session(ctx, awsConfig.Region)
if err != nil {
return nil, err
}

return &AppContext{
CassandraSession: cassandraSession,
Log: log,
S3Session: s3Session,
}, nil
}
50 changes: 11 additions & 39 deletions src/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/aws/aws-sigv4-auth-cassandra-gocql-driver-plugin/sigv4"
"github.com/gocql/gocql"
_ "github.com/golang-migrate/migrate/v4/source/file"
logging "github.com/ipfs/go-log/v2"
)

// InitializeCassandraSession creates a new gocql session for Amazon Keyspaces using the provided configuration.
Expand Down Expand Up @@ -108,34 +107,7 @@ func sigv4Authentication(config *CassandraConfig) (sigv4.AwsAuthenticator, error
return auth, nil
}

type CassandraContext struct {
Session *gocql.Session
Keyspace string
Log *logging.ZapEventLogger
}

type Submission struct {
SubmittedAtDate string `json:"submitted_at_date"`
Shard int `json:"shard"`
SubmittedAt time.Time `json:"submitted_at"`
Submitter string `json:"submitter"`
CreatedAt time.Time `json:"created_at"`
BlockHash string `json:"block_hash"`
RawBlock []byte `json:"raw_block"`
RemoteAddr string `json:"remote_addr"`
PeerID string `json:"peer_id"`
SnarkWork []byte `json:"snark_work"`
GraphqlControlPort int `json:"graphql_control_port"`
BuiltWithCommitSha string `json:"built_with_commit_sha"`
StateHash string `json:"state_hash"`
Parent string `json:"parent"`
Height int `json:"height"`
Slot int `json:"slot"`
ValidationError string `json:"validation_error"`
Verified bool `json:"verified"`
}

func (kc *CassandraContext) selectRange(startTime, endTime time.Time) ([]Submission, error) {
func (ctx *AppContext) selectRange(startTime, endTime time.Time) ([]Submission, error) {

query := `SELECT submitted_at_date, shard, submitted_at, submitter, created_at, block_hash,
raw_block, remote_addr, peer_id, snark_work, graphql_control_port, built_with_commit_sha,
Expand All @@ -144,7 +116,7 @@ func (kc *CassandraContext) selectRange(startTime, endTime time.Time) ([]Submiss
WHERE ` + calculateDateRange(startTime, endTime) +
` AND ` + shardsToCql(calculateShardsInRange(startTime, endTime)) +
` AND submitted_at >= ? AND submitted_at < ?`
iter := kc.Session.Query(query, startTime, endTime).Iter()
iter := ctx.CassandraSession.Query(query, startTime, endTime).Iter()

var submissions []Submission
for {
Expand All @@ -160,39 +132,39 @@ func (kc *CassandraContext) selectRange(startTime, endTime time.Time) ([]Submiss
submissions = append(submissions, submission)
}
if err := iter.Close(); err != nil {
kc.Log.Errorf("Error closing iterator: %s", err)
ctx.Log.Errorf("Error closing iterator: %s", err)
return nil, err
}

return submissions, nil
}

func (kc *CassandraContext) tryUpdateSubmissions(submissions []Submission) error {
kc.Log.Infof("Updating %d submissions", len(submissions))
func (ctx *AppContext) tryUpdateSubmissions(submissions []Submission) error {
ctx.Log.Infof("Updating %d submissions", len(submissions))
for _, sub := range submissions {
// Update the submission
// Note: raw_block and snark_work are reseted to nil since we don't want to keep them in the database
query := `UPDATE submissions
SET state_hash = ?, parent = ?, height = ?, slot = ?, validation_error = ?, verified = ?,
raw_block = ?, snark_work = ?
WHERE submitted_at_date = ? AND shard = ? AND submitted_at = ? AND submitter = ?`
if err := kc.Session.Query(query,
if err := ctx.CassandraSession.Query(query,
sub.StateHash, sub.Parent, sub.Height, sub.Slot, sub.ValidationError, sub.Verified,
nil, nil,
sub.SubmittedAtDate, sub.Shard, sub.SubmittedAt, sub.Submitter).Exec(); err != nil {
kc.Log.Errorf("Failed to update submission: %v", err)
ctx.Log.Errorf("Failed to update submission: %v", err)
return err
}
}
kc.Log.Infof("Submissions updated")
ctx.Log.Infof("Submissions updated")

return nil
}

func (kc *CassandraContext) updateSubmissions(submissions []Submission) error {
func (ctx *AppContext) updateSubmissions(submissions []Submission) error {
return ExponentialBackoff(func() error {
if err := kc.tryUpdateSubmissions(submissions); err != nil {
kc.Log.Errorf("Error updating submissions (trying again): %v", err)
if err := ctx.tryUpdateSubmissions(submissions); err != nil {
ctx.Log.Errorf("Error updating submissions (trying again): %v", err)
return err
}
return nil
Expand Down
21 changes: 21 additions & 0 deletions src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,34 @@ go 1.20

require (
github.com/aws/aws-sdk-go v1.50.33
github.com/aws/aws-sdk-go-v2 v1.26.0
github.com/aws/aws-sigv4-auth-cassandra-gocql-driver-plugin v1.1.0
github.com/gocql/gocql v1.6.0
github.com/golang-migrate/migrate/v4 v4.17.0
github.com/ipfs/go-log/v2 v2.5.1
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.9 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.5 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
)

require (
github.com/aws/aws-sdk-go-v2/config v1.27.9
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.0
github.com/golang/snappy v0.0.4 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
36 changes: 36 additions & 0 deletions src/go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,44 @@
github.com/aws/aws-sdk-go v1.49.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.50.33 h1:/SKPJ7ZVPCFOYZyTKo5YdjeUEeOn2J2M0qfDTXWAoEU=
github.com/aws/aws-sdk-go v1.50.33/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.26.0 h1:/Ce4OCiM3EkpW7Y+xUnfAFpchU78K7/Ug01sZni9PgA=
github.com/aws/aws-sdk-go-v2 v1.26.0/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo=
github.com/aws/aws-sdk-go-v2/config v1.27.9 h1:gRx/NwpNEFSk+yQlgmk1bmxxvQ5TyJ76CWXs9XScTqg=
github.com/aws/aws-sdk-go-v2/config v1.27.9/go.mod h1:dK1FQfpwpql83kbD873E9vz4FyAxuJtR22wzoXn3qq0=
github.com/aws/aws-sdk-go-v2/credentials v1.17.9 h1:N8s0/7yW+h8qR8WaRlPQeJ6czVMNQVNtNdUqf6cItao=
github.com/aws/aws-sdk-go-v2/credentials v1.17.9/go.mod h1:446YhIdmSV0Jf/SLafGZalQo+xr2iw7/fzXGDPTU1yQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.0 h1:af5YzcLf80tv4Em4jWVD75lpnOHSBkPUZxZfGkrI3HI=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.0/go.mod h1:nQ3how7DMnFMWiU1SpECohgC82fpn4cKZ875NDMmwtA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.4 h1:0ScVK/4qZ8CIW0k8jOeFVsyS/sAiXpYxRBLolMkuLQM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.4/go.mod h1:84KyjNZdHC6QZW08nfHI6yZgPd+qRgaWcYsyLUo3QY8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.4 h1:sHmMWWX5E7guWEFQ9SVo6A3S4xpPrWnd77a6y4WM6PU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.4/go.mod h1:WjpDrhWisWOIoS9n3nk67A3Ll1vfULJ9Kq6h29HTD48=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.4 h1:SIkD6T4zGQ+1YIit22wi37CGNkrE7mXV1vNA5VpI3TI=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.4/go.mod h1:XfeqbsG0HNedNs0GT+ju4Bs+pFAwsrlzcRdMvdNVf5s=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.6 h1:NkHCgg0Ck86c5PTOzBZ0JRccI51suJDg5lgFtxBu1ek=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.6/go.mod h1:mjTpxjC8v4SeINTngrnKFgm2QUi+Jm+etTbCxh8W4uU=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.6 h1:b+E7zIUHMmcB4Dckjpkapoy47W6C9QBv/zoUP+Hn8Kc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.6/go.mod h1:S2fNV0rxrP78NhPbCZeQgY8H9jdDMeGtwcfZIRxzBqU=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.4 h1:uDj2K47EM1reAYU9jVlQ1M5YENI1u6a/TxJpf6AeOLA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.4/go.mod h1:XKCODf4RKHppc96c2EZBGV/oCUC7OClxAo2MEyg4pIk=
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.0 h1:r3o2YsgW9zRcIP3Q0WCmttFVhTuugeKIvT5z9xDspc0=
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.0/go.mod h1:w2E4f8PUfNtyjfL6Iu+mWI96FGttE03z3UdNcUEC4tA=
github.com/aws/aws-sdk-go-v2/service/sso v1.20.3 h1:mnbuWHOcM70/OFUlZZ5rcdfA8PflGXXiefU/O+1S3+8=
github.com/aws/aws-sdk-go-v2/service/sso v1.20.3/go.mod h1:5HFu51Elk+4oRBZVxmHrSds5jFXmFj8C3w7DVF2gnrs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.3 h1:uLq0BKatTmDzWa/Nu4WO0M1AaQDaPpwTKAeByEc6WFM=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.3/go.mod h1:b+qdhjnxj8GSR6t5YfphOffeoQSQ1KmpoVVuBn+PWxs=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.5 h1:J/PpTf/hllOjx8Xu9DMflff3FajfLxqM5+tepvVXmxg=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.5/go.mod h1:0ih0Z83YDH/QeQ6Ori2yGE2XvWYv/Xm+cZc01LC6oK0=
github.com/aws/aws-sigv4-auth-cassandra-gocql-driver-plugin v1.1.0 h1:EJsHUYgFBV7/N1YtL73lsfZODAOU+CnNSZfEAlqqQaA=
github.com/aws/aws-sigv4-auth-cassandra-gocql-driver-plugin v1.1.0/go.mod h1:AxKuXHc0zv2yYaeueUG7R3ONbcnQIuDj0bkdFmPVRzU=
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
Expand Down
Loading

0 comments on commit a49e8d7

Please sign in to comment.