Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemeshard: split schemeshard__operation_common.cpp, move code from .h #10631

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
781 changes: 472 additions & 309 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.cpp

Large diffs are not rendered by default.

1,347 changes: 43 additions & 1,304 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h

Large diffs are not rendered by default.

203 changes: 203 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_common_bsv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#include "schemeshard__operation_common.h"

#include "schemeshard_private.h"
#include <ydb/core/base/hive.h>
#include <ydb/core/blockstore/core/blockstore.h>


namespace NKikimr::NSchemeShard::NBSVState {

// NBSVState::TConfigureParts
//
TConfigureParts::TConfigureParts(TOperationId id)
: OperationId(id)
{
IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType});
}

bool TConfigureParts::HandleReply(TEvBlockStore::TEvUpdateVolumeConfigResponse::TPtr& ev, TOperationContext& context) {
TTabletId ssId = context.SS->SelfTabletId();

LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " HandleReply TEvSetConfigResult"
<< ", at schemeshard: " << ssId);

TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateBlockStoreVolume || txState->TxType == TTxState::TxAlterBlockStoreVolume);
Y_ABORT_UNLESS(txState->State == TTxState::ConfigureParts);

TTabletId tabletId = TTabletId(ev->Get()->Record.GetOrigin());
NKikimrBlockStore::EStatus status = ev->Get()->Record.GetStatus();

// Schemeshard never sends invalid or outdated configs
Y_VERIFY_S(status == NKikimrBlockStore::OK || status == NKikimrBlockStore::ERROR_UPDATE_IN_PROGRESS,
"Unexpected error in UpdateVolumeConfigResponse,"
<< " status " << NKikimrBlockStore::EStatus_Name(status)
<< " Tx " << OperationId
<< " tablet " << tabletId);

if (status == NKikimrBlockStore::ERROR_UPDATE_IN_PROGRESS) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"BlockStore reconfiguration is in progress. We'll try to finish it later."
<< " Tx " << OperationId
<< " tablet " << tabletId);
return false;
}

TShardIdx idx = context.SS->MustGetShardIdx(tabletId);
txState->ShardsInProgress.erase(idx);

context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, idx);

if (txState->ShardsInProgress.empty()) {
NIceDb::TNiceDb db(context.GetDB());
context.SS->ChangeTxState(db, OperationId, TTxState::Propose);
context.OnComplete.ActivateTx(OperationId);
return true;
}

return false;
}

bool TConfigureParts::ProgressState(TOperationContext& context) {
TTabletId ssId = context.SS->SelfTabletId();

LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " ProgressState"
<< ", at schemeshard" << ssId);

TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateBlockStoreVolume || txState->TxType == TTxState::TxAlterBlockStoreVolume);
Y_ABORT_UNLESS(!txState->Shards.empty());

txState->ClearShardsInProgress();

TBlockStoreVolumeInfo::TPtr volume = context.SS->BlockStoreVolumes[txState->TargetPathId];
Y_VERIFY_S(volume, "volume is null. PathId: " << txState->TargetPathId);

ui64 version = volume->AlterVersion;
const auto* volumeConfig = &volume->VolumeConfig;
if (volume->AlterData) {
version = volume->AlterData->AlterVersion;
volumeConfig = &volume->AlterData->VolumeConfig;
}

for (auto shard : txState->Shards) {
if (shard.TabletType == ETabletType::BlockStorePartition ||
shard.TabletType == ETabletType::BlockStorePartition2) {
continue;
}

Y_ABORT_UNLESS(shard.TabletType == ETabletType::BlockStoreVolume);
TShardIdx shardIdx = shard.Idx;
TTabletId tabletId = context.SS->ShardInfos[shardIdx].TabletID;

volume->VolumeTabletId = tabletId;
if (volume->AlterData) {
volume->AlterData->VolumeTabletId = tabletId;
volume->AlterData->VolumeShardIdx = shardIdx;
}

TAutoPtr<TEvBlockStore::TEvUpdateVolumeConfig> event(new TEvBlockStore::TEvUpdateVolumeConfig());
event->Record.SetTxId(ui64(OperationId.GetTxId()));

event->Record.MutableVolumeConfig()->CopyFrom(*volumeConfig);
event->Record.MutableVolumeConfig()->SetVersion(version);

for (const auto& p : volume->Shards) {
const auto& part = p.second;
const auto& partTabletId = context.SS->ShardInfos[p.first].TabletID;
auto info = event->Record.AddPartitions();
info->SetPartitionId(part->PartitionId);
info->SetTabletId(ui64(partTabletId));
}

context.OnComplete.BindMsgToPipe(OperationId, tabletId, shardIdx, event.Release());

// Wait for results from this shard
txState->ShardsInProgress.insert(shardIdx);
}

return false;
}


// NBSVState::TPropose
//
TPropose::TPropose(TOperationId id)
: OperationId(id)
{
IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvBlockStore::TEvUpdateVolumeConfigResponse::EventType});
}

bool TPropose::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) {
TStepId step = TStepId(ev->Get()->StepId);
TTabletId ssId = context.SS->SelfTabletId();

LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " HandleReply TEvOperationPlan"
<< ", at schemeshard: " << ssId);

TTxState* txState = context.SS->FindTx(OperationId);
if (!txState) {
return false;
}
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateBlockStoreVolume || txState->TxType == TTxState::TxAlterBlockStoreVolume);

TPathId pathId = txState->TargetPathId;
TPathElement::TPtr path = context.SS->PathsById.at(pathId);

NIceDb::TNiceDb db(context.GetDB());

if (path->StepCreated == InvalidStepId) {
path->StepCreated = step;
context.SS->PersistCreateStep(db, pathId, step);
}

TBlockStoreVolumeInfo::TPtr volume = context.SS->BlockStoreVolumes.at(pathId);

auto oldVolumeSpace = volume->GetVolumeSpace();
volume->FinishAlter();
auto newVolumeSpace = volume->GetVolumeSpace();
// Decrease in occupied space is applied on tx finish
auto domainDir = context.SS->PathsById.at(context.SS->ResolvePathIdForDomain(path));
Y_ABORT_UNLESS(domainDir);
domainDir->ChangeVolumeSpaceCommit(newVolumeSpace, oldVolumeSpace);

context.SS->PersistBlockStoreVolume(db, pathId, volume);
context.SS->PersistRemoveBlockStoreVolumeAlter(db, pathId);

if (txState->TxType == TTxState::TxCreateBlockStoreVolume) {
auto parentDir = context.SS->PathsById.at(path->ParentPathId);
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
}

context.SS->ClearDescribePathCaches(path);
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);

context.SS->ChangeTxState(db, OperationId, TTxState::Done);
return true;
}

bool TPropose::ProgressState(TOperationContext& context) {
TTabletId ssId = context.SS->SelfTabletId();

LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " ProgressState"
<< ", at schemeshard: " << ssId);

TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateBlockStoreVolume || txState->TxType == TTxState::TxAlterBlockStoreVolume);


context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0));
return false;
}

} // NKikimr::NSchemeShard::NBSVState
179 changes: 179 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#include "schemeshard__operation_common.h"

#include "schemeshard_private.h"
#include <ydb/core/base/hive.h>
#include <ydb/core/tx/datashard/datashard.h>


namespace NKikimr::NSchemeShard::NCdcStreamState {

namespace {

bool IsExpectedTxType(TTxState::ETxType txType) {
switch (txType) {
case TTxState::TxCreateCdcStreamAtTable:
case TTxState::TxCreateCdcStreamAtTableWithInitialScan:
case TTxState::TxAlterCdcStreamAtTable:
case TTxState::TxAlterCdcStreamAtTableDropSnapshot:
case TTxState::TxDropCdcStreamAtTable:
case TTxState::TxDropCdcStreamAtTableDropSnapshot:
return true;
default:
return false;
}
}

} // namespace anonymous


// NCdcStreamState::TConfigurePartsAtTable
//
TConfigurePartsAtTable::TConfigurePartsAtTable(TOperationId id)
: OperationId(id)
{
IgnoreMessages(DebugHint(), {});
}

bool TConfigurePartsAtTable::ProgressState(TOperationContext& context) {
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " ProgressState"
<< ", at schemeshard: " << context.SS->SelfTabletId());

auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType));
const auto& pathId = txState->TargetPathId;

if (NTableState::CheckPartitioningChangedForTableModification(*txState, context)) {
NTableState::UpdatePartitioningForTableModification(OperationId, *txState, context);
}

NKikimrTxDataShard::TFlatSchemeTransaction tx;
context.SS->FillSeqNo(tx, context.SS->StartRound(*txState));
FillNotice(pathId, tx, context);

txState->ClearShardsInProgress();
Y_ABORT_UNLESS(txState->Shards.size());

for (ui32 i = 0; i < txState->Shards.size(); ++i) {
const auto& idx = txState->Shards[i].Idx;
const auto datashardId = context.SS->ShardInfos[idx].TabletID;
auto ev = context.SS->MakeDataShardProposal(pathId, OperationId, tx.SerializeAsString(), context.Ctx);
context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, ev.Release());
}

txState->UpdateShardsInProgress(TTxState::ConfigureParts);
return false;
}

bool TConfigurePartsAtTable::HandleReply(TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) {
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " HandleReply " << ev->Get()->ToString()
<< ", at schemeshard: " << context.SS->SelfTabletId());

if (!NTableState::CollectProposeTransactionResults(OperationId, ev, context)) {
return false;
}

return true;
}


// NCdcStreamState::TProposeAtTable
//
TProposeAtTable::TProposeAtTable(TOperationId id)
: OperationId(id)
{
IgnoreMessages(DebugHint(), {TEvDataShard::TEvProposeTransactionResult::EventType});
}

bool TProposeAtTable::ProgressState(TOperationContext& context) {
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " ProgressState"
<< ", at schemeshard: " << context.SS->SelfTabletId());

const auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType));

TSet<TTabletId> shardSet;
for (const auto& shard : txState->Shards) {
Y_ABORT_UNLESS(context.SS->ShardInfos.contains(shard.Idx));
shardSet.insert(context.SS->ShardInfos.at(shard.Idx).TabletID);
}

context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, txState->MinStep, shardSet);
return false;
}

bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) {
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " HandleReply TEvOperationPlan"
<< ", step: " << ev->Get()->StepId
<< ", at schemeshard: " << context.SS->SelfTabletId());

const auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType));
const auto& pathId = txState->TargetPathId;

Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId));
auto path = context.SS->PathsById.at(pathId);

Y_ABORT_UNLESS(context.SS->Tables.contains(pathId));
auto table = context.SS->Tables.at(pathId);

table->AlterVersion += 1;

NIceDb::TNiceDb db(context.GetDB());
context.SS->PersistTableAlterVersion(db, pathId, table);

context.SS->ClearDescribePathCaches(path);
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);

context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts);
return true;
}

bool TProposeAtTable::HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) {
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " TEvDataShard::TEvSchemaChanged"
<< " triggers early, save it"
<< ", at schemeshard: " << context.SS->SelfTabletId());

NTableState::CollectSchemaChanged(OperationId, ev, context);
return false;
}


// NCdcStreamState::TProposeAtTableDropSnapshot
//
bool TProposeAtTableDropSnapshot::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) {
TProposeAtTable::HandleReply(ev, context);

const auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
const auto& pathId = txState->TargetPathId;

Y_ABORT_UNLESS(context.SS->TablesWithSnapshots.contains(pathId));
const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId);

auto it = context.SS->SnapshotTables.find(snapshotTxId);
if (it != context.SS->SnapshotTables.end()) {
it->second.erase(pathId);
if (it->second.empty()) {
context.SS->SnapshotTables.erase(it);
}
}

context.SS->SnapshotsStepIds.erase(snapshotTxId);
context.SS->TablesWithSnapshots.erase(pathId);

NIceDb::TNiceDb db(context.GetDB());
context.SS->PersistDropSnapshot(db, snapshotTxId, pathId);

context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1);
return true;
}

} // NKikimr::NSchemeShard::NCdcStreamState
Loading
Loading