Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add --watch support for the backup / restore process #6

Merged
merged 6 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 80 additions & 36 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/gnolang/gno/tm2/pkg/amino"
"github.com/gnolang/tx-archive/backup/client"
Expand All @@ -20,14 +21,17 @@ type Service struct {
client client.Client
writer io.Writer
logger log.Logger

watchInterval time.Duration // interval for the watch routine
}

// NewService creates a new backup service
func NewService(client client.Client, writer io.Writer, opts ...Option) *Service {
s := &Service{
client: client,
writer: writer,
logger: noop.New(),
client: client,
writer: writer,
logger: noop.New(),
watchInterval: 1 * time.Second,
}

for _, opt := range opts {
Expand All @@ -50,39 +54,93 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
return fmt.Errorf("unable to determine right bound, %w", boundErr)
}

// Keep track of total txs backed up
totalTxs := uint64(0)

fetchAndWrite := func(block uint64) error {
txs, txErr := s.client.GetBlockTransactions(block)
if txErr != nil {
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
}

// Skip empty blocks
if len(txs) == 0 {
return nil
}

// Save the block transaction data, if any
for _, tx := range txs {
data := &types.TxData{
Tx: tx,
BlockNum: block,
}

// Write the tx data to the file
if writeErr := writeTxData(s.writer, data); writeErr != nil {
return fmt.Errorf("unable to write tx data, %w", writeErr)
}

totalTxs++

// Log the progress
s.logger.Info(
"Transaction backed up",
"total", totalTxs,
)
}

return nil
}

// Gather the chain data from the node
for block := cfg.FromBlock; block <= toBlock; block++ {
select {
case <-ctx.Done():
s.logger.Info("export procedure stopped")
s.logger.Info("backup procedure stopped")

return nil
default:
txs, txErr := s.client.GetBlockTransactions(block)
if txErr != nil {
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
if fetchErr := fetchAndWrite(block); fetchErr != nil {
return fetchErr
}
}
}

// Skip empty blocks
if len(txs) == 0 {
continue
}
// Check if there needs to be a watcher setup
if cfg.Watch {
ticker := time.NewTicker(s.watchInterval)
defer ticker.Stop()

lastBlock := toBlock

for {
select {
case <-ctx.Done():
s.logger.Info("export procedure stopped")

// Save the block transaction data, if any
for _, tx := range txs {
data := &types.TxData{
Tx: tx,
BlockNum: block,
return nil
case <-ticker.C:
// Fetch the latest block from the chain
latest, latestErr := s.client.GetLatestBlockNumber()
if latestErr != nil {
return fmt.Errorf("unable to fetch latest block number, %w", latestErr)
}

// Write the tx data to the file
if writeErr := writeTxData(s.writer, data); writeErr != nil {
return fmt.Errorf("unable to write tx data, %w", writeErr)
// Check if there have been blocks in the meantime
if lastBlock == latest {
continue
}
}

// Log the progress
logProgress(s.logger, cfg.FromBlock, toBlock, block)
// Catch up to the latest block
for block := lastBlock + 1; block <= latest; block++ {
if fetchErr := fetchAndWrite(block); fetchErr != nil {
return fetchErr
}
}

// Update the last exported block
lastBlock = latest
}
}
}

Expand Down Expand Up @@ -135,17 +193,3 @@ func writeTxData(writer io.Writer, txData *types.TxData) error {

return nil
}

// logProgress logs the backup progress
func logProgress(logger log.Logger, from, to, current uint64) {
total := to - from
status := (float64(current) - float64(from)) / float64(total) * 100

logger.Info(
fmt.Sprintf("Total of %d blocks backed up", current-from+1),
"total", total+1,
"from", from,
"to", to,
"status", fmt.Sprintf("%.2f%%", status),
)
}
97 changes: 96 additions & 1 deletion backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"os"
"testing"
"time"

"github.com/gnolang/gno/tm2/pkg/amino"
"github.com/gnolang/gno/tm2/pkg/std"
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestBackup_DetermineRightBound(t *testing.T) {
})
}

func TestBackup_ExecuteBackup(t *testing.T) {
func TestBackup_ExecuteBackup_FixedRange(t *testing.T) {
t.Parallel()

var (
Expand Down Expand Up @@ -159,3 +160,97 @@ func TestBackup_ExecuteBackup(t *testing.T) {
t.Fatalf("error encountered during scan, %v", err)
}
}

func TestBackup_ExecuteBackup_Watch(t *testing.T) {
t.Parallel()

// Set up the context that is controlled by the test
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

var (
tempFile = createTempFile(t)

fromBlock uint64 = 10
toBlock = fromBlock + 10

requestToBlock = toBlock / 2

exampleTx = std.Tx{
Memo: "example transaction",
}

cfg = DefaultConfig()

mockClient = &mockClient{
getLatestBlockNumberFn: func() (uint64, error) {
return toBlock, nil
},
getBlockTransactionsFn: func(blockNum uint64) ([]std.Tx, error) {
// Sanity check
if blockNum < fromBlock && blockNum > toBlock {
t.Fatal("invalid block number requested")
}

if blockNum == toBlock {
// End of the road, close the watch process
cancelFn()
}

return []std.Tx{exampleTx}, nil // 1 tx per block
},
}
)

// Temp file cleanup
t.Cleanup(func() {
require.NoError(t, tempFile.Close())
require.NoError(t, os.Remove(tempFile.Name()))
})

// Set the config
cfg.FromBlock = fromBlock
cfg.ToBlock = &requestToBlock
cfg.Watch = true

s := NewService(mockClient, tempFile, WithLogger(noop.New()))
s.watchInterval = 10 * time.Millisecond // make the interval almost instant for the test

// Run the backup procedure
require.NoError(
t,
s.ExecuteBackup(
ctx,
cfg,
),
)

// Read the output file
fileRaw, err := os.Open(tempFile.Name())
require.NoError(t, err)

// Set up a line-by-line scanner
scanner := bufio.NewScanner(fileRaw)

expectedBlock := fromBlock

// Iterate over each line in the file
for scanner.Scan() {
var txData types.TxData

// Unmarshal the JSON data into the Person struct
if err := amino.UnmarshalJSON(scanner.Bytes(), &txData); err != nil {
t.Fatalf("unable to unmarshal JSON line, %v", err)
}

assert.Equal(t, expectedBlock, txData.BlockNum)
assert.Equal(t, exampleTx, txData.Tx)

expectedBlock++
}

// Check for errors during scanning
if err := scanner.Err(); err != nil {
t.Fatalf("error encountered during scan, %v", err)
}
}
7 changes: 5 additions & 2 deletions backup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ var (
type Config struct {
ToBlock *uint64 // the right bound for the block range; latest if not specified
FromBlock uint64 // the left bound for the block range

Watch bool // flag indicating if incoming tx data should be backed up
}

// DefaultConfig returns the default backup configuration
func DefaultConfig() Config {
return Config{
ToBlock: nil, // to latest block by default
FromBlock: 1, // from genesis + 1 by default
ToBlock: nil, // to latest block by default
FromBlock: 1, // from genesis + 1 by default
Watch: false, // no tracking by default
}
}

Expand Down
9 changes: 9 additions & 0 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type backupCfg struct {
fromBlock uint64

overwrite bool
watch bool
}

// newBackupCmd creates the backup command
Expand Down Expand Up @@ -90,6 +91,13 @@ func (c *backupCfg) registerFlags(fs *flag.FlagSet) {
false,
"flag indicating if the output file should be overwritten during backup",
)

fs.BoolVar(
&c.watch,
"watch",
false,
"flag indicating if the backup should append incoming tx data",
)
}

// exec executes the backup command
Expand All @@ -113,6 +121,7 @@ func (c *backupCfg) exec(ctx context.Context, _ []string) error {
// Set up the config
cfg := backup.DefaultConfig()
cfg.FromBlock = c.fromBlock
cfg.Watch = c.watch

if c.toBlock >= 0 {
to64 := uint64(c.toBlock)
Expand Down
15 changes: 12 additions & 3 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ var errInvalidInputPath = errors.New("invalid file input path")

// restoreCfg is the restore command configuration
type restoreCfg struct {
inputPath string
remote string
inputPath string
remote string

legacyBackup bool
watch bool
}

// newRestoreCmd creates the restore command
Expand Down Expand Up @@ -63,6 +65,13 @@ func (c *restoreCfg) registerFlags(fs *flag.FlagSet) {
false,
"flag indicating if the input file is legacy amino JSON",
)

fs.BoolVar(
&c.watch,
"watch",
false,
"flag indicating if the restore should watch incoming tx data",
)
}

// exec executes the restore command
Expand Down Expand Up @@ -131,7 +140,7 @@ func (c *restoreCfg) exec(ctx context.Context, _ []string) error {
)

// Run the backup service
if backupErr := service.ExecuteRestore(ctx); backupErr != nil {
if backupErr := service.ExecuteRestore(ctx, c.watch); backupErr != nil {
return fmt.Errorf("unable to execute restore, %w", backupErr)
}

Expand Down
Loading
Loading