Commit
basic logic in place
Zeph Grunschlag committed Jul 26, 2023
1 parent cf130e2 commit 66f1288
Showing 1 changed file with 246 additions and 16 deletions.
262 changes: 246 additions & 16 deletions conduit/pipeline/pipeline.go
@@ -3,6 +3,7 @@ package pipeline
import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"os"
@@ -41,13 +42,13 @@ type Pipeline interface {

type pipelineImpl struct {
ctx context.Context
cf context.CancelFunc
ccf context.CancelCauseFunc
wg sync.WaitGroup
cfg *data.Config
logger *log.Logger
profFile *os.File
err error
mu sync.RWMutex
errMu sync.RWMutex

initProvider *data.InitProvider

@@ -59,16 +60,28 @@ type pipelineImpl struct {
pipelineMetadata state
}

var (
errStopCause = errors.New("pipeline stopped")
errImporterCause = errors.New("importer cancelled")
errProcessorCause = errors.New("processor cancelled")
errExporterCause = errors.New("exporter cancelled")
)

func (p *pipelineImpl) Error() error {
p.mu.RLock()
defer p.mu.RUnlock()
p.errMu.RLock()
defer p.errMu.RUnlock()
return p.err
}

func (p *pipelineImpl) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()
p.err = err
func (p *pipelineImpl) joinError(err error) {
p.errMu.Lock()
defer p.errMu.Unlock()
p.err = errors.Join(p.err, err)
}

func (p *pipelineImpl) cancel(err error) {
p.ccf(err)
p.joinError(err)
}
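Because joinError accumulates failures with errors.Join instead of overwriting the previous value, Error() can now surface several causes at once, and errors.Is still matches each sentinel wrapped via %w. A minimal illustration of the intended behavior, assuming Go 1.20+ and an initialized pipelineImpl p (this snippet is not part of the commit):

// Illustrative only: after two component failures, Error() reports both,
// and errors.Is matches each sentinel through the %w wrapping and errors.Join.
p.joinError(fmt.Errorf("importer example handler (%w): fetch failed", errImporterCause))
p.joinError(fmt.Errorf("exporter example handler (%w): disk full", errExporterCause))
err := p.Error()
fmt.Println(errors.Is(err, errImporterCause)) // true
fmt.Println(errors.Is(err, errExporterCause)) // true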

func (p *pipelineImpl) registerLifecycleCallbacks() {
@@ -372,7 +385,7 @@ func (p *pipelineImpl) Init() error {
}

func (p *pipelineImpl) Stop() {
p.cf()
p.ccf(errStopCause)
p.wg.Wait()

if p.profFile != nil {
@@ -431,8 +444,225 @@ func addMetrics(block data.BlockData, importTime time.Duration) {
metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)) + float64(innerTxn))
}

func (p *pipelineImpl) ImportHandler(importer importers.Importer, roundChan <-chan uint64, blkOutChan chan<- data.BlockData) {
p.wg.Add(1)
go func() {
defer p.wg.Done()
lastRnd := uint64(0)
totalSelectWait := time.Duration(0)
totalFeedWait := time.Duration(0)
defer func() {
p.logger.Debugf("importer handler exiting. lastRnd=%d totalSelectWait=%dms; totalFeedWait=%dms", lastRnd, totalSelectWait.Milliseconds(), totalFeedWait.Milliseconds())
}()

for {
selectStart := time.Now()
select {
case <-p.ctx.Done():
p.logger.Infof("importer handler exiting. lastRnd=%d", lastRnd)
return
case rnd := <-roundChan:
lastRnd = rnd
waitTime := time.Since(selectStart)
totalSelectWait += waitTime
p.logger.Tracef("importer handler @ round %d received %s", rnd, waitTime)

blkDataPtr, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
if lastError != nil {
p.cancel(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError))
return
}
if blkDataPtr == nil {
p.cancel(fmt.Errorf("importer %s handler (%w): receieved nil blkDataPtr. this should never happen! failed to import round", importer.Metadata().Name, errImporterCause))
return
}
metrics.ImporterTimeSeconds.Observe(importTime.Seconds())

feedStart := time.Now()
select {
case <-p.ctx.Done():
return
case blkOutChan <- *blkDataPtr:
waitTime := time.Since(feedStart)
totalFeedWait += waitTime
p.logger.Tracef("imported round %d into blkOutChan after waiting %dms on channel", rnd, waitTime.Milliseconds())
}

}
}
}()
}
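The Retries helper called above (and RetriesNoOutput, used by the exporter handler below) is not part of this hunk. Judging from the call sites, a plausible generic shape is sketched here; the attempt limit, backoff, and logging are placeholders rather than Conduit's actual retry policy:

// retriesSketch is an illustrative stand-in for the package's Retries helper.
// The real helper presumably consults the pipeline's retry configuration.
func retriesSketch[X, Y any](f func(X) (Y, error), x X, p *pipelineImpl, msg string) (*Y, time.Duration, error) {
	start := time.Now()
	var lastErr error
	for attempt := 1; attempt <= 3; attempt++ { // placeholder retry limit
		if err := p.ctx.Err(); err != nil {
			return nil, time.Since(start), err // stop retrying once the pipeline is cancelled
		}
		y, err := f(x)
		if err == nil {
			return &y, time.Since(start), nil
		}
		lastErr = err
		p.logger.Infof("%s: attempt %d failed: %v", msg, attempt, err)
		time.Sleep(time.Second) // placeholder backoff
	}
	return nil, time.Since(start), lastErr
}

// RetriesNoOutput presumably has the analogous shape:
//   func RetriesNoOutput[X any](f func(X) error, x X, p *pipelineImpl, msg string) (time.Duration, error)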

func (p *pipelineImpl) ProcessorHandler(idx int, proc processors.Processor, blkInChan <-chan data.BlockData, blkOutChan chan<- data.BlockData) {
p.wg.Add(1)
go func() {
defer p.wg.Done()
var lastRnd uint64
totalSelectWait := time.Duration(0)
totalFeedWait := time.Duration(0)
defer func() {
p.logger.Debugf("processor[%d] %s handler exiting. lastRnd=%d totalSelectWait=%dms, totalFeedWait=%dms", idx, proc.Metadata().Name, lastRnd, totalSelectWait.Milliseconds(), totalFeedWait.Milliseconds())
}()

for {
selectStart := time.Now()
select {
case <-p.ctx.Done():
p.logger.Infof("processor[%d] %s handler exiting lastRnd=%d", idx, proc.Metadata().Name, lastRnd)
return
case blkData := <-blkInChan:
waitTime := time.Since(selectStart)
totalSelectWait += waitTime
p.logger.Tracef("processor[%d] %s handler received block data for round %d after wait of %s", idx, proc.Metadata().Name, lastRnd, waitTime)
lastRnd = blkData.Round()

blkDataPtr, procTime, lastError := Retries(proc.Process, blkData, p, proc.Metadata().Name)
if lastError != nil {
p.cancel(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError))
return
}
if blkDataPtr == nil {
p.cancel(fmt.Errorf("processor[%d] %s handler (%w): receieved nil blkDataPtr. this should never happen! failed to process round %d", idx, proc.Metadata().Name, errProcessorCause, lastRnd))
return
}
metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(procTime.Seconds())

feedStart := time.Now()
select {
case <-p.ctx.Done():
return
case blkOutChan <- blkData:
waitTime := time.Since(feedStart)
totalFeedWait += waitTime
p.logger.Tracef("processor[%d] %s handler fed round %d into blkOutChan after waiting %dms", idx, proc.Metadata().Name, lastRnd, waitTime.Milliseconds())
}
}
}
}()
}

// ExporterHandler runs the exporter's Receive method, updates the metadata's NextRound,
// saves the metadata, and invokes the activated plugin callbacks. If the context is cancelled
// before the exporter's Receive method has succeeded, the handler exits immediately.
// However, after the exporter's Receive() method has succeeded, every remaining step is attempted
// at least once, and the handler aborts if a failure occurs once a cancellation has been received.
func (p *pipelineImpl) ExporterHandler(exporter exporters.Exporter, blkChan <-chan data.BlockData) {
p.wg.Add(1)
go func() {
defer p.wg.Done()
var lastError error
lastRnd := uint64(0)
totalSelectWait := time.Duration(0)
totalExportWork := time.Duration(0)

defer func() {
p.logger.Debugf("exporter %s handler exiting. lastRnd=%d totalSelectWait=%dms, totalExportWork=%dms", exporter.Metadata().Name, lastRnd, totalSelectWait.Milliseconds(), totalExportWork.Milliseconds())
}()

defer func() {
if lastError != nil {
err := fmt.Errorf("exporter %s handler (%w) after round %d: %w", exporter.Metadata().Name, errExporterCause, lastRnd, lastError)
p.cancel(err)
p.logger.Error(err)
}
}()

for {
p.logger.Tracef("exporter handler waiting for block data. round=%d", lastRnd)
selectStart := time.Now()
select {
case <-p.ctx.Done():
p.logger.Infof("exporter handler exiting lastRnd=%d", lastRnd)
return
case blkData := <-blkChan:
waitTime := time.Since(selectStart)
totalSelectWait += waitTime
p.logger.Tracef("exporter handler received block data for round %d after wait of %s", lastRnd, waitTime)
lastRnd = blkData.Round()

// TODO: the next level of concurrency will require an atomic NextRound counter
if p.pipelineMetadata.NextRound != lastRnd {
lastError = fmt.Errorf("aborting after out of order block data. %d != %d", p.pipelineMetadata.NextRound, lastRnd)
return
}

var exportTime time.Duration
exportTime, lastError = RetriesNoOutput(exporter.Receive, blkData, p, exporter.Metadata().Name)
if lastError != nil {
lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRnd, lastError)
return
}
metrics.ExporterTimeSeconds.Observe(exportTime.Seconds())

// Increment Round, update metadata
p.pipelineMetadata.NextRound = lastRnd + 1
_, lastError = RetriesNoOutput(p.pipelineMetadata.encodeToFile, p.cfg.ConduitArgs.ConduitDataDir, p, "pipelineMetadata save")
if lastError != nil {
lastError = fmt.Errorf("aborting after updating NextRound=%d and failing to save metadata: %w", lastRnd+1, lastError)
return
}
p.logger.Tracef("exporter %s @ round %d saved pipeline metadata", exporter.Metadata().Name, p.pipelineMetadata.NextRound)

for i, cb := range p.completeCallback {
p.logger.Tracef("exporter %s @ round=%d NextRound=%d executing callback %d", exporter.Metadata().Name, lastRnd, p.pipelineMetadata.NextRound, i)
_, lastError = RetriesNoOutput(cb, blkData, p, fmt.Sprintf("callback %d", i))
if lastError != nil {
lastError = fmt.Errorf("aborting due to failed callback %d: %w", i, lastError)
return
}
}
lastError = nil
}
}
}()
}
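Per the doc comment above, once Receive succeeds the handler runs every registered completion callback at least once and cancels the pipeline if any of them returns an error. The call sites suggest a callback has the shape func(data.BlockData) error; a hypothetical example, not part of this commit (in the real code callbacks are wired up through registerLifecycleCallbacks):

// Hypothetical completion callback: logs each fully exported round.
// Returning a non-nil error here would cause the exporter handler to cancel the pipeline.
p.completeCallback = append(p.completeCallback, func(blk data.BlockData) error {
	p.logger.Infof("round %d fully exported", blk.Round())
	return nil
})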

// Start pushes block data through the pipeline
func (p *pipelineImpl) Start() {
p.logger.Trace("Pipeline.Start()")

// Setup channels
roundChan := make(chan uint64)
processorBlkInChan := make(chan data.BlockData)
p.ImportHandler(p.importer, roundChan, processorBlkInChan)

processorBlkOutChan := processorBlkInChan // start from the importer's output so a pipeline with no processors still feeds the exporter
for i, proc := range p.processors {
processorBlkOutChan = make(chan data.BlockData)
p.ProcessorHandler(i, proc, processorBlkInChan, processorBlkOutChan)
processorBlkInChan = processorBlkOutChan
}

p.ExporterHandler(p.exporter, processorBlkOutChan)

p.wg.Add(1)
// Main loop
go func(startRound uint64) {
defer p.wg.Done()
rnd := startRound
totalFeedWait := time.Duration(0)
defer p.logger.Debugf("round channel feed exiting. lastRnd=%d totalFeedWait=%dms", rnd, totalFeedWait.Milliseconds())
for {
selectStart := time.Now()
p.logger.Tracef("pushing round %d into roundChan", rnd)
select {
case <-p.ctx.Done():
p.logger.Infof("round channel feed exiting. lastRnd=%d", rnd)
return
case roundChan <- rnd:
waitTime := time.Since(selectStart)
totalFeedWait += waitTime
rnd++
}
}
}(p.pipelineMetadata.NextRound)

<-p.ctx.Done()
// TODO: send a prometheus observation based on context.Cause(ctx)
}
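The TODO above could eventually map the cancellation cause onto a metric label once <-p.ctx.Done() fires. A minimal sketch under stated assumptions: the pipelineStopReasons counter below is hypothetical and is not defined by Conduit's metrics package, and registration with a prometheus registerer is omitted:

// Hypothetical counter for pipeline shutdown causes (not part of this commit).
var pipelineStopReasons = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "conduit_pipeline_stop_total", Help: "Pipeline shutdowns by cause."},
	[]string{"cause"},
)

// observeStopCause records why the pipeline stopped, based on context.Cause.
func observeStopCause(ctx context.Context) {
	label := "unknown"
	switch cause := context.Cause(ctx); {
	case errors.Is(cause, errStopCause):
		label = "stop_requested"
	case errors.Is(cause, errImporterCause):
		label = "importer"
	case errors.Is(cause, errProcessorCause):
		label = "processor"
	case errors.Is(cause, errExporterCause):
		label = "exporter"
	}
	pipelineStopReasons.WithLabelValues(label).Inc()
}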

// OStart is the original implementation of Start; it pushes block data through the pipeline.
func (p *pipelineImpl) OStart() {
p.wg.Add(1)
retry := uint64(0)
go func() {
@@ -463,7 +693,7 @@ func (p *pipelineImpl) Start() {
blkData, err := p.importer.GetBlock(p.pipelineMetadata.NextRound)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
p.joinError(err)
retry++
goto pipelineRun
}
@@ -480,7 +710,7 @@
blkData, err = proc.Process(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
p.joinError(err)
retry++
goto pipelineRun
}
@@ -491,7 +721,7 @@
err = p.exporter.Receive(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
p.joinError(err)
retry++
goto pipelineRun
}
@@ -509,7 +739,7 @@
err = cb(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
p.joinError(err)
retry++
goto pipelineRun
}
@@ -519,7 +749,7 @@
if p.pipelineMetadata.NextRound > 1 {
addMetrics(blkData, time.Since(start))
}
p.setError(nil)
p.joinError(nil)
retry = 0
}
}
@@ -554,11 +784,11 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi
return nil, fmt.Errorf("MakePipeline(): logger was empty")
}

cancelContext, cancelFunc := context.WithCancel(ctx)
cancelContext, cancelCauseFunc := context.WithCancelCause(ctx)

pipeline := &pipelineImpl{
ctx: cancelContext,
cf: cancelFunc,
ccf: cancelCauseFunc,
cfg: cfg,
logger: logger,
initProvider: nil,
