Skip to content

Commit

Permalink
Remove async ingestion option
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanb committed Sep 7, 2023
1 parent 847c504 commit 5c46c7f
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 58 deletions.
57 changes: 2 additions & 55 deletions cmd/guacingest/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@ package cmd

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/guacsec/guac/pkg/assembler"
csub_client "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/emitter"
"github.com/guacsec/guac/pkg/handler/processor"
"github.com/guacsec/guac/pkg/handler/processor/process"
"github.com/guacsec/guac/pkg/ingestor"
"github.com/guacsec/guac/pkg/ingestor/parser"
parser_common "github.com/guacsec/guac/pkg/ingestor/parser/common"
"github.com/guacsec/guac/pkg/logging"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand All @@ -41,7 +37,6 @@ type options struct {
natsAddr string
csubAddr string
graphqlEndpoint string
async bool
}

func ingest(cmd *cobra.Command, args []string) {
Expand All @@ -50,7 +45,6 @@ func ingest(cmd *cobra.Command, args []string) {
viper.GetString("nats-addr"),
viper.GetString("csub-addr"),
viper.GetString("gql-addr"),
viper.GetBool("async-ingest"),
args)
if err != nil {
fmt.Printf("unable to validate flags: %v\n", err)
Expand Down Expand Up @@ -80,28 +74,7 @@ func ingest(cmd *cobra.Command, args []string) {
defer csubClient.Close()

emit := func(d *processor.Document) error {
if opts.async {
docTree, err := process.Process(ctx, d)
if err != nil {
logger.Error("[processor] failed process document: %v", err)
return nil
}

docTreeBytes, err := json.Marshal(docTree)
if err != nil {
return fmt.Errorf("failed marshal of document: %w", err)
}
err = emitter.Publish(ctx, emitter.SubjectNameDocProcessed, docTreeBytes)
if err != nil {
logger.Error("[processor] failed transportFunc: %v", err)
return nil
}

logger.Infof("[processor] docTree Processed: %+v", docTree.Document.SourceInformation)
return nil
} else {
return ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient)
}
return ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient)
}

// Assuming that publisher and consumer are different processes.
Expand All @@ -114,31 +87,6 @@ func ingest(cmd *cobra.Command, args []string) {
}
}()

if opts.async {
ingestorFunc := func(predicates []assembler.IngestPredicates, idstrings []*parser_common.IdentifierStrings) error {
collectSubEmitFunc := ingestor.GetCollectSubEmit(ctx, csubClient)
assemblerFunc := ingestor.GetAssembler(ctx, opts.graphqlEndpoint)

err := collectSubEmitFunc(idstrings)
if err != nil {
logger.Infof("unable to create entries in collectsub server, but continuing: %v", err)
}
err = assemblerFunc(predicates)
if err != nil {
return err
}
return nil
}

wg.Add(1)
go func() {
defer wg.Done()
if err := parser.Subscribe(ctx, ingestorFunc); err != nil {
logger.Errorf("parser ended with error: %v", err)
}
}()
}

logger.Infof("starting processor and parser")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -149,12 +97,11 @@ func ingest(cmd *cobra.Command, args []string) {
wg.Wait()
}

func validateFlags(natsAddr string, csubAddr string, graphqlEndpoint string, async bool, args []string) (options, error) {
func validateFlags(natsAddr string, csubAddr string, graphqlEndpoint string, args []string) (options, error) {
var opts options
opts.natsAddr = natsAddr
opts.csubAddr = csubAddr
opts.graphqlEndpoint = graphqlEndpoint
opts.async = async

return opts, nil
}
2 changes: 1 addition & 1 deletion cmd/guacingest/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func init() {
cobra.OnInitialize(cli.InitConfig)

set, err := cli.BuildFlags([]string{"nats-addr", "csub-addr", "gql-addr", "async-ingest"})
set, err := cli.BuildFlags([]string{"nats-addr", "csub-addr", "gql-addr"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/guacone/cmd/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var gcsCmd = &cobra.Command{

if err != nil {
gotErr = true
return fmt.Errorf("unable to ingest document: %v", err)
return fmt.Errorf("unable to ingest document: %w", err)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ services:
guac-ingestor:
networks: [frontend]
image: $GUAC_IMAGE
command: "/opt/guac/guacingest --async-ingest=false"
command: "/opt/guac/guacingest"
working_dir: /guac
restart: on-failure
depends_on:
Expand Down

0 comments on commit 5c46c7f

Please sign in to comment.