Skip to content

Commit

Permalink
CDC initial scan settings (#10536)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Oct 22, 2024
1 parent 997b273 commit e562a67
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 2 deletions.
13 changes: 12 additions & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,17 @@ message TImmediateControlsConfig {
MinValue: 0,
MaxValue: 134217728,
DefaultValue: 0 }];

optional uint64 CdcInitialScanReadAheadLo = 19 [(ControlOptions) = {
Description: "Override for CDC initial scan readahead (low watermark)",
MinValue: 0,
MaxValue: 67108864,
DefaultValue: 0 }];
optional uint64 CdcInitialScanReadAheadHi = 20 [(ControlOptions) = {
Description: "Override for CDC initial scan readahead (high watermark)",
MinValue: 0,
MaxValue: 134217728,
DefaultValue: 0 }];
}

message TTxLimitControls {
Expand Down Expand Up @@ -1658,7 +1669,7 @@ message TSchemeShardConfig {
repeated TInFlightCounterConfig InFlightCounterConfig = 4;

// number of shards per table
optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 10];
optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 32];
}

message TCompactionConfig {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/datashard_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ message TDataShardConfig {
optional uint64 RestoreReadBufferSizeLimit = 16 [default = 268435456]; // 256 MB
optional string CdcInitialScanTaskName = 17 [default = "cdc_initial_scan"];
optional uint32 CdcInitialScanTaskPriority = 18 [default = 10];
optional uint64 CdcInitialScanReadAheadLo = 22 [default = 524288];
optional uint64 CdcInitialScanReadAheadHi = 23 [default = 1048576];
optional bool DisabledOnSchemeShard = 19 [default = false];
optional uint64 IncrementalRestoreReadAheadLo = 20 [default = 524288];
optional uint64 IncrementalRestoreReadAheadHi = 21 [default = 1048576];
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet/resource_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,7 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
queue = config.AddQueues();
queue->SetName("queue_cdc_initial_scan");
queue->SetWeight(100);
queue->MutableLimit()->SetCpu(4);
queue->MutableLimit()->SetCpu(2);

queue = config.AddQueues();
queue->SetName("queue_statistics_scan");
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/datashard/cdc_stream_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,16 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
const auto& taskName = appData->DataShardConfig.GetCdcInitialScanTaskName();
const auto taskPrio = appData->DataShardConfig.GetCdcInitialScanTaskPriority();

ui64 readAheadLo = appData->DataShardConfig.GetCdcInitialScanReadAheadLo();
if (ui64 readAheadLoOverride = Self->GetCdcInitialScanReadAheadLoOverride(); readAheadLoOverride > 0) {
readAheadLo = readAheadLoOverride;
}

ui64 readAheadHi = appData->DataShardConfig.GetCdcInitialScanReadAheadHi();
if (ui64 readAheadHiOverride = Self->GetCdcInitialScanReadAheadHiOverride(); readAheadHiOverride > 0) {
readAheadHi = readAheadHiOverride;
}

const auto snapshotVersion = TRowVersion(snapshotKey.Step, snapshotKey.TxId);
Y_ABORT_UNLESS(info->SnapshotVersion == snapshotVersion);

Expand All @@ -673,6 +683,7 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
const ui64 scanId = Self->QueueScan(table->LocalTid, scan.Release(), localTxId,
TScanOptions()
.SetResourceBroker(taskName, taskPrio)
.SetReadAhead(readAheadLo, readAheadHi)
.SetSnapshotRowVersion(snapshotVersion)
);
Self->CdcStreamScanManager.Enqueue(streamPathId, localTxId, scanId);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, TtlReadAheadHi(0, 0, 128*1024*1024)
, IncrementalRestoreReadAheadLo(0, 0, 64*1024*1024)
, IncrementalRestoreReadAheadHi(0, 0, 128*1024*1024)
, CdcInitialScanReadAheadLo(0, 0, 64*1024*1024)
, CdcInitialScanReadAheadHi(0, 0, 128*1024*1024)
, EnableLockedWrites(1, 0, 1)
, MaxLockedWritesPerKey(1000, 0, 1000000)
, EnableLeaderLeases(1, 0, 1)
Expand Down Expand Up @@ -328,6 +330,9 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(IncrementalRestoreReadAheadLo, "DataShardControls.IncrementalRestoreReadAheadLo");
appData->Icb->RegisterSharedControl(IncrementalRestoreReadAheadHi, "DataShardControls.IncrementalRestoreReadAheadHi");

appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadLo, "DataShardControls.CdcInitialScanReadAheadLo");
appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadHi, "DataShardControls.CdcInitialScanReadAheadHi");

IcbRegistered = true;
}
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,14 @@ class TDataShard
return IncrementalRestoreReadAheadHi;
}

ui64 GetCdcInitialScanReadAheadLoOverride() const {
return CdcInitialScanReadAheadLo;
}

ui64 GetCdcInitialScanReadAheadHiOverride() const {
return CdcInitialScanReadAheadHi;
}

bool GetEnableLockedWrites() const {
ui64 value = EnableLockedWrites;
return value != 0;
Expand Down Expand Up @@ -2744,6 +2752,9 @@ class TDataShard
TControlWrapper IncrementalRestoreReadAheadLo;
TControlWrapper IncrementalRestoreReadAheadHi;

TControlWrapper CdcInitialScanReadAheadLo;
TControlWrapper CdcInitialScanReadAheadHi;

TControlWrapper EnableLockedWrites;
TControlWrapper MaxLockedWritesPerKey;

Expand Down

0 comments on commit e562a67

Please sign in to comment.