diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go index 43e62560..c26cb4d1 100644 --- a/conduit/plugins/importers/algod/algod_importer.go +++ b/conduit/plugins/importers/algod/algod_importer.go @@ -42,7 +42,7 @@ const ( ) var ( - waitForRoundTimeout = 15 * time.Second + waitForRoundTimeout = 30 * time.Second ) const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt" @@ -418,12 +418,15 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error } // SyncError is used to indicate algod and conduit are not synchronized. -// The retrievedRound is the round returned from an algod status call. -// The expectedRound is the round conduit expected to have gotten back. type SyncError struct { + // retrievedRound is the round returned from an algod status call. retrievedRound uint64 - expectedRound uint64 - err error + + // expectedRound is the round conduit expected to have gotten back. + expectedRound uint64 + + // err is the error that was received from the endpoint caller. + err error } // NewSyncError creates a new SyncError. @@ -456,7 +459,8 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli if rnd <= status.LastRound { return status.LastRound, nil } - return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("this check should never be required: %w", err)) + // algod's timeout should not be reached because context.WithTimeout is used + return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout")) } // If there was a different error and the node is responsive, call status before returning a SyncError. @@ -480,10 +484,8 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout) if err != nil { - if algodImp.ctx.Err() != nil { - return blk, fmt.Errorf("importer algod.GetBlock() ctx cancelled: %w", err) - } - algodImp.logger.Errorf("importer algod.GetBlock() called waitForRoundWithTimeout: %v", err) + err = fmt.Errorf("called waitForRoundWithTimeout: %w", err) + algodImp.logger.Errorf(err.Error()) return data.BlockData{}, err } start := time.Now() @@ -492,13 +494,14 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) dt := time.Since(start) getAlgodRawBlockTimeSeconds.Observe(dt.Seconds()) if err != nil { - algodImp.logger.Errorf("importer algod.GetBlock() error getting block for round %d: %s", rnd, err.Error()) + err = fmt.Errorf("error getting block for round %d: %w", rnd, err) + algodImp.logger.Errorf(err.Error()) return data.BlockData{}, err } tmpBlk := new(models.BlockResponse) err = msgpack.Decode(blockbytes, tmpBlk) if err != nil { - return blk, err + return blk, fmt.Errorf("error decoding block for round %d: %w", rnd, err) } blk.BlockHeader = tmpBlk.Block.BlockHeader @@ -512,9 +515,9 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) delta, err = algodImp.getDelta(rnd) if err != nil { if nodeRound < rnd { - err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err) + err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err) } else { - err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err) + err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err) } algodImp.logger.Error(err.Error()) return data.BlockData{}, err diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index c7cfc185..00c07a9e 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -682,34 +682,39 @@ func TestGetBlockErrors(t *testing.T) { name: "Cannot wait for block", rnd: 123, blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}), - err: fmt.Sprintf("error getting block for round 123"), + blockResponder: nil, + deltaResponder: nil, + err: "error getting block for round 123", logs: []string{"error getting block for round 123"}, }, { name: "Cannot get block", rnd: 123, blockAfterResponder: BlockAfterResponder, - deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""), - err: fmt.Sprintf("error getting block for round 123"), + deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), + err: "error getting block for round 123", logs: []string{"error getting block for round 123"}, }, { - name: "Cannot get delta (node behind, re-send sync)", + name: "Cannot get delta - node behind, re-send sync", rnd: 200, blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}), blockResponder: BlockResponder, deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""), - err: fmt.Sprintf("wrong round returned from status for round: retrieved(50) != expected(200)"), - logs: []string{"wrong round returned from status for round: retrieved(50) != expected(200)", "sync error detected, attempting to set the sync round to recover the node"}, + err: "wrong round returned from status for round: retrieved(50) != expected(200)", + logs: []string{ + "wrong round returned from status for round: retrieved(50) != expected(200)", + "sync error detected, attempting to set the sync round to recover the node", + }, }, { - name: "Cannot get delta (caught up)", + name: "Cannot get delta - caught up", rnd: 200, blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 200}), blockResponder: BlockResponder, deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""), - err: fmt.Sprintf("ledger state delta not found: node round (200), required round (200)"), + err: "ledger state delta not found: node round (200), required round (200)", logs: []string{"ledger state delta not found: node round (200), required round (200)"}, }, } @@ -753,22 +758,26 @@ func TestGetBlockErrors(t *testing.T) { _, err = testImporter.GetBlock(tc.rnd) noError := assert.ErrorContains(t, err, tc.err) - // Make sure each of the expected log messages are present + // Make sure each of the expected log messages are present in the hookEntries + hookEntries := hook.AllEntries() for _, log := range tc.logs { found := false - hookEntries := hook.AllEntries() for _, entry := range hookEntries { - fmt.Println(strings.Contains(entry.Message, log)) - found = found || strings.Contains(entry.Message, log) + logIsSubstring := strings.Contains(entry.Message, log) + found = found || logIsSubstring + fmt.Printf("logIsSubstring=%t, found=%t:\n\t%s\n", logIsSubstring, found, entry.Message) + } + if !found { + fmt.Printf(">>>>>>WE HAVE A PROBLEM<<<<<<\n") } - noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log) + noError = noError && assert.True(t, found, "(%s) Expected log was not found: '%s'", tc.name, log) } // Print logs if there was an error. if !noError { - fmt.Println("An error was detected, printing logs") + fmt.Printf("An error was detected, printing logs (%s)\n", tc.name) fmt.Println("------------------------------------") - for _, entry := range hook.AllEntries() { + for _, entry := range hookEntries { fmt.Printf(" %s\n", entry.Message) } fmt.Println("------------------------------------") diff --git a/conduit/plugins/importers/algod/mock_algod_test.go b/conduit/plugins/importers/algod/mock_algod_test.go index f5b5bed5..deb61c44 100644 --- a/conduit/plugins/importers/algod/mock_algod_test.go +++ b/conduit/plugins/importers/algod/mock_algod_test.go @@ -33,7 +33,11 @@ func NewAlgodHandler(responders ...algodCustomHandler) *AlgodHandler { // ServeHTTP implements the http.Handler interface for AlgodHandler func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - for _, responder := range handler.responders { + for i, responder := range handler.responders { + _ = i + if responder == nil { + continue + } if responder(req, w) { return } diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md deleted file mode 100644 index 7be184c1..00000000 --- a/docs/GettingStarted.md +++ /dev/null @@ -1,45 +0,0 @@ -# Getting Started - -## Installation - -### Download - -The latest `conduit` binary can be downloaded from the [GitHub releases page](https://github.com/algorand/conduit/releases). - -### Install from Source - -1. Checkout the repo, or download the source, `git clone https://github.com/algorand/conduit.git && cd conduit` -2. Run `make conduit`. -3. The binary is created at `cmd/conduit/conduit`. - -## Configuration and Plugins - -### Configuration File - -Conduit requires a configuration file to set up and run a data pipeline. To generate an initial skeleton for a conduit -config file, you can run `./conduit init -d data`. This will set up a sample data directory with a config located at -`data/conduit.yml`. - -You will need to manually edit the data in the config file, filling in a valid configuration for conduit to run. -You can find a valid config file in [Configuration.md](Configuration.md) or via the `conduit init` command. - -Once you have a valid config file in a directory, `config_directory`, launch conduit with `./conduit -d config_directory`. - -### Plugins - -Conduit comes with an initial set of plugins available for use in pipelines. For more information on the possible -plugins and how to include these plugins in your pipeline's configuration file see [Configuration.md](Configuration.md). - -## Tutorials - -### Migrate from the Legacy Indexer Architecture to a Conduit-backed Indexer - -[How to migrate from a legacy Indexer architecture to a Conduit-backed Indexer deployment. .](./tutorials/IndexerMigration.md) - -### Set up Conduit for the Indexer API - -[How to configure algod, PostgreSQL and Conduit as an Indexer API backend.](./tutorials/IndexerWriter.md) - -### Writing Block Data to the Filesystem - -[Use the file exporter to write data to files.](./tutorials/WritingBlocksToFile.md)