diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 46abf76845a6..2eda9ad5a45d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -471,7 +471,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseAddCounters2(ev->Get()->Sensors); } TypeEnv = const_cast(&typeEnv); - FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges); + FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr); { // say "Hello" to executer diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 89fb706d88e3..e1054a5183d0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1441,19 +1441,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const THashMap& secureParams, const THashMap& taskParams, - const TVector& readRanges) + const TVector& readRanges, + IRandomProvider* randomProvider + ) { - if (TaskRunner) { - for (auto& [channelId, channel] : InputChannelsMap) { - channel.Channel = TaskRunner->GetInputChannel(channelId); - } - } auto collectStatsLevel = StatsModeToCollectStatsLevel(RuntimeSettings.StatsMode); for (auto& [inputIndex, source] : SourcesMap) { - if constexpr (!TDerived::HasAsyncTaskRunner) { - source.Buffer = TaskRunner->GetSource(inputIndex); - Y_ABORT_UNLESS(source.Buffer); - } Y_ABORT_UNLESS(AsyncIoFactory); const auto& inputDesc = Task.GetInputs(inputIndex); Y_ABORT_UNLESS(inputDesc.HasSource()); @@ -1487,9 +1480,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped this->RegisterWithSameMailbox(source.Actor); } for (auto& [inputIndex, transform] : InputTransformsMap) { - if constexpr (!TDerived::HasAsyncTaskRunner) { - transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); - std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex); + Y_ABORT_UNLESS(TaskRunner); + transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); Y_ABORT_UNLESS(AsyncIoFactory); const auto& inputDesc = Task.GetInputs(inputIndex); CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString()); @@ -1515,43 +1507,33 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); } this->RegisterWithSameMailbox(transform.Actor); - } - } - if (TaskRunner) { - for (auto& [channelId, channel] : OutputChannelsMap) { - channel.Channel = TaskRunner->GetOutputChannel(channelId); - } } for (auto& [outputIndex, transform] : OutputTransformsMap) { - if (TaskRunner) { - transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); - std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex); - Y_ABORT_UNLESS(AsyncIoFactory); - const auto& outputDesc = Task.GetOutputs(outputIndex); - CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString()); - try { - std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( - IDqAsyncIoFactory::TOutputTransformArguments { - .OutputDesc = outputDesc, - .OutputIndex = outputIndex, - .StatsLevel = collectStatsLevel, - .TxId = TxId, - .TransformOutput = transform.OutputBuffer, - .Callback = static_cast(this), - .SecureParams = secureParams, - .TaskParams = taskParams, - .TypeEnv = typeEnv, - .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder - }); - } catch (const std::exception& ex) { - throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what(); - } - this->RegisterWithSameMailbox(transform.Actor); + transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); + Y_ABORT_UNLESS(AsyncIoFactory); + const auto& outputDesc = Task.GetOutputs(outputIndex); + CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString()); + try { + std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( + IDqAsyncIoFactory::TOutputTransformArguments { + .OutputDesc = outputDesc, + .OutputIndex = outputIndex, + .StatsLevel = collectStatsLevel, + .TxId = TxId, + .TransformOutput = transform.OutputBuffer, + .Callback = static_cast(this), + .SecureParams = secureParams, + .TaskParams = taskParams, + .TypeEnv = typeEnv, + .HolderFactory = holderFactory, + .ProgramBuilder = *transform.ProgramBuilder + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what(); } + this->RegisterWithSameMailbox(transform.Actor); } for (auto& [outputIndex, sink] : SinksMap) { - if (TaskRunner) { sink.Buffer = TaskRunner->GetSink(outputIndex); } Y_ABORT_UNLESS(AsyncIoFactory); const auto& outputDesc = Task.GetOutputs(outputIndex); Y_ABORT_UNLESS(outputDesc.HasSink()); @@ -1569,7 +1551,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TaskParams = taskParams, .TypeEnv = typeEnv, .HolderFactory = holderFactory, - .RandomProvider = TaskRunner ? TaskRunner->GetRandomProvider() : nullptr + .RandomProvider = randomProvider }); } catch (const std::exception& ex) { throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what(); diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index efb0b3c05905..f999357be5d4 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -89,12 +89,39 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseTaskRunner->Prepare(this->Task, limits, execCtx); + for (auto& [channelId, channel] : this->InputChannelsMap) { + channel.Channel = this->TaskRunner->GetInputChannel(channelId); + } + + for (auto& [inputIndex, source] : this->SourcesMap) { + source.Buffer = this->TaskRunner->GetSource(inputIndex); + Y_ABORT_UNLESS(source.Buffer); + } + + for (auto& [inputIndex, transform] : this->InputTransformsMap) { + std::tie(transform.InputBuffer, transform.Buffer) = this->TaskRunner->GetInputTransform(inputIndex); + } + + for (auto& [channelId, channel] : this->OutputChannelsMap) { + channel.Channel = this->TaskRunner->GetOutputChannel(channelId); + } + + for (auto& [outputIndex, transform] : this->OutputTransformsMap) { + std::tie(transform.Buffer, transform.OutputBuffer) = this->TaskRunner->GetOutputTransform(outputIndex); + } + + for (auto& [outputIndex, sink] : this->SinksMap) { + sink.Buffer = this->TaskRunner->GetSink(outputIndex); + } + TBase::FillIoMaps( - this->TaskRunner->GetHolderFactory(), - this->TaskRunner->GetTypeEnv(), - this->TaskRunner->GetSecureParams(), - this->TaskRunner->GetTaskParams(), - this->TaskRunner->GetReadRanges()); + this->TaskRunner->GetHolderFactory(), + this->TaskRunner->GetTypeEnv(), + this->TaskRunner->GetSecureParams(), + this->TaskRunner->GetTaskParams(), + this->TaskRunner->GetReadRanges(), + this->TaskRunner->GetRandomProvider() + ); } };