diff --git a/README.md b/README.md index f99ecac..7ccba96 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Runs the chain backup service FLAGS -from-block 1 the starting block number for the backup (inclusive) + -legacy=false flag indicating if the legacy output format should be used (tx-per-line) -output-path ./backup.jsonl the output path for the JSONL chain data -overwrite=false flag indicating if the output file should be overwritten during backup -remote http://127.0.0.1:26657 the JSON-RPC URL of the chain to be backed up diff --git a/backup/backup.go b/backup/backup.go index 7ec08e5..8d2def2 100644 --- a/backup/backup.go +++ b/backup/backup.go @@ -4,6 +4,7 @@ package backup import ( "context" "fmt" + "time" "github.com/gnolang/tx-archive/backup/client" "github.com/gnolang/tx-archive/backup/writer" @@ -19,14 +20,17 @@ type Service struct { client client.Client writer writer.Writer logger log.Logger + + watchInterval time.Duration // interval for the watch routine } // NewService creates a new backup service func NewService(client client.Client, writer writer.Writer, opts ...Option) *Service { s := &Service{ - client: client, - writer: writer, - logger: noop.New(), + client: client, + writer: writer, + logger: noop.New(), + watchInterval: 1 * time.Second, } for _, opt := range opts { @@ -49,39 +53,93 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error { return fmt.Errorf("unable to determine right bound, %w", boundErr) } + // Keep track of total txs backed up + totalTxs := uint64(0) + + fetchAndWrite := func(block uint64) error { + txs, txErr := s.client.GetBlockTransactions(block) + if txErr != nil { + return fmt.Errorf("unable to fetch block transactions, %w", txErr) + } + + // Skip empty blocks + if len(txs) == 0 { + return nil + } + + // Save the block transaction data, if any + for _, tx := range txs { + data := &types.TxData{ + Tx: tx, + BlockNum: block, + } + + // Write the tx data to the file + if writeErr := s.writer.WriteTxData(data); writeErr != nil { + return fmt.Errorf("unable to write tx data, %w", writeErr) + } + + totalTxs++ + + // Log the progress + s.logger.Info( + "Transaction backed up", + "total", totalTxs, + ) + } + + return nil + } + // Gather the chain data from the node for block := cfg.FromBlock; block <= toBlock; block++ { select { case <-ctx.Done(): - s.logger.Info("export procedure stopped") + s.logger.Info("backup procedure stopped") return nil default: - txs, txErr := s.client.GetBlockTransactions(block) - if txErr != nil { - return fmt.Errorf("unable to fetch block transactions, %w", txErr) + if fetchErr := fetchAndWrite(block); fetchErr != nil { + return fetchErr } + } + } - // Skip empty blocks - if len(txs) == 0 { - continue - } + // Check if there needs to be a watcher setup + if cfg.Watch { + ticker := time.NewTicker(s.watchInterval) + defer ticker.Stop() + + lastBlock := toBlock + + for { + select { + case <-ctx.Done(): + s.logger.Info("export procedure stopped") - // Save the block transaction data, if any - for _, tx := range txs { - data := &types.TxData{ - Tx: tx, - BlockNum: block, + return nil + case <-ticker.C: + // Fetch the latest block from the chain + latest, latestErr := s.client.GetLatestBlockNumber() + if latestErr != nil { + return fmt.Errorf("unable to fetch latest block number, %w", latestErr) } - // Write the tx data to the file - if writeErr := s.writer.WriteTxData(data); writeErr != nil { - return fmt.Errorf("unable to write tx data, %w", writeErr) + // Check if there have been blocks in the meantime + if lastBlock == latest { + continue } - } - // Log the progress - logProgress(s.logger, cfg.FromBlock, toBlock, block) + // Catch up to the latest block + for block := lastBlock + 1; block <= latest; block++ { + if fetchErr := fetchAndWrite(block); fetchErr != nil { + return fetchErr + } + } + + // Update the last exported block + lastBlock = latest + } } } @@ -111,17 +169,3 @@ func determineRightBound( // Requested right bound is not valid, use the latest block number return latestBlockNumber, nil } - -// logProgress logs the backup progress -func logProgress(logger log.Logger, from, to, current uint64) { - total := to - from - status := (float64(current) - float64(from)) / float64(total) * 100 - - logger.Info( - fmt.Sprintf("Total of %d blocks backed up", current-from+1), - "total", total+1, - "from", from, - "to", to, - "status", fmt.Sprintf("%.2f%%", status), - ) -} diff --git a/backup/backup_test.go b/backup/backup_test.go index 054bb4c..46ab293 100644 --- a/backup/backup_test.go +++ b/backup/backup_test.go @@ -6,6 +6,7 @@ import ( "errors" "os" "testing" + "time" "github.com/gnolang/gno/tm2/pkg/amino" "github.com/gnolang/gno/tm2/pkg/std" @@ -80,7 +81,7 @@ func TestBackup_DetermineRightBound(t *testing.T) { }) } -func TestBackup_ExecuteBackup(t *testing.T) { +func TestBackup_ExecuteBackup_FixedRange(t *testing.T) { t.Parallel() var ( @@ -160,3 +161,97 @@ func TestBackup_ExecuteBackup(t *testing.T) { t.Fatalf("error encountered during scan, %v", err) } } + +func TestBackup_ExecuteBackup_Watch(t *testing.T) { + t.Parallel() + + // Set up the context that is controlled by the test + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + var ( + tempFile = createTempFile(t) + + fromBlock uint64 = 10 + toBlock = fromBlock + 10 + + requestToBlock = toBlock / 2 + + exampleTx = std.Tx{ + Memo: "example transaction", + } + + cfg = DefaultConfig() + + mockClient = &mockClient{ + getLatestBlockNumberFn: func() (uint64, error) { + return toBlock, nil + }, + getBlockTransactionsFn: func(blockNum uint64) ([]std.Tx, error) { + // Sanity check + if blockNum < fromBlock && blockNum > toBlock { + t.Fatal("invalid block number requested") + } + + if blockNum == toBlock { + // End of the road, close the watch process + cancelFn() + } + + return []std.Tx{exampleTx}, nil // 1 tx per block + }, + } + ) + + // Temp file cleanup + t.Cleanup(func() { + require.NoError(t, tempFile.Close()) + require.NoError(t, os.Remove(tempFile.Name())) + }) + + // Set the config + cfg.FromBlock = fromBlock + cfg.ToBlock = &requestToBlock + cfg.Watch = true + + s := NewService(mockClient, standard.NewWriter(tempFile), WithLogger(noop.New())) + s.watchInterval = 10 * time.Millisecond // make the interval almost instant for the test + + // Run the backup procedure + require.NoError( + t, + s.ExecuteBackup( + ctx, + cfg, + ), + ) + + // Read the output file + fileRaw, err := os.Open(tempFile.Name()) + require.NoError(t, err) + + // Set up a line-by-line scanner + scanner := bufio.NewScanner(fileRaw) + + expectedBlock := fromBlock + + // Iterate over each line in the file + for scanner.Scan() { + var txData types.TxData + + // Unmarshal the JSON data into the Person struct + if err := amino.UnmarshalJSON(scanner.Bytes(), &txData); err != nil { + t.Fatalf("unable to unmarshal JSON line, %v", err) + } + + assert.Equal(t, expectedBlock, txData.BlockNum) + assert.Equal(t, exampleTx, txData.Tx) + + expectedBlock++ + } + + // Check for errors during scanning + if err := scanner.Err(); err != nil { + t.Fatalf("error encountered during scan, %v", err) + } +} diff --git a/backup/config.go b/backup/config.go index a1b151b..e13b01a 100644 --- a/backup/config.go +++ b/backup/config.go @@ -13,13 +13,16 @@ var ( type Config struct { ToBlock *uint64 // the right bound for the block range; latest if not specified FromBlock uint64 // the left bound for the block range + + Watch bool // flag indicating if incoming tx data should be backed up } // DefaultConfig returns the default backup configuration func DefaultConfig() Config { return Config{ - ToBlock: nil, // to latest block by default - FromBlock: 1, // from genesis + 1 by default + ToBlock: nil, // to latest block by default + FromBlock: 1, // from genesis + 1 by default + Watch: false, // no tracking by default } } diff --git a/cmd/backup.go b/cmd/backup.go index 284459c..617431e 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -40,6 +40,7 @@ type backupCfg struct { overwrite bool legacy bool + watch bool } // newBackupCmd creates the backup command @@ -101,6 +102,13 @@ func (c *backupCfg) registerFlags(fs *flag.FlagSet) { false, "flag indicating if the legacy output format should be used (tx-per-line)", ) + + fs.BoolVar( + &c.watch, + "watch", + false, + "flag indicating if the backup should append incoming tx data", + ) } // exec executes the backup command @@ -124,6 +132,7 @@ func (c *backupCfg) exec(ctx context.Context, _ []string) error { // Set up the config cfg := backup.DefaultConfig() cfg.FromBlock = c.fromBlock + cfg.Watch = c.watch if c.toBlock >= 0 { to64 := uint64(c.toBlock) diff --git a/cmd/restore.go b/cmd/restore.go index b187b69..08153b3 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -20,9 +20,11 @@ var errInvalidInputPath = errors.New("invalid file input path") // restoreCfg is the restore command configuration type restoreCfg struct { - inputPath string - remote string + inputPath string + remote string + legacyBackup bool + watch bool } // newRestoreCmd creates the restore command @@ -63,6 +65,13 @@ func (c *restoreCfg) registerFlags(fs *flag.FlagSet) { false, "flag indicating if the input file is legacy amino JSON", ) + + fs.BoolVar( + &c.watch, + "watch", + false, + "flag indicating if the restore should watch incoming tx data", + ) } // exec executes the restore command @@ -131,7 +140,7 @@ func (c *restoreCfg) exec(ctx context.Context, _ []string) error { ) // Run the backup service - if backupErr := service.ExecuteRestore(ctx); backupErr != nil { + if backupErr := service.ExecuteRestore(ctx, c.watch); backupErr != nil { return fmt.Errorf("unable to execute restore, %w", backupErr) } diff --git a/restore/restore.go b/restore/restore.go index 3240fbc..96eb850 100644 --- a/restore/restore.go +++ b/restore/restore.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "time" "github.com/gnolang/gno/tm2/pkg/std" "github.com/gnolang/tx-archive/log" @@ -18,14 +19,17 @@ type Service struct { client client.Client source source.Source logger log.Logger + + watchInterval time.Duration // interval for the watch routine } // NewService creates a new restore service func NewService(client client.Client, source source.Source, opts ...Option) *Service { s := &Service{ - client: client, - source: source, - logger: noop.New(), + client: client, + source: source, + logger: noop.New(), + watchInterval: 1 * time.Second, } for _, opt := range opts { @@ -36,54 +40,78 @@ func NewService(client client.Client, source source.Source, opts ...Option) *Ser } // ExecuteRestore executes the node restore process -func (s *Service) ExecuteRestore(ctx context.Context) error { - var ( - tx *std.Tx - nextErr error - - totalTxs uint64 - ) - - // Fetch next transactions - for nextErr == nil { - tx, nextErr = s.source.Next(ctx) - if nextErr != nil { - break - } +func (s *Service) ExecuteRestore(ctx context.Context, watch bool) error { + fetchTxAndSend := func() error { + var ( + tx *std.Tx + nextErr error + + totalTxs uint64 + ) - // Send the transaction - if sendErr := s.client.SendTransaction(tx); sendErr != nil { - // Invalid transaction sends are only logged, - // and do not stop the restore process - s.logger.Error( - "unable to send transaction", - "err", - sendErr.Error(), + // Fetch next transactions + for nextErr == nil { + tx, nextErr = s.source.Next(ctx) + if nextErr != nil { + break + } + + // Send the transaction + if sendErr := s.client.SendTransaction(tx); sendErr != nil { + // Invalid transaction sends are only logged, + // and do not stop the restore process + s.logger.Error( + "unable to send transaction", + "err", + sendErr.Error(), + ) + + continue + } + + totalTxs++ + + s.logger.Info( + "sent transaction", + "total", + totalTxs, ) + } - continue + // Check if this is the end of the road + if !errors.Is(nextErr, io.EOF) { + return fmt.Errorf("unable to get next transaction, %w", nextErr) } - totalTxs++ + return nil + } - s.logger.Info( - "sent transaction", - "total", - totalTxs, - ) + // Execute the initial restore + if fetchErr := fetchTxAndSend(); fetchErr != nil { + return fetchErr } - // Check if this is the end of the road - if !errors.Is(nextErr, io.EOF) { - return fmt.Errorf("unable to get next transaction, %w", nextErr) + // Check if there needs to be a watcher setup + if watch { + ticker := time.NewTicker(s.watchInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + s.logger.Info("restore procedure stopped") + + return nil + case <-ticker.C: + if fetchErr := fetchTxAndSend(); fetchErr != nil { + return fetchErr + } + } + } } // No more transactions to apply - s.logger.Info( - "restore process finished", - "total", - totalTxs, - ) + s.logger.Info("restore process finished") return nil } diff --git a/restore/restore_test.go b/restore/restore_test.go index 8fe7c9b..409a428 100644 --- a/restore/restore_test.go +++ b/restore/restore_test.go @@ -3,7 +3,9 @@ package restore import ( "context" "io" + "sync/atomic" "testing" + "time" "github.com/gnolang/gno/tm2/pkg/std" "github.com/gnolang/tx-archive/log/noop" @@ -48,7 +50,92 @@ func TestRestore_ExecuteRestore(t *testing.T) { // Execute the restore assert.NoError( t, - s.ExecuteRestore(context.Background()), + s.ExecuteRestore(context.Background(), false), + ) + + // Verify the restore was correct + assert.Len(t, sentTxs, exampleTxCount) + + for _, tx := range sentTxs { + assert.Equal(t, exampleTx, tx) + } +} + +func TestRestore_ExecuteRestore_Watch(t *testing.T) { + t.Parallel() + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + var ( + exampleTxCount = 20 + exampleTxGiven = 0 + + simulateEOF atomic.Bool + + exampleTx = &std.Tx{ + Memo: "example tx", + } + + sentTxs = make([]*std.Tx, 0) + + mockClient = &mockClient{ + sendTransactionFn: func(tx *std.Tx) error { + sentTxs = append(sentTxs, tx) + + return nil + }, + } + mockSource = &mockSource{ + nextFn: func(ctx context.Context) (*std.Tx, error) { + if simulateEOF.Load() { + return nil, io.EOF + } + + // ~ the half mark, cut off the tx stream + // by simulating the end of the stream (temporarily) + if exampleTxGiven == exampleTxCount/2 { + // Simulate EOF, but after some time + // make sure the Next call returns an actual transaction + simulateEOF.Store(true) + + time.AfterFunc( + 50*time.Millisecond, + func() { + simulateEOF.Store(false) + }, + ) + + exampleTxGiven++ + + return exampleTx, nil + } + + if exampleTxGiven == exampleTxCount { + // All transactions parsed, simulate + // the user cancelling the context + cancelFn() + + return nil, io.EOF + } + + exampleTxGiven++ + + return exampleTx, nil + }, + } + ) + + s := NewService(mockClient, mockSource, WithLogger(noop.New())) + s.watchInterval = 10 * time.Millisecond // make the interval almost instant for the test + + // Execute the restore + assert.NoError( + t, + s.ExecuteRestore( + ctx, + true, // Enable watch + ), ) // Verify the restore was correct