Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelining followups #147

Merged
merged 33 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f2bc1a4
msgp in file plugins
Aug 18, 2023
18af031
lint
Aug 18, 2023
ce344d4
remove posErr
Aug 18, 2023
e41924b
Update conduit/plugins/exporters/filewriter/util.go
tzaffi Aug 18, 2023
b46628f
Update conduit/plugins/importers/filereader/test_resources/conduit_da…
tzaffi Aug 18, 2023
b7c82a1
gofmt
Aug 18, 2023
b1c2b0b
change file patterns in README's
Aug 18, 2023
5c06330
readmes
Aug 18, 2023
ae1ee48
Merge branch 'master' into file-plugins-msgp
Aug 21, 2023
d7fd948
per CR suggestion, comment out configuration not explicitly required …
Aug 22, 2023
1e71d1a
Merge remote-tracking branch 'origin/master' into file-plugins-msgp
Aug 22, 2023
497df82
per CR discussion: genesis is always `genesis.json`
Aug 22, 2023
fafc6d7
lint
Aug 22, 2023
e08d93a
per CR discussion: revert renaming/factoring of pipeline's makeConfig()
Aug 22, 2023
488f1e8
lint
Aug 22, 2023
e9af0bd
complete revert
Aug 22, 2023
873cc83
Update conduit/plugins/importers/filereader/fileReadWrite_test.go
tzaffi Aug 22, 2023
85714d1
trim the genesis - ridonculous
Aug 22, 2023
b18d85b
per CR: remove unneeded assignment
Aug 25, 2023
27b2168
Update conduit/plugins/exporters/filewriter/util.go
tzaffi Aug 25, 2023
fb7ea14
test defaults should actually test the defaults
Aug 25, 2023
14a64cd
typo
Aug 25, 2023
29ed381
privatize RetriesXYZ() + retriesNoInput()
Aug 29, 2023
78b8447
don't block inside of Start()
Aug 29, 2023
a4d7376
trim the special end of round log
Aug 29, 2023
f96dbc8
noop importer
Aug 29, 2023
d65afb4
test CLI for the health endpoint
Aug 29, 2023
f12f4e6
lint
Aug 29, 2023
4d0386a
gofmt
Aug 29, 2023
f38ed13
Update conduit/plugins/importers/noop/sample.yaml
tzaffi Aug 29, 2023
f68c929
Update pkg/cli/cli_test.go
tzaffi Aug 29, 2023
35c859c
Merge remote-tracking branch 'origin/master' into pipelining-followups
Aug 29, 2023
d635841
remove retriesNoInput() and its test
Aug 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions conduit/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

log "github.com/sirupsen/logrus"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/data"
)

Expand All @@ -20,14 +22,14 @@ func HandlePanic(logger *log.Logger) {
type empty struct{}

type pluginInput interface {
uint64 | data.BlockData | string
uint64 | data.BlockData | string | empty
}

type pluginOutput interface {
pluginInput | empty
pluginInput | *sdk.Genesis
}

// Retries is a wrapper for retrying a function call f() with a cancellation context,
// retries is a wrapper for retrying a function call f() with a cancellation context,
// a delay and a max retry count. It attempts to call the wrapped function at least once
// and only after the first attempt will pay attention to a context cancellation.
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish
Expand All @@ -45,7 +47,7 @@ type pluginOutput interface {
// - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries
// - when p.cfg.retryCount == 0, the error will be the last error encountered
// - the returned duration dur is the total time spent in the function, including retries
func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
start := time.Now()

for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
Expand Down Expand Up @@ -74,9 +76,19 @@ func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe
return
}

// RetriesNoOutput applies the same logic as Retries, but for functions that return no output.
func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := Retries(func(x X) (empty, error) {
// TODO: probly the following function and its unit test should be axed
// retriesNoInput applies the same logic as Retries, but for functions that take no input.
tzaffi marked this conversation as resolved.
Show resolved Hide resolved

//nolint:unused
func retriesNoInput[Y pluginOutput](f func() (Y, error), p *pipelineImpl, msg string) (Y, time.Duration, error) {
return retries(func(x empty) (Y, error) {
return f()
}, empty{}, p, msg)
}

// retriesNoOutput applies the same logic as Retries, but for functions that return no output.
func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := retries(func(x X) (empty, error) {
return empty{}, f(x)
}, a, p, msg)
return d, err
Expand Down
85 changes: 77 additions & 8 deletions conduit/pipeline/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ func TestRetries(t *testing.T) {
}
}

succeedAfterFactoryNoInput := func(succeedAfter uint64, never bool) func() (uint64, error) {
tries := uint64(0)

return func() (uint64, error) {
if tries >= succeedAfter && !never {
return tries + 1, nil
}
tries++
return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1)
}
}

cases := []struct {
name string
retryCount uint64
Expand Down Expand Up @@ -106,8 +118,8 @@ func TestRetries(t *testing.T) {
for _, tc := range cases {
tc := tc

// run cases for Retries()
t.Run("Retries() "+tc.name, func(t *testing.T) {
// run cases for retries()
t.Run("retries() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -127,7 +139,7 @@ func TestRetries(t *testing.T) {
yChan := make(chan uint64)
errChan := make(chan error)
go func() {
y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
yChan <- y
errChan <- err
}()
Expand All @@ -144,7 +156,7 @@ func TestRetries(t *testing.T) {
return
}

y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

Expand All @@ -163,8 +175,8 @@ func TestRetries(t *testing.T) {
}
})

// run cases for RetriesNoOutput()
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) {
// run cases for retriesNoOutput()
t.Run("retriesNoOutput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -183,7 +195,7 @@ func TestRetries(t *testing.T) {

errChan := make(chan error)
go func() {
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
Expand All @@ -197,7 +209,7 @@ func TestRetries(t *testing.T) {
return
}

_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
} else { // retryCount > 0 so doesn't retry forever
Expand All @@ -209,5 +221,62 @@ func TestRetries(t *testing.T) {
}
}
})

// run case for retriesNoInput()
t.Run("retriesNoInput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfterNoInput := succeedAfterFactoryNoInput(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

errChan := make(chan error)
yChan := make(chan uint64)
go func() {
out, _, err := retriesNoInput(succeedAfterNoInput, p, "test")
yChan <- out
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
y := <-yChan
err := <-errChan
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
return
}

y, _, err := retriesNoInput(succeedAfterNoInput, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

// note we subtract 1 from y below because succeedAfter has added 1 to its output
require.Equal(t, tc.succeedAfter, y-1, tc.name)
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
require.Equal(t, tc.succeedAfter, y-1, tc.name)
}
}
})

}
}
15 changes: 6 additions & 9 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
}

Check warning on line 135 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L134-L135

Added lines #L134 - L135 were not covered by tests

lgr := log.New()
lgr.SetOutput(p.logger.Out)
Expand All @@ -145,7 +145,7 @@
config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)

Check warning on line 148 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L148

Added line #L148 was not covered by tests
}
}

Expand Down Expand Up @@ -196,8 +196,8 @@
for _, part := range parts {
_, config, err := p.makeConfig(part.cfg, part.t)
if err != nil {
return 0, err
}

Check warning on line 200 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L199-L200

Added lines #L199 - L200 were not covered by tests
rnd, err := part.RoundRequest(config)
if err != nil {
return 0, err
Expand Down Expand Up @@ -227,16 +227,16 @@
telemetryConfig := telemetry.MakeTelemetryConfig(p.cfg.Telemetry.URI, p.cfg.Telemetry.Index, p.cfg.Telemetry.UserName, p.cfg.Telemetry.Password)
telemetryClient, err := telemetry.MakeOpenSearchClient(telemetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}

Check warning on line 231 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L230-L231

Added lines #L230 - L231 were not covered by tests
p.logger.Infof("Telemetry initialized with URI: %s", telemetryConfig.URI)

// If GUID is not in metadata, save it. Otherwise, use the GUID from metadata.
if p.pipelineMetadata.TelemetryID == "" {
p.pipelineMetadata.TelemetryID = telemetryClient.TelemetryConfig.GUID
} else {
telemetryClient.TelemetryConfig.GUID = p.pipelineMetadata.TelemetryID
}

Check warning on line 239 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L238-L239

Added lines #L238 - L239 were not covered by tests

return telemetryClient, nil
}
Expand All @@ -255,12 +255,12 @@
var err error
profFile, err := os.Create(p.cfg.CPUProfile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to create profile: %w", err)

Check warning on line 258 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L258

Added line #L258 was not covered by tests
}
p.profFile = profFile
err = pprof.StartCPUProfile(profFile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to start pprof: %w", err)

Check warning on line 263 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L263

Added line #L263 was not covered by tests
}
}

Expand Down Expand Up @@ -309,7 +309,7 @@
var telemetryErr error
telemetryClient, telemetryErr = p.initializeTelemetry()
if telemetryErr != nil {
p.logger.Warnf("Telemetry initialization failed, continuing without telemetry: %s", telemetryErr)

Check warning on line 312 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L312

Added line #L312 was not covered by tests
} else {
// Try sending a startup event. If it fails, log a warning and continue
event := telemetryClient.MakeTelemetryStartupEvent()
Expand All @@ -327,16 +327,16 @@
{
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}

Check warning on line 331 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L330-L331

Added lines #L330 - L331 were not covered by tests
err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 335 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L334-L335

Added lines #L334 - L335 were not covered by tests
genesis, err := p.importer.GetGenesis()
if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 339 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L338-L339

Added lines #L338 - L339 were not covered by tests
(*p.initProvider).SetGenesis(genesis)

// write pipeline metadata
Expand All @@ -349,8 +349,8 @@
p.pipelineMetadata.Network = genesis.Network
err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if err != nil {
return fmt.Errorf("Pipeline.Init() failed to write metadata to file: %w", err)
}

Check warning on line 353 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L352-L353

Added lines #L352 - L353 were not covered by tests

p.logger.Infof("Initialized Importer: %s", p.cfg.Importer.Name)
}
Expand All @@ -360,11 +360,11 @@
ncPair := p.cfg.Processors[idx]
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)

Check warning on line 363 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L363

Added line #L363 was not covered by tests
}
err = processor.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err)

Check warning on line 367 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L367

Added line #L367 was not covered by tests
}
p.logger.Infof("Initialized Processor: %s", ncPair.Name)
}
Expand All @@ -373,12 +373,12 @@
{
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 377 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L376-L377

Added lines #L376 - L377 were not covered by tests
err = p.exporter.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 381 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L380-L381

Added lines #L380 - L381 were not covered by tests
p.logger.Infof("Initialized Exporter: %s", p.cfg.Exporter.Name)
}

Expand Down Expand Up @@ -417,19 +417,19 @@

if err := p.importer.Close(); err != nil {
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err)

Check warning on line 420 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L420

Added line #L420 was not covered by tests
}

for _, processor := range p.processors {
if err := processor.Close(); err != nil {

Check warning on line 424 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L424

Added line #L424 was not covered by tests
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err)

Check warning on line 426 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L426

Added line #L426 was not covered by tests
}
}

if err := p.exporter.Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err)
}

Check warning on line 432 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L431-L432

Added lines #L431 - L432 were not covered by tests
}

func numInnerTxn(txn sdk.SignedTxnWithAD) int {
Expand Down Expand Up @@ -472,16 +472,16 @@
for {
startRound := time.Now()
select {
case <-p.ctx.Done():
p.logger.Infof("importer handler exiting. lastRnd=%d", lastRnd)
return

Check warning on line 477 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L475-L477

Added lines #L475 - L477 were not covered by tests
case rnd := <-roundChan:
lastRnd = rnd
waitTime := time.Since(startRound)
totalSelectWait += waitTime
p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd)

blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
blkData, importTime, lastError := retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError))
return
Expand Down Expand Up @@ -533,7 +533,7 @@

var procTime time.Duration
var lastError error
blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name)
blk, procTime, lastError = retries(proc.Process, blk, p, proc.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError))
return
Expand All @@ -543,8 +543,8 @@
selectStart := time.Now()
// check context in case out channel is full.
select {
case <-p.ctx.Done():
return

Check warning on line 547 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L546-L547

Added lines #L546 - L547 were not covered by tests
case blkOutChan <- blk:
waitTime := time.Since(selectStart)
totalFeedWait += waitTime
Expand Down Expand Up @@ -593,12 +593,12 @@
lastRound = blk.Round()

if p.pipelineMetadata.NextRound != lastRound {
lastError = fmt.Errorf("aborting after out of order block data. %d != %d", p.pipelineMetadata.NextRound, lastRound)
return
}

Check warning on line 598 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L596-L598

Added lines #L596 - L598 were not covered by tests

var exportTime time.Duration
exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName)
exportTime, lastError = retriesNoOutput(exporter.Receive, blk, p, eName)
if lastError != nil {
lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError)
return
Expand Down Expand Up @@ -640,16 +640,15 @@
// WARNING: removing/re-log-levelling the following will BREAK:
// - the E2E test (Search for "Pipeline round" in subslurp.py)
// - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo)
p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime))
p.logger.Infof(logstatsE2Elog(lastRound, len(blk.Payset), exportTime))
}
}
}()
}

func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
func logstatsE2Elog(lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
return fmt.Sprintf(
"UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s",
nextRound,
"FINISHED Pipeline round r=%d (%d txn) exported in %s",
lastRound,
topLevelTxnCount,
exportTime,
Expand Down Expand Up @@ -696,8 +695,6 @@
}
}
}(p.pipelineMetadata.NextRound)

<-p.ctx.Done()
Comment on lines -699 to -700
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the health endpoint bugfix.

}

func (p *pipelineImpl) Wait() {
Expand Down Expand Up @@ -761,12 +758,12 @@
for _, processorConfig := range cfg.Processors {
processorName := processorConfig.Name

processorConstructor, err := processors.ProcessorConstructorByName(processorName)

Check warning on line 761 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L761

Added line #L761 was not covered by tests
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err)
}

pipeline.processors = append(pipeline.processors, processorConstructor.New())

Check warning on line 766 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L766

Added line #L766 was not covered by tests
logger.Infof("Found Processor: %s", processorName)
}

Expand All @@ -791,22 +788,22 @@
logger.Infof("Creating PID file at: %s", pidFilePath)
fout, err := os.Create(pidFilePath)
if err != nil {
err = fmt.Errorf("%s: could not create pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 794 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L791-L794

Added lines #L791 - L794 were not covered by tests

if _, err = fmt.Fprintf(fout, "%d", os.Getpid()); err != nil {
err = fmt.Errorf("%s: could not write pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 800 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L797-L800

Added lines #L797 - L800 were not covered by tests

err = fout.Close()
if err != nil {
err = fmt.Errorf("%s: could not close pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 807 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L804-L807

Added lines #L804 - L807 were not covered by tests
return err
}
5 changes: 2 additions & 3 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,13 +990,12 @@ func TestMetrics(t *testing.T) {
}

func TestLogStatsE2Elog(t *testing.T) {
nextRound := uint64(1337)
round := uint64(42)
numTxns := 13
duration := 12345600 * time.Microsecond

expectedLog := "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(nextRound, round, numTxns, duration)
expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(round, numTxns, duration)
require.Equal(t, expectedLog, log)

logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`)
Expand Down
1 change: 1 addition & 0 deletions conduit/plugins/importers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import (
// Call package wide init function
_ "github.com/algorand/conduit/conduit/plugins/importers/algod"
_ "github.com/algorand/conduit/conduit/plugins/importers/filereader"
_ "github.com/algorand/conduit/conduit/plugins/importers/noop"
)
81 changes: 81 additions & 0 deletions conduit/plugins/importers/noop/noop_importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package noop

import (
"context"
_ "embed" // used to embed config
"fmt"
"time"

"github.com/sirupsen/logrus"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
"github.com/algorand/conduit/conduit/plugins/importers"
)

// PluginName to use when configuring.
var PluginName = "noop"

const sleepForGetBlock = 100 * time.Millisecond

// `noopImporter`s will function without ever erroring. This means they will also process out of order blocks
// which may or may not be desirable for different use cases--it can hide errors in actual importers expecting in order
// block processing.
// The `noopImporter` will maintain `Round` state according to the round of the last block it processed.
// It also sleeps 100 milliseconds between blocks to slow down the pipeline.
type noopImporter struct {
round uint64
cfg ImporterConfig
}

//go:embed sample.yaml
var sampleConfig string

var metadata = plugins.Metadata{
Name: PluginName,
Description: "noop importer",
Deprecated: false,
SampleConfig: sampleConfig,
}

func (imp *noopImporter) Metadata() plugins.Metadata {
return metadata
}

func (imp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error {
if err := cfg.UnmarshalConfig(&imp.cfg); err != nil {
return fmt.Errorf("init failure in unmarshalConfig: %v", err)
}

Check warning on line 50 in conduit/plugins/importers/noop/noop_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/noop/noop_importer.go#L49-L50

Added lines #L49 - L50 were not covered by tests
imp.round = imp.cfg.Round
return nil
}

func (imp *noopImporter) Close() error {
return nil
}

func (imp *noopImporter) GetGenesis() (*sdk.Genesis, error) {
return &sdk.Genesis{}, nil
}

func (imp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) {
time.Sleep(sleepForGetBlock)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the sleep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to slow down the pipeline. For example, if you have a no-op for your importer and a file-writer for your exporter this limits the output to 10 blocks per second which is a little easier to manage than without having any sort of brakes on. Another option is to make this configurable, but that didn't strike me as ideal either since I'd like the no-op importer to have no config. LMK.

imp.round = rnd
return data.BlockData{
BlockHeader: sdk.BlockHeader{
Round: sdk.Round(rnd),
},
}, nil
}

func (imp *noopImporter) Round() uint64 {
return imp.round

Check warning on line 74 in conduit/plugins/importers/noop/noop_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/noop/noop_importer.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}

func init() {
importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer {
return &noopImporter{}
}))
}
7 changes: 7 additions & 0 deletions conduit/plugins/importers/noop/noop_importer_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package noop

// ImporterConfig specific to the noop importer
type ImporterConfig struct {
// Optionally specify the round to start on
Round uint64 `yaml:"round"`
}
3 changes: 3 additions & 0 deletions conduit/plugins/importers/noop/sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name: noop
# noop has no config
config:
2 changes: 1 addition & 1 deletion e2e_tests/src/e2e_conduit/subslurp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
logger = logging.getLogger(__name__)

# Matches conduit log output:
# "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
# "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
FINISH_ROUND: re.Pattern = re.compile(b"FINISHED Pipeline round r=(\d+)")


Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@

pCfg, err := data.MakePipelineConfig(args)
if err != nil {
return err
}

Check warning on line 52 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L51-L52

Added lines #L51 - L52 were not covered by tests

// Initialize logger
level, err := log.ParseLevel(pCfg.LogLevel)
Expand All @@ -63,8 +63,8 @@

logger, err = loggers.MakeThreadSafeLogger(level, pCfg.LogFile)
if err != nil {
return fmt.Errorf("failed to create logger: %w", err)
}

Check warning on line 67 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L66-L67

Added lines #L66 - L67 were not covered by tests

logger.Infof("Starting Conduit %s", version.LongVersion())
logger.Infof("Using data directory: %s", args.ConduitDataDir)
Expand Down Expand Up @@ -94,26 +94,29 @@

err = pline.Init()
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("pipeline init error: %w", err)

Check warning on line 101 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L97-L101

Added lines #L97 - L101 were not covered by tests
}
pline.Start()
defer pline.Stop()

// Start server
if pCfg.API.Address != "" {
logger.Infof("starting API server on %s", pCfg.API.Address)
shutdown, err := api.StartServer(logger, pline, pCfg.API.Address)
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("failed to start API server: %w", err)

Check warning on line 115 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L111-L115

Added lines #L111 - L115 were not covered by tests
}
defer shutdown(context.Background())
} else {
logger.Info("API server is disabled")
}

pline.Wait()
Expand All @@ -121,11 +124,11 @@
}

// MakeConduitCmdWithUtilities creates the main cobra command with all utilities
func MakeConduitCmdWithUtilities() *cobra.Command {
cmd := MakeConduitCmd()
cmd.AddCommand(initialize.InitCommand)
cmd.AddCommand(list.Command)
return cmd

Check warning on line 131 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L127-L131

Added lines #L127 - L131 were not covered by tests
}

// MakeConduitCmd creates the main cobra command, initializes flags
Expand All @@ -147,17 +150,17 @@
Detailed documentation is online: https://github.com/algorand/conduit`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
err := runConduitCmdWithConfig(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "\nExiting with error:\t%s.\n", err)
os.Exit(1)
}

Check warning on line 157 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L153-L157

Added lines #L153 - L157 were not covered by tests
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if vFlag {
fmt.Printf("%s\n", version.LongVersion())
os.Exit(0)
}

Check warning on line 163 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L159-L163

Added lines #L159 - L163 were not covered by tests
},
}
cmd.Flags().StringVarP(&cfg.ConduitDataDir, "data-dir", "d", "", "Set the Conduit data directory. If not set the CONDUIT_DATA_DIR environment variable is used.")
Expand Down
Loading
Loading