FMWK-570-backup-restore-state
- testing
filkeith committed Oct 13, 2024
1 parent edfcae2 commit 81d4774
Showing 12 changed files with 162 additions and 60 deletions.
2 changes: 1 addition & 1 deletion config_backup.go
@@ -149,7 +149,7 @@ func (c *BackupConfig) isStateFirstRun() bool {

// isStateContinueRun checks if we continue backup from a state file.
func (c *BackupConfig) isStateContinue() bool {
- return c.StateFile != "" && !c.Continue
+ return c.StateFile != "" && c.Continue
}

func (c *BackupConfig) isFullBackup() bool {
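For reference, the two predicates are meant to be complementary over the "state file configured" case; a minimal standalone sketch (not part of this commit, with isStateFirstRun's body assumed from its name and the hunk header above):

    package main

    import "fmt"

    type BackupConfig struct {
        StateFile string
        Continue  bool
    }

    // First tracked run: a state file is configured, but we are not resuming.
    func (c *BackupConfig) isStateFirstRun() bool {
        return c.StateFile != "" && !c.Continue
    }

    // Continued run: a state file is configured and resuming was requested.
    func (c *BackupConfig) isStateContinue() bool {
        return c.StateFile != "" && c.Continue
    }

    func main() {
        cfg := &BackupConfig{StateFile: "backup.state", Continue: true}
        fmt.Println(cfg.isStateFirstRun(), cfg.isStateContinue()) // false true
    }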
9 changes: 6 additions & 3 deletions handler_backup.go
@@ -185,9 +185,9 @@ func (bh *BackupHandler) getEstimateSamples(ctx context.Context, recordsNumber i
scanPolicy.RawCDT = true

nodes := bh.aerospikeClient.GetNodes()
- handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter)
+ handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter, bh.state)
readerConfig := handler.recordReaderConfigForNode(nodes, &scanPolicy)
- recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger)
+ recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger, bh.state.RecordsChan)

// Timestamp processor.
tsProcessor := processors.NewVoidTimeSetter(bh.logger)
@@ -245,7 +245,7 @@ func (bh *BackupHandler) backupSync(ctx context.Context) error {

writeWorkers := bh.makeWriteWorkers(backupWriters)

- handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter)
+ handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter, bh.state)

bh.stats.TotalRecords, err = handler.countRecords(ctx, bh.infoClient)
if err != nil {
@@ -321,6 +321,7 @@ func (bh *BackupHandler) newConfiguredWriter(ctx context.Context) (io.WriteClose
if bh.state != nil {
suffix = bh.state.getFileSuffix()
}

filename := bh.encoder.GenerateFilename(suffix)

storageWriter, err := bh.writer.NewWriter(ctx, filename)
@@ -454,6 +455,7 @@ func (bh *BackupHandler) backupSIndexes(
sindexWriteWorker := pipeline.NewWriteWorker(sindexWriter, bh.limiter)

sindexPipeline := pipeline.NewPipeline[*models.Token](
+ true,
[]pipeline.Worker[*models.Token]{sindexReadWorker},
[]pipeline.Worker[*models.Token]{sindexWriteWorker},
)
@@ -478,6 +480,7 @@ func (bh *BackupHandler) backupUDFs(
udfWriteWorker := pipeline.NewWriteWorker(udfWriter, bh.limiter)

udfPipeline := pipeline.NewPipeline[*models.Token](
+ true,
[]pipeline.Worker[*models.Token]{udfReadWorker},
[]pipeline.Worker[*models.Token]{udfWriteWorker},
)
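The nil-guard around bh.state in newConfiguredWriter follows a common optional-dependency pattern; a hedged sketch (State, getFileSuffix, and the filename format are simplified stand-ins for the real types, which generate names via encoder.GenerateFilename):

    package main

    import "fmt"

    type State struct{ suffix string }

    func (s *State) getFileSuffix() string { return s.suffix }

    func generateFilename(suffix string) string {
        // Illustrative format only.
        return fmt.Sprintf("backup%s.asb", suffix)
    }

    func main() {
        var state *State // nil ⇒ state tracking disabled, plain filenames

        suffix := ""
        if state != nil {
            suffix = state.getFileSuffix()
        }
        fmt.Println(generateFilename(suffix)) // backup.asb

        state = &State{suffix: "_1"}
        fmt.Println(generateFilename(state.getFileSuffix())) // backup_1.asb
    }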
13 changes: 9 additions & 4 deletions handler_backup_records.go
@@ -37,13 +37,15 @@ type backupRecordsHandler struct {
aerospikeClient AerospikeClient
logger *slog.Logger
scanLimiter *semaphore.Weighted
+ state *State
}

func newBackupRecordsHandler(
config *BackupConfig,
ac AerospikeClient,
logger *slog.Logger,
scanLimiter *semaphore.Weighted,
+ state *State,
) *backupRecordsHandler {
logger.Debug("created new backup records handler")

@@ -52,6 +54,7 @@ func newBackupRecordsHandler(
aerospikeClient: ac,
logger: logger,
scanLimiter: scanLimiter,
+ state: state,
}

return h
@@ -72,9 +75,9 @@ func (bh *backupRecordsHandler) run(
processors.NewVoidTimeSetter(bh.logger),
processors.NewTPSLimiter[*models.Token](
ctx, bh.config.RecordsPerSecond),
- ))
+ ), bh.config.ParallelRead)

- return pipeline.NewPipeline(readWorkers, composeProcessor, writers).Run(ctx)
+ return pipeline.NewPipeline(true, readWorkers, composeProcessor, writers).Run(ctx)
}

func (bh *backupRecordsHandler) countRecords(ctx context.Context, infoClient *asinfo.InfoClient) (uint64, error) {
@@ -120,7 +123,7 @@ func (bh *backupRecordsHandler) countRecordsUsingScanByPartitions(ctx context.Co
// with this filter.
pf := *bh.config.PartitionFilters[j]
readerConfig := bh.recordReaderConfigForPartitions(&pf, scanPolicy)
- recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger)
+ recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger, nil)

for {
if _, err := recordReader.Read(); err != nil {
@@ -162,7 +165,7 @@ func (bh *backupRecordsHandler) countRecordsUsingScanByNodes(ctx context.Context
var count uint64

readerConfig := bh.recordReaderConfigForNode(nodes, scanPolicy)
- recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger)
+ recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger, nil)

for {
if _, err := recordReader.Read(); err != nil {
@@ -223,6 +226,7 @@ func (bh *backupRecordsHandler) makeAerospikeReadWorkersForPartition(
bh.aerospikeClient,
recordReaderConfig,
bh.logger,
+ bh.state.RecordsChan,
)

readWorkers[i] = pipeline.NewReadWorker[*models.Token](recordReader)
@@ -263,6 +267,7 @@ func (bh *backupRecordsHandler) makeAerospikeReadWorkersForNodes(
bh.aerospikeClient,
recordReaderConfig,
bh.logger,
+ bh.state.RecordsChan,
)

readWorkers[i] = pipeline.NewReadWorker[*models.Token](recordReader)
14 changes: 11 additions & 3 deletions handler_restore.go
@@ -229,9 +229,9 @@ func (rh *RestoreHandler) runRestorePipeline(ctx context.Context, readers []pipe
processors.NewChangeNamespace(nsSource, nsDest),
processors.NewExpirationSetter(&rh.stats.RecordsExpired, rh.config.ExtraTTL, rh.logger),
processors.NewTPSLimiter[*models.Token](ctx, rh.config.RecordsPerSecond),
- ))
+ ), rh.config.Parallel)

- return pipeline.NewPipeline(readers, composeProcessor, writeWorkers).Run(ctx)
+ return pipeline.NewPipeline(true, readers, composeProcessor, writeWorkers).Run(ctx)
}

func (rh *RestoreHandler) useBatchWrites() (bool, error) {
@@ -244,7 +244,15 @@ func (rh *RestoreHandler) useBatchWrites() (bool, error) {
return infoClient.SupportsBatchWrite()
}

- func newTokenWorker(processor processors.TokenProcessor) []pipeline.Worker[*models.Token] {
+ func newTokenWorker(processor processors.TokenProcessor, parallel int) []pipeline.Worker[*models.Token] {
+ if parallel > 0 {
+ workers := make([]pipeline.Worker[*models.Token], 0, parallel)
+ for i := 0; i < parallel; i++ {
+ workers = append(workers, pipeline.NewProcessorWorker(processor))
+ }
+ return workers
+ }
+
return []pipeline.Worker[*models.Token]{
pipeline.NewProcessorWorker(processor),
}
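The effect of the new parallel argument, as a standalone sketch (the worker type is reduced to a function here; the real code builds pipeline.ProcessorWorker values):

    package main

    import "fmt"

    type tokenProcessor func(token string) string

    // newWorkers mirrors newTokenWorker: parallel > 0 yields that many identical
    // processor workers; anything else falls back to a single worker.
    func newWorkers(p tokenProcessor, parallel int) []tokenProcessor {
        if parallel > 0 {
            workers := make([]tokenProcessor, 0, parallel)
            for i := 0; i < parallel; i++ {
                workers = append(workers, p)
            }
            return workers
        }
        return []tokenProcessor{p}
    }

    func main() {
        identity := func(t string) string { return t }
        fmt.Println(len(newWorkers(identity, 4)), len(newWorkers(identity, 0))) // 4 1
    }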
36 changes: 21 additions & 15 deletions io/aerospike/record_reader.go
@@ -95,6 +95,7 @@ type RecordReader struct {
logger *slog.Logger
config *RecordReaderConfig
scanResult *recordSets // initialized on first Read() call
+ stateChan chan<- models.PartitionFilterSerialized
}

// NewRecordReader creates a new RecordReader.
@@ -103,16 +104,18 @@ func NewRecordReader(
client scanner,
cfg *RecordReaderConfig,
logger *slog.Logger,
+ stateChan chan<- models.PartitionFilterSerialized,
) *RecordReader {
id := uuid.NewString()
logger = logging.WithReader(logger, id, logging.ReaderTypeRecord)
logger.Debug("created new aerospike record reader")

return &RecordReader{
- ctx: ctx,
- config: cfg,
- client: client,
- logger: logger,
+ ctx:       ctx,
+ config:    cfg,
+ client:    client,
+ logger:    logger,
+ stateChan: stateChan,
}
}

@@ -127,6 +130,19 @@ func (r *RecordReader) Read() (*models.Token, error) {
r.scanResult = scan
}

+ var (
+ pfs models.PartitionFilterSerialized
+ err error
+ )
+ // For indexes and udf, partition filter will be nil.
+ if r.config.partitionFilter != nil && r.stateChan != nil {
+ pfs, err = models.NewPartitionFilterSerialized(r.config.partitionFilter)
+ if err != nil {
+ return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
+ }
+ // r.stateChan <- pfs
+ }

res, active := <-r.scanResult.Results()
if !active {
r.logger.Debug("scan finished")
@@ -142,17 +158,7 @@ func (r *RecordReader) Read() (*models.Token, error) {
Record: res.Record,
}

- recToken := models.NewRecordToken(&rec, 0, nil)
-
- // For indexes and udf, partition filter will be nil.
- if r.config.partitionFilter != nil {
- pfs, err := models.NewPartitionFilterSerialized(r.config.partitionFilter)
- if err != nil {
- return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
- }
-
- recToken = models.NewRecordToken(&rec, 0, pfs)
- }
+ recToken := models.NewRecordToken(&rec, 0, pfs)

return recToken, nil
}
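The stateChan handoff (its send is still commented out at the call site above) follows the shape below; a sketch with simplified types, where a nil channel means state tracking is disabled:

    package main

    import "fmt"

    // reader mimics RecordReader's optional state channel: nil means no state
    // file is configured, so every send must be guarded.
    type reader struct {
        stateChan chan<- string
    }

    func (r *reader) read(cursor string) string {
        if r.stateChan != nil {
            r.stateChan <- cursor // report the resume point alongside the data
        }
        return "record@" + cursor
    }

    func main() {
        ch := make(chan string, 1)
        r := &reader{stateChan: ch}
        fmt.Println(r.read("partition-7"))
        fmt.Println("saved state:", <-ch)
    }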
48 changes: 46 additions & 2 deletions io/aerospike/result_sets.go
@@ -16,18 +16,29 @@ package aerospike

import (
"log/slog"
"sync"

a "github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go/internal/util"
"github.com/aerospike/backup-go/models"
)

// recordSets contains multiple Aerospike Recordset objects.
type recordSets struct {
- resultsChannel <-chan *a.Result
+ resultsChannel <-chan *customRecord
logger *slog.Logger
data []*a.Recordset
}

+ type customRecord struct {
+ result *a.Result
+ filter models.PartitionFilterSerialized
+ }

+ // newCustomRecord pairs a scan result with the filter state it was read under.
+ func newCustomRecord(result *a.Result, filter models.PartitionFilterSerialized) *customRecord {
+ return &customRecord{
+ result: result,
+ filter: filter,
+ }
+ }

func newRecordSets(data []*a.Recordset, logger *slog.Logger) *recordSets {
resultChannels := make([]<-chan *a.Result, 0, len(data))
for _, recSet := range data {
@@ -52,6 +63,39 @@ func (r *recordSets) Close() {
}

// Results returns the results channel of the recordSets.
- func (r *recordSets) Results() <-chan *a.Result {
+ func (r *recordSets) Results() <-chan *customRecord {
return r.resultsChannel
}

+ func MergeResultSets(channels []<-chan *a.Result) <-chan *a.Result {
+ out := make(chan *a.Result)
+
+ if len(channels) == 0 {
+ close(out)
+ return out
+ }
+
+ var wg sync.WaitGroup
+ // Run an output goroutine for each input channel.
+ output := func(c <-chan *a.Result) {
+ for n := range c {
+ out <- n
+ }
+
+ wg.Done()
+ }
+
+ wg.Add(len(channels))
+
+ for _, c := range channels {
+ go output(c)
+ }
+
+ // Run a goroutine to close out once all the output goroutines are done.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+
+ return out
+ }
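MergeResultSets is the classic fan-in pattern; the same shape written generically (standalone sketch, not the library API):

    package main

    import (
        "fmt"
        "sync"
    )

    // merge drains every input channel into one output channel and closes the
    // output once all inputs are exhausted.
    func merge[T any](channels []<-chan T) <-chan T {
        out := make(chan T)
        if len(channels) == 0 {
            close(out)
            return out
        }

        var wg sync.WaitGroup
        wg.Add(len(channels))
        for _, c := range channels {
            go func(c <-chan T) {
                defer wg.Done()
                for v := range c {
                    out <- v
                }
            }(c)
        }

        go func() {
            wg.Wait()
            close(out)
        }()

        return out
    }

    func main() {
        a, b := make(chan int), make(chan int)
        go func() { a <- 1; close(a) }()
        go func() { b <- 2; close(b) }()
        for v := range merge([]<-chan int{a, b}) {
            fmt.Println(v)
        }
    }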
2 changes: 1 addition & 1 deletion io/encoding/asb/decode.go
@@ -161,7 +161,7 @@ func (r *Decoder) NextToken() (*models.Token, error) {
case *models.UDF:
return models.NewUDFToken(v, size), nil
case *models.Record:
- return models.NewRecordToken(v, size, nil), nil
+ return models.NewRecordToken(v, size, models.PartitionFilterSerialized{}), nil
default:
return nil, fmt.Errorf("unsupported token type %T", v)
}
4 changes: 2 additions & 2 deletions models/data_models.go
@@ -102,11 +102,11 @@ type Token struct {
Type TokenType
Size uint64
// Current filter state.
- Filter *PartitionFilterSerialized
+ Filter PartitionFilterSerialized
}

// NewRecordToken creates a new token with the given record.
- func NewRecordToken(r *Record, size uint64, filter *PartitionFilterSerialized) *Token {
+ func NewRecordToken(r *Record, size uint64, filter PartitionFilterSerialized) *Token {
return &Token{
Record: r,
Type: TokenTypeRecord,
10 changes: 7 additions & 3 deletions models/partition_filter_serialized.go
@@ -30,13 +30,17 @@ type PartitionFilterSerialized struct {
}

// NewPartitionFilterSerialized serialize *a.PartitionFilter and returns new PartitionFilterSerialized instance.
- func NewPartitionFilterSerialized(pf *a.PartitionFilter) (*PartitionFilterSerialized, error) {
+ func NewPartitionFilterSerialized(pf *a.PartitionFilter) (PartitionFilterSerialized, error) {
+ if pf == nil || pf.IsDone() {
+ return PartitionFilterSerialized{}, nil
+ }

c, err := pf.EncodeCursor()
if err != nil {
- return nil, fmt.Errorf("failed to encode cursor: %w", err)
+ return PartitionFilterSerialized{}, fmt.Errorf("failed to encode cursor: %w", err)
}

- return &PartitionFilterSerialized{
+ return PartitionFilterSerialized{
Begin: pf.Begin,
Count: pf.Count,
Digest: pf.Digest,
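With the value return, callers no longer juggle a possibly-nil pointer; a hedged usage sketch (import paths as they appear in this diff; NewPartitionFilterByRange is the client's standard constructor, and this assumes a freshly constructed range filter encodes without error):

    package main

    import (
        "fmt"

        a "github.com/aerospike/aerospike-client-go/v7"
        "github.com/aerospike/backup-go/models"
    )

    func main() {
        // A nil (or finished) filter serializes to the zero value, with no error.
        pfs, err := models.NewPartitionFilterSerialized(nil)
        fmt.Println(pfs, err) // zero value, <nil>

        // A live filter is captured via its encoded cursor.
        pf := a.NewPartitionFilterByRange(0, 4096)
        pfs, err = models.NewPartitionFilterSerialized(pf)
        if err != nil {
            panic(err)
        }
        fmt.Printf("begin=%d count=%d\n", pfs.Begin, pfs.Count)
    }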
