Stream events directly to GCS #560

Draft · wants to merge 1 commit into base: main
2 changes: 1 addition & 1 deletion modules/cloudevent-recorder/README.md
@@ -136,7 +136,7 @@ No requirements.
| <a name="input_cloud_storage_config_max_duration"></a> [cloud\_storage\_config\_max\_duration](#input\_cloud\_storage\_config\_max\_duration) | The maximum duration that can elapse before a new Cloud Storage file is created. Min 1 minute, max 10 minutes, default 5 minutes. | `number` | `300` | no |
| <a name="input_deletion_protection"></a> [deletion\_protection](#input\_deletion\_protection) | Whether to enable deletion protection on data resources. | `bool` | `true` | no |
| <a name="input_enable_profiler"></a> [enable\_profiler](#input\_enable\_profiler) | Enable cloud profiler. | `bool` | `false` | no |
| <a name="input_flush_interval"></a> [flush\_interval](#input\_flush\_interval) | Flush interval for logrotate, as a duration string. | `string` | `""` | no |
| <a name="input_flush_interval"></a> [flush\_interval](#input\_flush\_interval) | Flush interval for logrotate, as a duration string. | `string` | `"3m"` | no |
| <a name="input_ignore_unknown_values"></a> [ignore\_unknown\_values](#input\_ignore\_unknown\_values) | Whether to ignore unknown values in the data, when transferring data to BigQuery. | `bool` | `false` | no |
| <a name="input_limits"></a> [limits](#input\_limits) | Resource limits for the regional go service. | <pre>object({<br> cpu = string<br> memory = string<br> })</pre> | `null` | no |
| <a name="input_location"></a> [location](#input\_location) | The location to create the BigQuery dataset in, and in which to run the data transfer jobs from GCS. | `string` | `"US"` | no |
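For context on the README change, here is a minimal sketch of how a caller might set the new flush interval. The module source path and values are placeholders, and inputs not shown in this diff (event types, broker wiring, and so on) are omitted.

# Hypothetical caller of the cloudevent-recorder module; source path and values are illustrative.
module "cloudevent_recorder" {
  source = "./modules/cloudevent-recorder"

  project_id = var.project_id
  name       = "recorder"
  regions    = var.regions

  # Accepts any Go duration string ("90s", "5m", "1h"); now defaults to "3m" when unset.
  flush_interval = "3m"

  deletion_protection = true
}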
36 changes: 0 additions & 36 deletions modules/cloudevent-recorder/cmd/logrotate/main.go

This file was deleted.

87 changes: 77 additions & 10 deletions modules/cloudevent-recorder/cmd/recorder/main.go
@@ -10,21 +10,29 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/sethvargo/go-envconfig"
"gocloud.dev/blob"

"github.com/chainguard-dev/clog"
_ "github.com/chainguard-dev/clog/gcp/init"
"github.com/chainguard-dev/terraform-infra-common/pkg/httpmetrics"
mce "github.com/chainguard-dev/terraform-infra-common/pkg/httpmetrics/cloudevents"
"github.com/chainguard-dev/terraform-infra-common/pkg/profiler"

// Add gcsblob support that we need to support gs:// prefixes
_ "gocloud.dev/blob/gcsblob"
)

var env = envconfig.MustProcess(context.Background(), &struct {
Port int `env:"PORT, default=8080"`
LogPath string `env:"LOG_PATH, required"`
Port int `env:"PORT, default=8080"`
FlushInterval time.Duration `env:"FLUSH_INTERVAL, default=3m"`
Bucket string `env:"BUCKET, required"`
}{})

func main() {
@@ -40,20 +48,79 @@ func main() {
if err != nil {
clog.Fatalf("failed to create event client, %v", err)
}

bucket, err := blob.OpenBucket(ctx, env.Bucket)
if err != nil {
clog.Fatalf("failed to open bucket, %v", err)
}
defer bucket.Close()

var m sync.Mutex
writers := make(map[string]*blob.Writer, 10)

// Periodically flush the writers to commit the data to the bucket.
go func() {
done := false
for {
writersToDrain := func() map[string]*blob.Writer {
m.Lock()
defer m.Unlock()
// Swap the writers map so we can safely iterate and close the writers.
writersToDrain := writers
writers = make(map[string]*blob.Writer, 10)
return writersToDrain
}()

for t, w := range writersToDrain {
clog.Infof("Flushing writer[%s]", t)
if err := w.Close(); err != nil {
clog.Errorf("failed to close writer[%s]: %v", t, err)
}
}

if done {
clog.InfoContextf(ctx, "Exiting flush loop")
return
}
select {
case <-time.After(env.FlushInterval):
case <-ctx.Done():
clog.InfoContext(ctx, "Flushing one more time")
done = true
}
}
}()

// Listen for events and as they come in write them to the appropriate
// writer based on event type.
if err := c.StartReceiver(ctx, func(_ context.Context, event cloudevents.Event) error {
dir := filepath.Join(env.LogPath, event.Type())
if err := os.MkdirAll(dir, 0755); err != nil {
writer, err := func() (*blob.Writer, error) {
m.Lock()
defer m.Unlock()

w, ok := writers[event.Type()]
if !ok {
w, err = bucket.NewWriter(ctx, filepath.Join(event.Type(), strconv.FormatInt(time.Now().UnixNano(), 10)), nil)
if err != nil {
clog.Errorf("failed to create writer: %v", err)
return nil, err
}
}
writers[event.Type()] = w
return w, nil
}()
if err != nil {
clog.Errorf("failed to create writer: %v", err)
return err
}

filename := filepath.Join(dir, event.ID())
if err := os.WriteFile(filename, event.Data(), 0600); err != nil {
clog.Warnf("failed to write file %s; %v", filename, err)
if err := os.RemoveAll(filename); err != nil {
clog.Warnf("failed to remove failed write file: %s; %v", filename, err)
}
// Write the event data as a line to the writer.
line := string(event.Data())
if _, err := writer.Write([]byte(line + "\n")); err != nil {
clog.Errorf("failed to write event data: %v", err)
return err
}

return nil
}); err != nil {
clog.Fatalf("failed to start event receiver, %v", err)
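To make the new write path concrete, here is a minimal, self-contained sketch of the gocloud.dev pattern the recorder now uses: open a bucket by URL, append newline-delimited event payloads to a writer, and commit the object by closing it. The file:// bucket URL and key layout below are illustrative assumptions for local testing, not the module's configuration.

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/fileblob" // file:// support, handy for local testing
	// _ "gocloud.dev/blob/gcsblob" // gs:// support, as imported by the recorder
)

func main() {
	ctx := context.Background()

	// The recorder opens env.Bucket (a gs:// URL); a file:// URL behaves the same locally.
	bucket, err := blob.OpenBucket(ctx, "file:///tmp")
	if err != nil {
		log.Fatalf("failed to open bucket: %v", err)
	}
	defer bucket.Close()

	// One object per (event type, flush window), keyed by a nanosecond timestamp.
	key := fmt.Sprintf("recorder-demo/dev.example.event/%s", strconv.FormatInt(time.Now().UnixNano(), 10))
	w, err := bucket.NewWriter(ctx, key, nil)
	if err != nil {
		log.Fatalf("failed to create writer: %v", err)
	}

	// Events are appended as newline-delimited payloads; nothing becomes visible
	// in the bucket until the writer is closed, which is what the flush loop does.
	if _, err := w.Write([]byte(`{"hello":"world"}` + "\n")); err != nil {
		log.Fatalf("failed to write event: %v", err)
	}
	if err := w.Close(); err != nil {
		log.Fatalf("failed to commit object: %v", err)
	}
}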
43 changes: 7 additions & 36 deletions modules/cloudevent-recorder/recorder.tf
@@ -20,25 +20,15 @@ resource "google_storage_bucket_iam_binding" "recorder-writes-to-gcs-buckets" {
members = ["serviceAccount:${google_service_account.recorder.email}"]
}

locals {
lenv = [{
name = "LOG_PATH"
value = "/logs"
}]

logrotate_env = var.flush_interval == "" ? local.lenv : concat(local.lenv, [{
name = "FLUSH_INTERVAL"
value = var.flush_interval
}])
}

module "this" {
count = var.method == "trigger" ? 1 : 0
source = "../regional-go-service"
project_id = var.project_id
name = var.name
regions = var.regions

deletion_protection = var.deletion_protection

service_account = google_service_account.recorder.email
containers = {
"recorder" = {
@@ -48,37 +38,18 @@ module "this" {
}
ports = [{ container_port = 8080 }]
env = [{
name = "LOG_PATH"
value = "/logs"
}]
volume_mounts = [{
name = "logs"
mount_path = "/logs"
name = "FLUSH_INTERVAL"
value = var.flush_interval
}]
resources = {
limits = var.limits
}
}
"logrotate" = {
source = {
working_dir = path.module
importpath = "./cmd/logrotate"
}
env = local.logrotate_env
regional-env = [{
name = "BUCKET"
value = { for k, v in google_storage_bucket.recorder : k => v.url }
}]
volume_mounts = [{
name = "logs"
mount_path = "/logs"
}]
resources = {
limits = var.limits
}
}
}
volumes = [{
name = "logs"
empty_dir = {}
}]

scaling = var.scaling

2 changes: 1 addition & 1 deletion modules/cloudevent-recorder/variables.tf
@@ -141,5 +141,5 @@ variable "split_triggers" {
variable "flush_interval" {
description = "Flush interval for logrotate, as a duration string."
type = string
default = ""
default = "3m"
}
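Since flush_interval feeds the recorder's FLUSH_INTERVAL environment variable, which is declared as a time.Duration, the string has to parse with Go's time.ParseDuration (as far as I can tell, go-envconfig uses it for duration fields). A quick sketch of what is and is not accepted:

package main

import (
	"fmt"
	"time"
)

func main() {
	// "3m" is the new default; a bare number like "300" is rejected because it has no unit.
	for _, s := range []string{"3m", "90s", "1h30m", "300"} {
		if d, err := time.ParseDuration(s); err != nil {
			fmt.Printf("%-6s -> invalid: %v\n", s, err)
		} else {
			fmt.Printf("%-6s -> %v\n", s, d)
		}
	}
}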