Skip to content

Commit

Permalink
Merge branch 'master' into pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeph Grunschlag committed Aug 11, 2023
2 parents cb86036 + 7ef6106 commit 8c1e6fa
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 75 deletions.
31 changes: 17 additions & 14 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

var (
waitForRoundTimeout = 15 * time.Second
waitForRoundTimeout = 30 * time.Second
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
Expand Down Expand Up @@ -418,12 +418,15 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error
}

// SyncError is used to indicate algod and conduit are not synchronized.
// The retrievedRound is the round returned from an algod status call.
// The expectedRound is the round conduit expected to have gotten back.
type SyncError struct {
// retrievedRound is the round returned from an algod status call.
retrievedRound uint64
expectedRound uint64
err error

// expectedRound is the round conduit expected to have gotten back.
expectedRound uint64

// err is the error that was received from the endpoint caller.
err error
}

// NewSyncError creates a new SyncError.
Expand Down Expand Up @@ -456,7 +459,8 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
if rnd <= status.LastRound {
return status.LastRound, nil
}
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("this check should never be required: %w", err))
// algod's timeout should not be reached because context.WithTimeout is used
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout"))
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
Expand All @@ -480,10 +484,8 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("importer algod.GetBlock() ctx cancelled: %w", err)
}
algodImp.logger.Errorf("importer algod.GetBlock() called waitForRoundWithTimeout: %v", err)
err = fmt.Errorf("called waitForRoundWithTimeout: %w", err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
start := time.Now()
Expand All @@ -492,13 +494,14 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("importer algod.GetBlock() error getting block for round %d: %s", rnd, err.Error())
err = fmt.Errorf("error getting block for round %d: %w", rnd, err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
return blk, fmt.Errorf("error decoding block for round %d: %w", rnd, err)
}

Check warning on line 505 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L504-L505

Added lines #L504 - L505 were not covered by tests

blk.BlockHeader = tmpBlk.Block.BlockHeader
Expand All @@ -512,9 +515,9 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
delta, err = algodImp.getDelta(rnd)
if err != nil {
if nodeRound < rnd {
err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)

Check warning on line 518 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L518

Added line #L518 was not covered by tests
} else {
err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
}
algodImp.logger.Error(err.Error())
return data.BlockData{}, err
Expand Down
39 changes: 24 additions & 15 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,34 +682,39 @@ func TestGetBlockErrors(t *testing.T) {
name: "Cannot wait for block",
rnd: 123,
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}),
err: fmt.Sprintf("error getting block for round 123"),
blockResponder: nil,
deltaResponder: nil,
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get block",
rnd: 123,
blockAfterResponder: BlockAfterResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""),
err: fmt.Sprintf("error getting block for round 123"),
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get delta (node behind, re-send sync)",
name: "Cannot get delta - node behind, re-send sync",
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("wrong round returned from status for round: retrieved(50) != expected(200)"),
logs: []string{"wrong round returned from status for round: retrieved(50) != expected(200)", "sync error detected, attempting to set the sync round to recover the node"},
err: "wrong round returned from status for round: retrieved(50) != expected(200)",
logs: []string{
"wrong round returned from status for round: retrieved(50) != expected(200)",
"sync error detected, attempting to set the sync round to recover the node",
},
},
{
name: "Cannot get delta (caught up)",
name: "Cannot get delta - caught up",
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 200}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("ledger state delta not found: node round (200), required round (200)"),
err: "ledger state delta not found: node round (200), required round (200)",
logs: []string{"ledger state delta not found: node round (200), required round (200)"},
},
}
Expand Down Expand Up @@ -753,22 +758,26 @@ func TestGetBlockErrors(t *testing.T) {
_, err = testImporter.GetBlock(tc.rnd)
noError := assert.ErrorContains(t, err, tc.err)

// Make sure each of the expected log messages are present
// Make sure each of the expected log messages are present in the hookEntries
hookEntries := hook.AllEntries()
for _, log := range tc.logs {
found := false
hookEntries := hook.AllEntries()
for _, entry := range hookEntries {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
logIsSubstring := strings.Contains(entry.Message, log)
found = found || logIsSubstring
fmt.Printf("logIsSubstring=%t, found=%t:\n\t%s\n", logIsSubstring, found, entry.Message)
}
if !found {
fmt.Printf(">>>>>>WE HAVE A PROBLEM<<<<<<\n")
}
noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log)
noError = noError && assert.True(t, found, "(%s) Expected log was not found: '%s'", tc.name, log)
}

// Print logs if there was an error.
if !noError {
fmt.Println("An error was detected, printing logs")
fmt.Printf("An error was detected, printing logs (%s)\n", tc.name)
fmt.Println("------------------------------------")
for _, entry := range hook.AllEntries() {
for _, entry := range hookEntries {
fmt.Printf(" %s\n", entry.Message)
}
fmt.Println("------------------------------------")
Expand Down
6 changes: 5 additions & 1 deletion conduit/plugins/importers/algod/mock_algod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func NewAlgodHandler(responders ...algodCustomHandler) *AlgodHandler {

// ServeHTTP implements the http.Handler interface for AlgodHandler
func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for _, responder := range handler.responders {
for i, responder := range handler.responders {
_ = i
if responder == nil {
continue
}
if responder(req, w) {
return
}
Expand Down
45 changes: 0 additions & 45 deletions docs/GettingStarted.md

This file was deleted.

0 comments on commit 8c1e6fa

Please sign in to comment.