feat: add --watch support for the backup / restore process (#6)
* Add watch support for the backup process

* Add watch support for the restore process

* Change backup log output

* Resolve race condition in test

* Update README
zivkovicmilos authored Oct 6, 2023
1 parent 1eb8b7e commit d63c689
Showing 8 changed files with 359 additions and 83 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -25,6 +25,7 @@ Runs the chain backup service

FLAGS
-from-block 1 the starting block number for the backup (inclusive)
-legacy=false flag indicating if the legacy output format should be used (tx-per-line)
-output-path ./backup.jsonl the output path for the JSONL chain data
-overwrite=false flag indicating if the output file should be overwritten during backup
-remote http://127.0.0.1:26657 the JSON-RPC URL of the chain to be backed up
116 changes: 80 additions & 36 deletions backup/backup.go
@@ -4,6 +4,7 @@ package backup
import (
"context"
"fmt"
"time"

"github.com/gnolang/tx-archive/backup/client"
"github.com/gnolang/tx-archive/backup/writer"
@@ -19,14 +20,17 @@ type Service struct {
     client client.Client
     writer writer.Writer
     logger log.Logger
+
+    watchInterval time.Duration // interval for the watch routine
 }
 
 // NewService creates a new backup service
 func NewService(client client.Client, writer writer.Writer, opts ...Option) *Service {
     s := &Service{
-        client: client,
-        writer: writer,
-        logger: noop.New(),
+        client:        client,
+        writer:        writer,
+        logger:        noop.New(),
+        watchInterval: 1 * time.Second,
     }
 
     for _, opt := range opts {
@@ -49,39 +53,93 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
         return fmt.Errorf("unable to determine right bound, %w", boundErr)
     }
 
+    // Keep track of total txs backed up
+    totalTxs := uint64(0)
+
+    fetchAndWrite := func(block uint64) error {
+        txs, txErr := s.client.GetBlockTransactions(block)
+        if txErr != nil {
+            return fmt.Errorf("unable to fetch block transactions, %w", txErr)
+        }
+
+        // Skip empty blocks
+        if len(txs) == 0 {
+            return nil
+        }
+
+        // Save the block transaction data, if any
+        for _, tx := range txs {
+            data := &types.TxData{
+                Tx:       tx,
+                BlockNum: block,
+            }
+
+            // Write the tx data to the file
+            if writeErr := s.writer.WriteTxData(data); writeErr != nil {
+                return fmt.Errorf("unable to write tx data, %w", writeErr)
+            }
+
+            totalTxs++
+
+            // Log the progress
+            s.logger.Info(
+                "Transaction backed up",
+                "total", totalTxs,
+            )
+        }
+
+        return nil
+    }
+
     // Gather the chain data from the node
     for block := cfg.FromBlock; block <= toBlock; block++ {
         select {
         case <-ctx.Done():
-            s.logger.Info("export procedure stopped")
+            s.logger.Info("backup procedure stopped")
 
             return nil
         default:
-            txs, txErr := s.client.GetBlockTransactions(block)
-            if txErr != nil {
-                return fmt.Errorf("unable to fetch block transactions, %w", txErr)
+            if fetchErr := fetchAndWrite(block); fetchErr != nil {
+                return fetchErr
             }
+        }
+    }
 
-            // Skip empty blocks
-            if len(txs) == 0 {
-                continue
-            }
+    // Check if there needs to be a watcher setup
+    if cfg.Watch {
+        ticker := time.NewTicker(s.watchInterval)
+        defer ticker.Stop()
+
+        lastBlock := toBlock
+
+        for {
+            select {
+            case <-ctx.Done():
+                s.logger.Info("export procedure stopped")
 
-            // Save the block transaction data, if any
-            for _, tx := range txs {
-                data := &types.TxData{
-                    Tx:       tx,
-                    BlockNum: block,
+                return nil
+            case <-ticker.C:
+                // Fetch the latest block from the chain
+                latest, latestErr := s.client.GetLatestBlockNumber()
+                if latestErr != nil {
+                    return fmt.Errorf("unable to fetch latest block number, %w", latestErr)
                 }
 
-                // Write the tx data to the file
-                if writeErr := s.writer.WriteTxData(data); writeErr != nil {
-                    return fmt.Errorf("unable to write tx data, %w", writeErr)
+                // Check if there have been blocks in the meantime
+                if lastBlock == latest {
+                    continue
                 }
-            }
 
-            // Log the progress
-            logProgress(s.logger, cfg.FromBlock, toBlock, block)
+                // Catch up to the latest block
+                for block := lastBlock + 1; block <= latest; block++ {
+                    if fetchErr := fetchAndWrite(block); fetchErr != nil {
+                        return fetchErr
+                    }
+                }
+
+                // Update the last exported block
+                lastBlock = latest
+            }
         }
     }
 
@@ -111,17 +169,3 @@ func determineRightBound(
     // Requested right bound is not valid, use the latest block number
     return latestBlockNumber, nil
 }
-
-// logProgress logs the backup progress
-func logProgress(logger log.Logger, from, to, current uint64) {
-    total := to - from
-    status := (float64(current) - float64(from)) / float64(total) * 100
-
-    logger.Info(
-        fmt.Sprintf("Total of %d blocks backed up", current-from+1),
-        "total", total+1,
-        "from", from,
-        "to", to,
-        "status", fmt.Sprintf("%.2f%%", status),
-    )
-}
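
For reference, a minimal sketch of how a caller could drive the new watch mode programmatically, using only the APIs visible in this diff (DefaultConfig, Config.Watch, NewService, ExecuteBackup). The runWatchBackup helper and the signal wiring are illustrative; constructing the concrete client and writer is left out, and they are passed in as the client.Client and writer.Writer interfaces:

package example

import (
    "context"
    "os/signal"
    "syscall"

    "github.com/gnolang/tx-archive/backup"
    "github.com/gnolang/tx-archive/backup/client"
    "github.com/gnolang/tx-archive/backup/writer"
)

// runWatchBackup backs up the chain from fromBlock onwards, then keeps
// appending newly produced transactions until the process is interrupted
func runWatchBackup(c client.Client, w writer.Writer, fromBlock uint64) error {
    // Cancelling the context is what stops the watch loop
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    cfg := backup.DefaultConfig()
    cfg.FromBlock = fromBlock
    cfg.Watch = true // keep polling for new blocks after the initial range is written

    return backup.NewService(c, w).ExecuteBackup(ctx, cfg)
}

As the ExecuteBackup change above shows, cancelling the context makes both the fixed-range loop and the watch loop return nil, so a clean interrupt ends the backup without an error.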
97 changes: 96 additions & 1 deletion backup/backup_test.go
@@ -6,6 +6,7 @@ import (
"errors"
"os"
"testing"
"time"

"github.com/gnolang/gno/tm2/pkg/amino"
"github.com/gnolang/gno/tm2/pkg/std"
@@ -80,7 +81,7 @@ func TestBackup_DetermineRightBound(t *testing.T) {
     })
 }
 
-func TestBackup_ExecuteBackup(t *testing.T) {
+func TestBackup_ExecuteBackup_FixedRange(t *testing.T) {
     t.Parallel()
 
     var (
@@ -160,3 +161,97 @@
t.Fatalf("error encountered during scan, %v", err)
}
}

func TestBackup_ExecuteBackup_Watch(t *testing.T) {
t.Parallel()

// Set up the context that is controlled by the test
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

var (
tempFile = createTempFile(t)

fromBlock uint64 = 10
toBlock = fromBlock + 10

requestToBlock = toBlock / 2

exampleTx = std.Tx{
Memo: "example transaction",
}

cfg = DefaultConfig()

mockClient = &mockClient{
getLatestBlockNumberFn: func() (uint64, error) {
return toBlock, nil
},
getBlockTransactionsFn: func(blockNum uint64) ([]std.Tx, error) {
// Sanity check
if blockNum < fromBlock || blockNum > toBlock {
t.Fatal("invalid block number requested")
}

if blockNum == toBlock {
// End of the road, close the watch process
cancelFn()
}

return []std.Tx{exampleTx}, nil // 1 tx per block
},
}
)

// Temp file cleanup
t.Cleanup(func() {
require.NoError(t, tempFile.Close())
require.NoError(t, os.Remove(tempFile.Name()))
})

// Set the config
cfg.FromBlock = fromBlock
cfg.ToBlock = &requestToBlock
cfg.Watch = true

s := NewService(mockClient, standard.NewWriter(tempFile), WithLogger(noop.New()))
s.watchInterval = 10 * time.Millisecond // make the interval almost instant for the test

// Run the backup procedure
require.NoError(
t,
s.ExecuteBackup(
ctx,
cfg,
),
)

// Read the output file
fileRaw, err := os.Open(tempFile.Name())
require.NoError(t, err)

// Set up a line-by-line scanner
scanner := bufio.NewScanner(fileRaw)

expectedBlock := fromBlock

// Iterate over each line in the file
for scanner.Scan() {
var txData types.TxData

// Unmarshal the JSON line into the TxData struct
if err := amino.UnmarshalJSON(scanner.Bytes(), &txData); err != nil {
t.Fatalf("unable to unmarshal JSON line, %v", err)
}

assert.Equal(t, expectedBlock, txData.BlockNum)
assert.Equal(t, exampleTx, txData.Tx)

expectedBlock++
}

// Check for errors during scanning
if err := scanner.Err(); err != nil {
t.Fatalf("error encountered during scan, %v", err)
}
}
7 changes: 5 additions & 2 deletions backup/config.go
@@ -13,13 +13,16 @@
 type Config struct {
     ToBlock   *uint64 // the right bound for the block range; latest if not specified
     FromBlock uint64  // the left bound for the block range
+
+    Watch bool // flag indicating if incoming tx data should be backed up
 }
 
 // DefaultConfig returns the default backup configuration
 func DefaultConfig() Config {
     return Config{
-        ToBlock:   nil, // to latest block by default
-        FromBlock: 1,   // from genesis + 1 by default
+        ToBlock:   nil,   // to latest block by default
+        FromBlock: 1,     // from genesis + 1 by default
+        Watch:     false, // no tracking by default
     }
 }

9 changes: 9 additions & 0 deletions cmd/backup.go
@@ -40,6 +40,7 @@ type backupCfg struct {

overwrite bool
legacy bool
watch bool
}

// newBackupCmd creates the backup command
@@ -101,6 +102,13 @@ func (c *backupCfg) registerFlags(fs *flag.FlagSet) {
false,
"flag indicating if the legacy output format should be used (tx-per-line)",
)

fs.BoolVar(
&c.watch,
"watch",
false,
"flag indicating if the backup should append incoming tx data",
)
}

// exec executes the backup command
@@ -124,6 +132,7 @@ func (c *backupCfg) exec(ctx context.Context, _ []string) error {
// Set up the config
cfg := backup.DefaultConfig()
cfg.FromBlock = c.fromBlock
cfg.Watch = c.watch

if c.toBlock >= 0 {
to64 := uint64(c.toBlock)
15 changes: 12 additions & 3 deletions cmd/restore.go
@@ -20,9 +20,11 @@ var errInvalidInputPath = errors.New("invalid file input path")
 
 // restoreCfg is the restore command configuration
 type restoreCfg struct {
-    inputPath    string
-    remote       string
+    inputPath string
+    remote    string
+
     legacyBackup bool
+    watch        bool
 }

// newRestoreCmd creates the restore command
@@ -63,6 +65,13 @@ func (c *restoreCfg) registerFlags(fs *flag.FlagSet) {
false,
"flag indicating if the input file is legacy amino JSON",
)

fs.BoolVar(
&c.watch,
"watch",
false,
"flag indicating if the restore should watch incoming tx data",
)
}

// exec executes the restore command
@@ -131,7 +140,7 @@ func (c *restoreCfg) exec(ctx context.Context, _ []string) error {
     )
 
     // Run the restore service
-    if backupErr := service.ExecuteRestore(ctx); backupErr != nil {
+    if backupErr := service.ExecuteRestore(ctx, c.watch); backupErr != nil {
         return fmt.Errorf("unable to execute restore, %w", backupErr)
     }
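
The restore side mirrors this: the only API change visible in this excerpt is the extra watch argument on ExecuteRestore. Below is a small, purely illustrative sketch of stopping a watch-mode restore through context cancellation; the restorer interface and the restoreWithDeadline helper are hypothetical stand-ins, since the restore service type itself is not shown in this diff, and it is assumed (mirroring the backup behaviour above) that the watch loop exits once the context is cancelled:

package example

import (
    "context"
    "time"
)

// restorer is a local stand-in matching the ExecuteRestore(ctx, watch) call shown above
type restorer interface {
    ExecuteRestore(ctx context.Context, watch bool) error
}

// restoreWithDeadline runs a watch-mode restore and stops it after d
func restoreWithDeadline(svc restorer, d time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), d)
    defer cancel()

    // With watch enabled, the restore presumably keeps applying newly
    // appended tx data until the context is cancelled (assumption)
    return svc.ExecuteRestore(ctx, true)
}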
