diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c904a86a30d4..da6cdef9a6da 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1194,6 +1194,17 @@ message TImmediateControlsConfig { MinValue: 0, MaxValue: 134217728, DefaultValue: 0 }]; + + optional uint64 CdcInitialScanReadAheadLo = 19 [(ControlOptions) = { + Description: "Override for CDC initial scan readahead (low watermark)", + MinValue: 0, + MaxValue: 67108864, + DefaultValue: 0 }]; + optional uint64 CdcInitialScanReadAheadHi = 20 [(ControlOptions) = { + Description: "Override for CDC initial scan readahead (high watermark)", + MinValue: 0, + MaxValue: 134217728, + DefaultValue: 0 }]; } message TTxLimitControls { @@ -1658,7 +1669,7 @@ message TSchemeShardConfig { repeated TInFlightCounterConfig InFlightCounterConfig = 4; // number of shards per table - optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 10]; + optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 32]; } message TCompactionConfig { diff --git a/ydb/core/protos/datashard_config.proto b/ydb/core/protos/datashard_config.proto index 93975ff7499d..e670812feb2d 100644 --- a/ydb/core/protos/datashard_config.proto +++ b/ydb/core/protos/datashard_config.proto @@ -20,6 +20,8 @@ message TDataShardConfig { optional uint64 RestoreReadBufferSizeLimit = 16 [default = 268435456]; // 256 MB optional string CdcInitialScanTaskName = 17 [default = "cdc_initial_scan"]; optional uint32 CdcInitialScanTaskPriority = 18 [default = 10]; + optional uint64 CdcInitialScanReadAheadLo = 22 [default = 524288]; + optional uint64 CdcInitialScanReadAheadHi = 23 [default = 1048576]; optional bool DisabledOnSchemeShard = 19 [default = false]; optional uint64 IncrementalRestoreReadAheadLo = 20 [default = 524288]; optional uint64 IncrementalRestoreReadAheadHi = 21 [default = 1048576]; diff --git a/ydb/core/tablet/resource_broker.cpp b/ydb/core/tablet/resource_broker.cpp index 260274503f56..0d1e91e1e574 100644 --- a/ydb/core/tablet/resource_broker.cpp +++ b/ydb/core/tablet/resource_broker.cpp @@ -1423,7 +1423,7 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() queue = config.AddQueues(); queue->SetName("queue_cdc_initial_scan"); queue->SetWeight(100); - queue->MutableLimit()->SetCpu(4); + queue->MutableLimit()->SetCpu(2); queue = config.AddQueues(); queue->SetName("queue_statistics_scan"); diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 9b576b48e398..f4d300ad6cea 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -661,6 +661,16 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase { const auto& taskName = appData->DataShardConfig.GetCdcInitialScanTaskName(); const auto taskPrio = appData->DataShardConfig.GetCdcInitialScanTaskPriority(); + ui64 readAheadLo = appData->DataShardConfig.GetCdcInitialScanReadAheadLo(); + if (ui64 readAheadLoOverride = Self->GetCdcInitialScanReadAheadLoOverride(); readAheadLoOverride > 0) { + readAheadLo = readAheadLoOverride; + } + + ui64 readAheadHi = appData->DataShardConfig.GetCdcInitialScanReadAheadHi(); + if (ui64 readAheadHiOverride = Self->GetCdcInitialScanReadAheadHiOverride(); readAheadHiOverride > 0) { + readAheadHi = readAheadHiOverride; + } + const auto snapshotVersion = TRowVersion(snapshotKey.Step, snapshotKey.TxId); Y_ABORT_UNLESS(info->SnapshotVersion == snapshotVersion); @@ -673,6 +683,7 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase { const ui64 scanId = Self->QueueScan(table->LocalTid, scan.Release(), localTxId, TScanOptions() .SetResourceBroker(taskName, taskPrio) + .SetReadAhead(readAheadLo, readAheadHi) .SetSnapshotRowVersion(snapshotVersion) ); Self->CdcStreamScanManager.Enqueue(streamPathId, localTxId, scanId); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index a64424bec69b..889d6387520f 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -151,6 +151,8 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , TtlReadAheadHi(0, 0, 128*1024*1024) , IncrementalRestoreReadAheadLo(0, 0, 64*1024*1024) , IncrementalRestoreReadAheadHi(0, 0, 128*1024*1024) + , CdcInitialScanReadAheadLo(0, 0, 64*1024*1024) + , CdcInitialScanReadAheadHi(0, 0, 128*1024*1024) , EnableLockedWrites(1, 0, 1) , MaxLockedWritesPerKey(1000, 0, 1000000) , EnableLeaderLeases(1, 0, 1) @@ -328,6 +330,9 @@ void TDataShard::IcbRegister() { appData->Icb->RegisterSharedControl(IncrementalRestoreReadAheadLo, "DataShardControls.IncrementalRestoreReadAheadLo"); appData->Icb->RegisterSharedControl(IncrementalRestoreReadAheadHi, "DataShardControls.IncrementalRestoreReadAheadHi"); + appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadLo, "DataShardControls.CdcInitialScanReadAheadLo"); + appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadHi, "DataShardControls.CdcInitialScanReadAheadHi"); + IcbRegistered = true; } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 9fb1b4d084f6..6ee0b9ce21a8 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1715,6 +1715,14 @@ class TDataShard return IncrementalRestoreReadAheadHi; } + ui64 GetCdcInitialScanReadAheadLoOverride() const { + return CdcInitialScanReadAheadLo; + } + + ui64 GetCdcInitialScanReadAheadHiOverride() const { + return CdcInitialScanReadAheadHi; + } + bool GetEnableLockedWrites() const { ui64 value = EnableLockedWrites; return value != 0; @@ -2744,6 +2752,9 @@ class TDataShard TControlWrapper IncrementalRestoreReadAheadLo; TControlWrapper IncrementalRestoreReadAheadHi; + TControlWrapper CdcInitialScanReadAheadLo; + TControlWrapper CdcInitialScanReadAheadHi; + TControlWrapper EnableLockedWrites; TControlWrapper MaxLockedWritesPerKey;