diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go
index b77529d4..f0266f6f 100644
--- a/conduit/pipeline/pipeline.go
+++ b/conduit/pipeline/pipeline.go
@@ -3,6 +3,7 @@ package pipeline
 import (
 	"context"
 	"encoding/base64"
+	"errors"
 	"fmt"
 	"net/http"
 	"os"
@@ -41,13 +42,13 @@ type Pipeline interface {
 
 type pipelineImpl struct {
 	ctx      context.Context
-	cf       context.CancelFunc
+	ccf      context.CancelCauseFunc
 	wg       sync.WaitGroup
 	cfg      *data.Config
 	logger   *log.Logger
 	profFile *os.File
 	err      error
-	mu       sync.RWMutex
+	errMu    sync.RWMutex
 
 	initProvider *data.InitProvider
 
@@ -59,16 +60,28 @@ type pipelineImpl struct {
 	pipelineMetadata state
 }
 
+var (
+	errStopCause      = errors.New("pipeline stopped")
+	errImporterCause  = errors.New("importer cancelled")
+	errProcessorCause = errors.New("processor cancelled")
+	errExporterCause  = errors.New("exporter cancelled")
+)
+
 func (p *pipelineImpl) Error() error {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
+	p.errMu.RLock()
+	defer p.errMu.RUnlock()
 	return p.err
 }
 
-func (p *pipelineImpl) setError(err error) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	p.err = err
+func (p *pipelineImpl) joinError(err error) {
+	p.errMu.Lock()
+	defer p.errMu.Unlock()
+	p.err = errors.Join(p.err, err)
+}
+
+func (p *pipelineImpl) cancel(err error) {
+	p.ccf(err)
+	p.joinError(err)
 }
 
 func (p *pipelineImpl) registerLifecycleCallbacks() {
@@ -372,7 +385,7 @@ func (p *pipelineImpl) Init() error {
 }
 
 func (p *pipelineImpl) Stop() {
-	p.cf()
+	p.ccf(errStopCause)
 	p.wg.Wait()
 
 	if p.profFile != nil {
@@ -431,8 +444,225 @@ func addMetrics(block data.BlockData, importTime time.Duration) {
 	metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)) + float64(innerTxn))
 }
 
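+// ImportHandler spawns a goroutine that reads round numbers off roundChan,
+// imports each round's block via the importer (with retries), and feeds the
+// imported block data into blkOutChan. On an unrecoverable failure it cancels
+// the pipeline with a cause wrapping errImporterCause.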
+func (p *pipelineImpl) ImportHandler(importer importers.Importer, roundChan <-chan uint64, blkOutChan chan<- data.BlockData) {
+	p.wg.Add(1)
+	go func() {
+		defer p.wg.Done()
+		lastRnd := uint64(0)
+		totalSelectWait := time.Duration(0)
+		totalFeedWait := time.Duration(0)
+		defer func() {
+			p.logger.Debugf("importer handler exiting. lastRnd=%d totalSelectWait=%dms, totalFeedWait=%dms", lastRnd, totalSelectWait.Milliseconds(), totalFeedWait.Milliseconds())
+		}()
+
+		for {
+			selectStart := time.Now()
+			select {
+			case <-p.ctx.Done():
+				p.logger.Infof("importer handler exiting. lastRnd=%d", lastRnd)
+				return
+			case rnd := <-roundChan:
+				lastRnd = rnd
+				waitTime := time.Since(selectStart)
+				totalSelectWait += waitTime
+				p.logger.Tracef("importer handler received round %d after waiting %s", rnd, waitTime)
+
+				blkDataPtr, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
+				if lastError != nil {
+					p.cancel(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError))
+					return
+				}
+				if blkDataPtr == nil {
+					p.cancel(fmt.Errorf("importer %s handler (%w): received nil blkDataPtr. this should never happen! failed to import round %d", importer.Metadata().Name, errImporterCause, rnd))
+					return
+				}
+				metrics.ImporterTimeSeconds.Observe(importTime.Seconds())
+
+				feedStart := time.Now()
+				select {
+				case <-p.ctx.Done():
+					return
+				case blkOutChan <- *blkDataPtr:
+					waitTime := time.Since(feedStart)
+					totalFeedWait += waitTime
+					p.logger.Tracef("imported round %d into blkOutChan after waiting %dms on channel", rnd, waitTime.Milliseconds())
+				}
+			}
+		}
+	}()
+}
+
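+// ProcessorHandler spawns a goroutine that reads block data off blkInChan,
+// runs it through the processor (with retries), and feeds the processed block
+// data into blkOutChan. On an unrecoverable failure it cancels the pipeline
+// with a cause wrapping errProcessorCause.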
+func (p *pipelineImpl) ProcessorHandler(idx int, proc processors.Processor, blkInChan <-chan data.BlockData, blkOutChan chan<- data.BlockData) {
+	p.wg.Add(1)
+	go func() {
+		defer p.wg.Done()
+		var lastRnd uint64
+		totalSelectWait := time.Duration(0)
+		totalFeedWait := time.Duration(0)
+		defer func() {
+			p.logger.Debugf("processor[%d] %s handler exiting. lastRnd=%d totalSelectWait=%dms, totalFeedWait=%dms", idx, proc.Metadata().Name, lastRnd, totalSelectWait.Milliseconds(), totalFeedWait.Milliseconds())
+		}()
+
+		for {
+			selectStart := time.Now()
+			select {
+			case <-p.ctx.Done():
+				p.logger.Infof("processor[%d] %s handler exiting. lastRnd=%d", idx, proc.Metadata().Name, lastRnd)
+				return
+			case blkData := <-blkInChan:
+				waitTime := time.Since(selectStart)
+				totalSelectWait += waitTime
+				lastRnd = blkData.Round()
+				p.logger.Tracef("processor[%d] %s handler received block data for round %d after waiting %s", idx, proc.Metadata().Name, lastRnd, waitTime)
+
+				blkDataPtr, procTime, lastError := Retries(proc.Process, blkData, p, proc.Metadata().Name)
+				if lastError != nil {
+					p.cancel(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError))
+					return
+				}
+				if blkDataPtr == nil {
+					p.cancel(fmt.Errorf("processor[%d] %s handler (%w): received nil blkDataPtr. this should never happen! failed to process round %d", idx, proc.Metadata().Name, errProcessorCause, lastRnd))
+					return
+				}
+				metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(procTime.Seconds())
+
+				feedStart := time.Now()
+				select {
+				case <-p.ctx.Done():
+					return
+				case blkOutChan <- *blkDataPtr:
+					waitTime := time.Since(feedStart)
+					totalFeedWait += waitTime
+					p.logger.Tracef("processor[%d] %s fed round %d into blkOutChan after waiting %dms", idx, proc.Metadata().Name, lastRnd, waitTime.Milliseconds())
+				}
+			}
+		}
+	}()
+}
+
+// ExporterHandler handles the exporter's Receive method, updating the metadata's NextRound,
+// saving the metadata, and invoking the activated plugin callbacks. If the context is cancelled
+// before the exporter's Receive method has succeeded, the handler exits immediately.
+// However, after Receive has succeeded, each remaining step is attempted at least once,
+// aborting if a failure occurs once a cancellation has been received.
+func (p *pipelineImpl) ExporterHandler(exporter exporters.Exporter, blkChan <-chan data.BlockData) {
+	p.wg.Add(1)
+	go func() {
+		defer p.wg.Done()
+		var lastError error
+		lastRnd := uint64(0)
+		totalSelectWait := time.Duration(0)
+		totalExportWork := time.Duration(0)
+
+		defer func() {
+			p.logger.Debugf("exporter %s handler exiting. lastRnd=%d totalSelectWait=%dms, totalExportWork=%dms", exporter.Metadata().Name, lastRnd, totalSelectWait.Milliseconds(), totalExportWork.Milliseconds())
+		}()
+
+		defer func() {
+			if lastError != nil {
+				err := fmt.Errorf("exporter %s handler (%w) after round %d: %w", exporter.Metadata().Name, errExporterCause, lastRnd, lastError)
+				p.cancel(err)
+				p.logger.Error(err)
+			}
+		}()
+
+		for {
+			p.logger.Tracef("exporter handler waiting for block data. round=%d", lastRnd)
+			selectStart := time.Now()
+			select {
+			case <-p.ctx.Done():
+				p.logger.Infof("exporter handler exiting. lastRnd=%d", lastRnd)
+				return
+			case blkData := <-blkChan:
+				waitTime := time.Since(selectStart)
+				totalSelectWait += waitTime
+				lastRnd = blkData.Round()
+				p.logger.Tracef("exporter handler received block data for round %d after waiting %s", lastRnd, waitTime)
+
+				// TODO: the next level of concurrency will require an atomic NextRound counter
+				if p.pipelineMetadata.NextRound != lastRnd {
+					lastError = fmt.Errorf("aborting after out of order block data. %d != %d", p.pipelineMetadata.NextRound, lastRnd)
+					return
+				}
+
+				var exportTime time.Duration
+				exportTime, lastError = RetriesNoOutput(exporter.Receive, blkData, p, exporter.Metadata().Name)
+				if lastError != nil {
+					lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRnd, lastError)
+					return
+				}
+				metrics.ExporterTimeSeconds.Observe(exportTime.Seconds())
+				totalExportWork += exportTime
+
+				// Increment NextRound and persist the metadata
+				p.pipelineMetadata.NextRound = lastRnd + 1
+				_, lastError = RetriesNoOutput(p.pipelineMetadata.encodeToFile, p.cfg.ConduitArgs.ConduitDataDir, p, "pipelineMetadata save")
+				if lastError != nil {
+					lastError = fmt.Errorf("aborting after updating NextRound=%d and failing to save metadata: %w", lastRnd+1, lastError)
+					return
+				}
+				p.logger.Tracef("exporter %s @ round %d saved pipeline metadata", exporter.Metadata().Name, p.pipelineMetadata.NextRound)
+
+				for i, cb := range p.completeCallback {
+					p.logger.Tracef("exporter %s @ round=%d NextRound=%d executing callback %d", exporter.Metadata().Name, lastRnd, p.pipelineMetadata.NextRound, i)
+					_, lastError = RetriesNoOutput(cb, blkData, p, fmt.Sprintf("callback %d", i))
+					if lastError != nil {
+						lastError = fmt.Errorf("aborting due to failed callback %d: %w", i, lastError)
+						return
+					}
+				}
+				lastError = nil
+			}
+		}
+	}()
+}
+
 // Start pushes block data through the pipeline
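+// by chaining handler goroutines over unbuffered channels:
+//
+//	roundChan -> ImportHandler -> ProcessorHandler[0..n-1] -> ExporterHandler
+//
+// Because the channels are unbuffered, each stage works on at most one round
+// at a time and blocks handing off until the next stage is ready.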
lastRnd=%d", rnd) + return + case roundChan <- rnd: + waitTime := time.Since(selectStart) + totalFeedWait += waitTime + rnd++ + } + } + }(p.pipelineMetadata.NextRound) + + <-p.ctx.Done() + // TODO: send a prometheus observation based on context.Cause(ctx) +} + +// Start pushes block data through the pipeline +func (p *pipelineImpl) OStart() { p.wg.Add(1) retry := uint64(0) go func() { @@ -463,7 +693,7 @@ func (p *pipelineImpl) Start() { blkData, err := p.importer.GetBlock(p.pipelineMetadata.NextRound) if err != nil { p.logger.Errorf("%v", err) - p.setError(err) + p.joinError(err) retry++ goto pipelineRun } @@ -480,7 +710,7 @@ func (p *pipelineImpl) Start() { blkData, err = proc.Process(blkData) if err != nil { p.logger.Errorf("%v", err) - p.setError(err) + p.joinError(err) retry++ goto pipelineRun } @@ -491,7 +721,7 @@ func (p *pipelineImpl) Start() { err = p.exporter.Receive(blkData) if err != nil { p.logger.Errorf("%v", err) - p.setError(err) + p.joinError(err) retry++ goto pipelineRun } @@ -509,7 +739,7 @@ func (p *pipelineImpl) Start() { err = cb(blkData) if err != nil { p.logger.Errorf("%v", err) - p.setError(err) + p.joinError(err) retry++ goto pipelineRun } @@ -519,7 +749,7 @@ func (p *pipelineImpl) Start() { if p.pipelineMetadata.NextRound > 1 { addMetrics(blkData, time.Since(start)) } - p.setError(nil) + p.joinError(nil) retry = 0 } } @@ -554,11 +784,11 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi return nil, fmt.Errorf("MakePipeline(): logger was empty") } - cancelContext, cancelFunc := context.WithCancel(ctx) + cancelContext, cancelCauseFunc := context.WithCancelCause(ctx) pipeline := &pipelineImpl{ ctx: cancelContext, - cf: cancelFunc, + ccf: cancelCauseFunc, cfg: cfg, logger: logger, initProvider: nil,