diff --git a/velox/common/base/Counters.cpp b/velox/common/base/Counters.cpp index ffdd3823d7b8..4b8dbf5156b2 100644 --- a/velox/common/base/Counters.cpp +++ b/velox/common/base/Counters.cpp @@ -523,6 +523,12 @@ void registerVeloxMetrics() { DEFINE_METRIC( kMetricFileWriterEarlyFlushedRawBytes, facebook::velox::StatType::SUM); + // The distribution of the amount of time spent on hive sort writer finish + // call in range of [0, 120s] with 60 buckets. It is configured to report the + // latency at P50, P90, P99, and P100 percentiles. + DEFINE_HISTOGRAM_METRIC( + kMetricHiveSortWriterFinishTimeMs, 2'000, 0, 120'000, 50, 90, 99, 100); + // The current spilling memory usage in bytes. DEFINE_METRIC(kMetricSpillMemoryBytes, facebook::velox::StatType::AVG); diff --git a/velox/common/base/Counters.h b/velox/common/base/Counters.h index f2fb63128a75..0d26052bc929 100644 --- a/velox/common/base/Counters.h +++ b/velox/common/base/Counters.h @@ -162,6 +162,9 @@ constexpr folly::StringPiece kMetricSpillPeakMemoryBytes{ constexpr folly::StringPiece kMetricFileWriterEarlyFlushedRawBytes{ "velox.file_writer_early_flushed_raw_bytes"}; +constexpr folly::StringPiece kMetricHiveSortWriterFinishTimeMs{ + "velox.hive_sort_writer_finish_time_ms"}; + constexpr folly::StringPiece kMetricArbitratorRequestsCount{ "velox.arbitrator_requests_count"}; diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index c11fefa5a567..565e64fdb363 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -170,8 +170,12 @@ class DataSink { /// TODO maybe at some point we want to make it async. virtual void appendData(RowVectorPtr input) = 0; - /// Returns the stats of this data sink. - virtual Stats stats() const = 0; + /// Called after all data has been added via possibly multiple calls to + /// appendData() This function finishes the data procesing like sort all the + /// added data and write them to the file writer. The finish might take long + /// time so it returns false to yield in the middle of processing. The + /// function returns true if it has processed all data. This call is blocking. + virtual bool finish() = 0; /// Called once after all data has been added via possibly multiple calls to /// appendData(). The function returns the metadata of written data in string @@ -181,6 +185,9 @@ class DataSink { /// Called to abort this data sink object and we don't expect any appendData() /// calls on an aborted data sink object. virtual void abort() = 0; + + /// Returns the stats of this data sink. + virtual Stats stats() const = 0; }; class DataSource { diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 2b76305a4699..2e7046819e8a 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -314,6 +314,13 @@ uint64_t HiveConfig::sortWriterMaxOutputBytes( config::CapacityUnit::BYTE); } +uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs( + const config::ConfigBase* session) const { + return session->get( + kSortWriterFinishTimeSliceLimitMsSession, + config_->get(kSortWriterFinishTimeSliceLimitMs, 5'000)); +} + uint64_t HiveConfig::footerEstimatedSize() const { return config_->get(kFooterEstimatedSize, 1UL << 20); } diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 6bb7b624a549..2888125b2167 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -240,6 +240,13 @@ class HiveConfig { static constexpr const char* kSortWriterMaxOutputBytesSession = "sort_writer_max_output_bytes"; + /// Sort Writer will exit finish() method after this many milliseconds even if + /// it has not completed its work yet. Zero means no time limit. + static constexpr const char* kSortWriterFinishTimeSliceLimitMs = + "sort-writer_finish_time_slice_limit_ms"; + static constexpr const char* kSortWriterFinishTimeSliceLimitMsSession = + "sort_writer_finish_time_slice_limit_ms"; + static constexpr const char* kS3UseProxyFromEnv = "hive.s3.use-proxy-from-env"; @@ -355,6 +362,9 @@ class HiveConfig { uint64_t sortWriterMaxOutputBytes(const config::ConfigBase* session) const; + uint64_t sortWriterFinishTimeSliceLimitMs( + const config::ConfigBase* session) const; + uint64_t footerEstimatedSize() const; uint64_t filePreloadThreshold() const; diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 6baba20b14e7..b68917bbd63c 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -38,6 +38,9 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::connector::hive { namespace { +#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \ + memory::NonReclaimableSectionGuard nonReclaimableGuard( \ + writerInfo_[(index)]->nonReclaimableSectionHolder.get()) // Returns the type of non-partition data columns. RowTypePtr getNonPartitionTypes( @@ -181,10 +184,17 @@ std::shared_ptr createSortPool( return writerPool->addLeafChild(fmt::format("{}.sort", writerPool->name())); } -#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \ - memory::NonReclaimableSectionGuard nonReclaimableGuard( \ - writerInfo_[(index)]->nonReclaimableSectionHolder.get()) - +uint64_t getFinishTimeSliceLimitMsFromHiveConfig( + const std::shared_ptr& config, + const config::ConfigBase* sessions) { + const uint64_t flushTimeSliceLimitMsFromConfig = + config->sortWriterFinishTimeSliceLimitMs(sessions); + // NOTE: if the flush time slice limit is set to 0, then we treat it as no + // limit. + return flushTimeSliceLimitMsFromConfig == 0 + ? std::numeric_limits::max() + : flushTimeSliceLimitMsFromConfig; +} } // namespace const HiveWriterId& HiveWriterId::unpartitionedId() { @@ -388,7 +398,10 @@ HiveDataSink::HiveDataSink( : nullptr), writerFactory_(dwio::common::getWriterFactory( insertTableHandle_->tableStorageFormat())), - spillConfig_(connectorQueryCtx->spillConfig()) { + spillConfig_(connectorQueryCtx->spillConfig()), + sortWriterFinishTimeSliceLimitMs_(getFinishTimeSliceLimitMsFromHiveConfig( + hiveConfig_, + connectorQueryCtx->sessionProperties())) { if (isBucketed()) { VELOX_USER_CHECK_LT( bucketCount_, maxBucketCount(), "bucketCount exceeds the limit"); @@ -482,6 +495,8 @@ std::string HiveDataSink::stateString(State state) { switch (state) { case State::kRunning: return "RUNNING"; + case State::kFinishing: + return "FLUSHING"; case State::kClosed: return "CLOSED"; case State::kAborted: @@ -578,23 +593,52 @@ void HiveDataSink::setState(State newState) { void HiveDataSink::checkStateTransition(State oldState, State newState) { switch (oldState) { case State::kRunning: - if (newState == State::kAborted || newState == State::kClosed) { + if (newState == State::kAborted || newState == State::kFinishing) { return; } break; - case State::kAborted: + case State::kFinishing: + if (newState == State::kAborted || newState == State::kClosed || + // The finishing state is reentry state if we yield in the middle of + // finish processing if a single run takes too long. + newState == State::kFinishing) { + return; + } [[fallthrough]]; + case State::kAborted: case State::kClosed: - [[fallthrough]]; default: break; } VELOX_FAIL("Unexpected state transition from {} to {}", oldState, newState); } +bool HiveDataSink::finish() { + // Flush is reentry state. + setState(State::kFinishing); + + // As for now, only sorted writer needs flush buffered data. For non-sorted + // writer, data is directly written to the underlying file writer. + if (!sortWrite()) { + return true; + } + + // TODO: we might refactor to move the data sorting logic into hive data sink. + const uint64_t startTimeMs = getCurrentTimeMs(); + for (auto i = 0; i < writers_.size(); ++i) { + WRITER_NON_RECLAIMABLE_SECTION_GUARD(i); + if (!writers_[i]->finish()) { + return false; + } + if (getCurrentTimeMs() - startTimeMs > sortWriterFinishTimeSliceLimitMs_) { + return false; + } + } + return true; +} + std::vector HiveDataSink::close() { - checkRunning(); - state_ = State::kClosed; + setState(State::kClosed); closeInternal(); std::vector partitionUpdates; @@ -629,13 +673,13 @@ std::vector HiveDataSink::close() { } void HiveDataSink::abort() { - checkRunning(); - state_ = State::kAborted; + setState(State::kAborted); closeInternal(); } void HiveDataSink::closeInternal() { VELOX_CHECK_NE(state_, State::kRunning); + VELOX_CHECK_NE(state_, State::kFinishing); TestValue::adjust( "facebook::velox::connector::hive::HiveDataSink::closeInternal", this); @@ -799,7 +843,8 @@ HiveDataSink::maybeCreateBucketSortWriter( hiveConfig_->sortWriterMaxOutputRows( connectorQueryCtx_->sessionProperties()), hiveConfig_->sortWriterMaxOutputBytes( - connectorQueryCtx_->sessionProperties())); + connectorQueryCtx_->sessionProperties()), + sortWriterFinishTimeSliceLimitMs_); } HiveWriterId HiveDataSink::getWriterId(size_t row) const { diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index a30b68f4d2fd..72269131f29f 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -429,6 +429,20 @@ class HiveDataSink : public DataSink { /// The list of runtime stats reported by hive data sink static constexpr const char* kEarlyFlushedRawBytes = "earlyFlushedRawBytes"; + /// Defines the execution states of a hive data sink running internally. + enum class State { + /// The data sink accepts new append data in this state. + kRunning = 0, + /// The data sink flushes any buffered data to the underlying file writer + /// but no more data can be appended. + kFinishing = 1, + /// The data sink is aborted on error and no more data can be appended. + kAborted = 2, + /// The data sink is closed on error and no more data can be appended. + kClosed = 3 + }; + static std::string stateString(State state); + HiveDataSink( RowTypePtr inputType, std::shared_ptr insertTableHandle, @@ -443,6 +457,8 @@ class HiveDataSink : public DataSink { void appendData(RowVectorPtr input) override; + bool finish() override; + Stats stats() const override; std::vector close() override; @@ -452,12 +468,6 @@ class HiveDataSink : public DataSink { bool canReclaim() const; private: - enum class State { kRunning = 0, kAborted = 1, kClosed = 2 }; - friend struct fmt::formatter< - facebook::velox::connector::hive::HiveDataSink::State>; - - static std::string stateString(State state); - // Validates the state transition from 'oldState' to 'newState'. void checkStateTransition(State oldState, State newState); void setState(State newState); @@ -588,6 +598,7 @@ class HiveDataSink : public DataSink { const std::unique_ptr bucketFunction_; const std::shared_ptr writerFactory_; const common::SpillConfig* const spillConfig_; + const uint64_t sortWriterFinishTimeSliceLimitMs_{0}; std::vector sortColumnIndices_; std::vector sortCompareFlags_; @@ -619,15 +630,22 @@ class HiveDataSink : public DataSink { std::vector bucketIds_; }; +FOLLY_ALWAYS_INLINE std::ostream& operator<<( + std::ostream& os, + HiveDataSink::State state) { + os << HiveDataSink::stateString(state); + return os; +} } // namespace facebook::velox::connector::hive template <> struct fmt::formatter - : formatter { + : formatter { auto format( facebook::velox::connector::hive::HiveDataSink::State s, format_context& ctx) const { - return formatter::format(static_cast(s), ctx); + return formatter::format( + facebook::velox::connector::hive::HiveDataSink::stateString(s), ctx); } }; diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp index d40cc3a58293..511191176612 100644 --- a/velox/connectors/hive/tests/HiveConfigTest.cpp +++ b/velox/connectors/hive/tests/HiveConfigTest.cpp @@ -69,6 +69,8 @@ TEST(HiveConfigTest, defaultConfig) { ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(emptySession.get()), 1024); ASSERT_EQ( hiveConfig.sortWriterMaxOutputBytes(emptySession.get()), 10UL << 20); + ASSERT_EQ( + hiveConfig.sortWriterFinishTimeSliceLimitMs(emptySession.get()), 5'000); ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(emptySession.get()), true); ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), true); ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(emptySession.get()), 1024); @@ -109,6 +111,7 @@ TEST(HiveConfigTest, overrideConfig) { {HiveConfig::kOrcWriterStringDictionaryEncodingEnabled, "false"}, {HiveConfig::kSortWriterMaxOutputRows, "100"}, {HiveConfig::kSortWriterMaxOutputBytes, "100MB"}, + {HiveConfig::kSortWriterFinishTimeSliceLimitMs, "400"}, {HiveConfig::kOrcWriterLinearStripeSizeHeuristics, "false"}, {HiveConfig::kOrcWriterMinCompressionSize, "512"}, {HiveConfig::kOrcWriterCompressionLevel, "1"}, @@ -159,6 +162,8 @@ TEST(HiveConfigTest, overrideConfig) { ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(emptySession.get()), 100); ASSERT_EQ( hiveConfig.sortWriterMaxOutputBytes(emptySession.get()), 100UL << 20); + ASSERT_EQ( + hiveConfig.sortWriterFinishTimeSliceLimitMs(emptySession.get()), 400); ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(emptySession.get()), 512); ASSERT_EQ(hiveConfig.orcWriterCompressionLevel(emptySession.get()), 1); ASSERT_EQ( @@ -180,6 +185,7 @@ TEST(HiveConfigTest, overrideSession) { {HiveConfig::kOrcWriterStringDictionaryEncodingEnabledSession, "false"}, {HiveConfig::kSortWriterMaxOutputRowsSession, "20"}, {HiveConfig::kSortWriterMaxOutputBytesSession, "20MB"}, + {HiveConfig::kSortWriterFinishTimeSliceLimitMsSession, "300"}, {HiveConfig::kPartitionPathAsLowerCaseSession, "false"}, {HiveConfig::kAllowNullPartitionKeysSession, "false"}, {HiveConfig::kIgnoreMissingFilesSession, "true"}, @@ -227,6 +233,7 @@ TEST(HiveConfigTest, overrideSession) { false); ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(session.get()), 20); ASSERT_EQ(hiveConfig.sortWriterMaxOutputBytes(session.get()), 20UL << 20); + ASSERT_EQ(hiveConfig.sortWriterFinishTimeSliceLimitMs(session.get()), 300); ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(session.get()), false); ASSERT_EQ(hiveConfig.allowNullPartitionKeys(session.get()), false); ASSERT_EQ(hiveConfig.ignoreMissingFiles(session.get()), true); diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 2fde0ec88758..c182129f062b 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -221,7 +221,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { RowTypePtr rowType_; std::shared_ptr connectorSessionProperties_ = std::make_shared( - std::unordered_map()); + std::unordered_map(), + /*mutable=*/true); std::unique_ptr connectorQueryCtx_; std::shared_ptr connectorConfig_ = std::make_shared(std::make_shared( @@ -522,7 +523,8 @@ TEST_F(HiveDataSinkTest, basic) { ASSERT_FALSE(stats.empty()); ASSERT_GT(stats.numWrittenBytes, 0); ASSERT_EQ(stats.numWrittenFiles, 0); - + ASSERT_TRUE(dataSink->finish()); + ASSERT_TRUE(dataSink->finish()); const auto partitions = dataSink->close(); stats = dataSink->stats(); ASSERT_FALSE(stats.empty()); @@ -544,6 +546,8 @@ TEST_F(HiveDataSinkTest, basicBucket) { std::vector>{ std::make_shared( "c1", core::SortOrder{false, false})}); + connectorSessionProperties_->set( + HiveConfig::kSortWriterFinishTimeSliceLimitMsSession, "1"); auto dataSink = createDataSink( rowType_, outputDirectory->getPath(), @@ -570,7 +574,10 @@ TEST_F(HiveDataSinkTest, basicBucket) { ASSERT_FALSE(stats.empty()); ASSERT_GT(stats.numWrittenBytes, 0); ASSERT_EQ(stats.numWrittenFiles, 0); - + VELOX_ASSERT_THROW( + dataSink->close(), "Unexpected state transition from RUNNING to CLOSED"); + while (!dataSink->finish()) { + } const auto partitions = dataSink->close(); stats = dataSink->stats(); ASSERT_FALSE(stats.empty()); @@ -594,12 +601,16 @@ TEST_F(HiveDataSinkTest, close) { } else { ASSERT_EQ(dataSink->stats().numWrittenBytes, 0); } + ASSERT_TRUE(dataSink->finish()); const auto partitions = dataSink->close(); // Can't append after close. VELOX_ASSERT_THROW( dataSink->appendData(vectors.back()), "Hive data sink is not running"); - VELOX_ASSERT_THROW(dataSink->close(), "Hive data sink is not running"); - VELOX_ASSERT_THROW(dataSink->abort(), "Hive data sink is not running"); + VELOX_ASSERT_THROW( + dataSink->close(), "Unexpected state transition from CLOSED to CLOSED"); + VELOX_ASSERT_THROW( + dataSink->abort(), + "Unexpected state transition from CLOSED to ABORTED"); const auto stats = dataSink->stats(); if (!empty) { @@ -634,8 +645,12 @@ TEST_F(HiveDataSinkTest, abort) { const auto stats = dataSink->stats(); ASSERT_TRUE(stats.empty()); // Can't close after abort. - VELOX_ASSERT_THROW(dataSink->close(), "Hive data sink is not running"); - VELOX_ASSERT_THROW(dataSink->abort(), "Hive data sink is not running"); + VELOX_ASSERT_THROW( + dataSink->close(), + "Unexpected state transition from ABORTED to CLOSED"); + VELOX_ASSERT_THROW( + dataSink->abort(), + "Unexpected state transition from ABORTED to ABORTED"); // Can't append after abort. VELOX_ASSERT_THROW( dataSink->appendData(vectors.back()), "Hive data sink is not running"); @@ -781,6 +796,8 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, memoryReclaim) { memory::memoryManager()->arbitrator()->stats(); ASSERT_EQ(curStats.reclaimedUsedBytes - oldStats.reclaimedUsedBytes, 0); } + while (!dataSink->finish()) { + } const auto partitions = dataSink->close(); if (testData.sortWriter && testData.expectedWriterReclaimed) { ASSERT_FALSE(dataSink->stats().spillStats.empty()); @@ -900,6 +917,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { dataSink->appendData(vectors[i]); } if (testData.close) { + ASSERT_TRUE(dataSink->finish()); const auto partitions = dataSink->close(); ASSERT_GE(partitions.size(), 1); } else { @@ -932,6 +950,116 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { } } +DEBUG_ONLY_TEST_F(HiveDataSinkTest, sortWriterAbortDuringFinish) { + const auto outputDirectory = TempDirectoryPath::create(); + const int32_t numBuckets = 4; + auto bucketProperty = std::make_shared( + HiveBucketProperty::Kind::kHiveCompatible, + numBuckets, + std::vector{"c0"}, + std::vector{BIGINT()}, + std::vector>{ + std::make_shared( + "c1", core::SortOrder{false, false})}); + connectorSessionProperties_->set( + HiveConfig::kSortWriterFinishTimeSliceLimitMsSession, "1"); + connectorSessionProperties_->set( + HiveConfig::kSortWriterMaxOutputRowsSession, "100"); + auto dataSink = createDataSink( + rowType_, + outputDirectory->getPath(), + dwio::common::FileFormat::DWRF, + {}, + bucketProperty); + const int numBatches{10}; + const auto vectors = createVectors(500, numBatches); + for (const auto& vector : vectors) { + dataSink->appendData(vector); + } + + std::atomic_int injectCount{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::Writer::write", + std::function([&](dwrf::Writer* /*unused*/) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + })); + + for (int i = 0;; ++i) { + ASSERT_FALSE(dataSink->finish()); + if (i == 2) { + dataSink->abort(); + break; + } + } + const auto stats = dataSink->stats(); + ASSERT_TRUE(stats.empty()); +} + +TEST_F(HiveDataSinkTest, sortWriterMemoryReclaimDuringFinish) { + const auto outputDirectory = TempDirectoryPath::create(); + const int32_t numBuckets = 4; + auto bucketProperty = std::make_shared( + HiveBucketProperty::Kind::kHiveCompatible, + numBuckets, + std::vector{"c0"}, + std::vector{BIGINT()}, + std::vector>{ + std::make_shared( + "c1", core::SortOrder{false, false})}); + std::shared_ptr spillDirectory = + exec::test::TempDirectoryPath::create(); + std::unique_ptr spillConfig = + getSpillConfig(spillDirectory->getPath(), 1); + connectorSessionProperties_->set( + HiveConfig::kSortWriterFinishTimeSliceLimitMsSession, "1"); + connectorSessionProperties_->set( + HiveConfig::kSortWriterMaxOutputRowsSession, "100"); + auto connectorQueryCtx = std::make_unique( + opPool_.get(), + connectorPool_.get(), + connectorSessionProperties_.get(), + spillConfig.get(), + exec::test::defaultPrefixSortConfig(), + nullptr, + nullptr, + "query.HiveDataSinkTest", + "task.HiveDataSinkTest", + "planNodeId.HiveDataSinkTest", + 0, + ""); + setConnectorQueryContext(std::move(connectorQueryCtx)); + auto dataSink = createDataSink( + rowType_, + outputDirectory->getPath(), + dwio::common::FileFormat::DWRF, + {}, + bucketProperty); + const int numBatches{10}; + const auto vectors = createVectors(500, numBatches); + for (const auto& vector : vectors) { + dataSink->appendData(vector); + } + + for (int i = 0; !dataSink->finish(); ++i) { + if (i == 2) { + ASSERT_GT(root_->reclaimableBytes().value(), 0); + const memory::MemoryArbitrator::Stats prevStats = + memory::memoryManager()->arbitrator()->stats(); + memory::testingRunArbitration(); + memory::MemoryArbitrator::Stats curStats = + memory::memoryManager()->arbitrator()->stats(); + ASSERT_GT(curStats.reclaimedUsedBytes - prevStats.reclaimedUsedBytes, 0); + } + } + const auto partitions = dataSink->close(); + const auto stats = dataSink->stats(); + ASSERT_FALSE(stats.empty()); + ASSERT_EQ(partitions.size(), numBuckets); + + createDuckDbTable(vectors); + verifyWrittenData(outputDirectory->getPath(), numBuckets); +} + DEBUG_ONLY_TEST_F(HiveDataSinkTest, sortWriterFailureTest) { auto vectors = createVectors(500, 10); @@ -981,7 +1109,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, sortWriterFailureTest) { std::function( [&](memory::MemoryPool* pool) { VELOX_FAIL("inject failure"); })); - VELOX_ASSERT_THROW(dataSink->close(), "inject failure"); + VELOX_ASSERT_THROW(dataSink->finish(), "inject failure"); } TEST_F(HiveDataSinkTest, insertTableHandleToString) { @@ -1026,6 +1154,7 @@ TEST_F(HiveDataSinkTest, flushPolicyWithParquet) { for (const auto& vector : vectors) { dataSink->appendData(vector); } + ASSERT_TRUE(dataSink->finish()); dataSink->close(); dwio::common::ReaderOptions readerOpts{pool_.get()}; @@ -1063,6 +1192,7 @@ TEST_F(HiveDataSinkTest, flushPolicyWithDWRF) { for (const auto& vector : vectors) { dataSink->appendData(vector); } + ASSERT_TRUE(dataSink->finish()); dataSink->close(); dwio::common::ReaderOptions readerOpts{pool_.get()}; @@ -1073,8 +1203,8 @@ TEST_F(HiveDataSinkTest, flushPolicyWithDWRF) { auto reader = std::make_unique( readerOpts, std::move(bufferedInput)); - EXPECT_EQ(reader->getNumberOfStripes(), 10); - EXPECT_EQ(reader->getRowsPerStripe()[0], 500); + ASSERT_EQ(reader->getNumberOfStripes(), 10); + ASSERT_EQ(reader->getRowsPerStripe()[0], 500); } } // namespace diff --git a/velox/docs/monitoring/metrics.rst b/velox/docs/monitoring/metrics.rst index 9126466d3586..b61d95d43c7a 100644 --- a/velox/docs/monitoring/metrics.rst +++ b/velox/docs/monitoring/metrics.rst @@ -544,3 +544,10 @@ Hive Connector - The distribution of hive file open latency in range of [0, 100s] with 10 buckets. It is configured to report latency at P50, P90, P99, and P100 percentiles. + * - hive_sort_writer_finish_time_ms + - Histogram + - The distribution of hive sort writer finish processing time slice in range + of[0, 120s] with 60 buckets. It is configured to report latency at P50, + P90, P99, and P100 percentiles. + + diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index 0243db048bbc..00bab4ee5360 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -17,15 +17,16 @@ #include "velox/dwio/common/SortingWriter.h" namespace facebook::velox::dwio::common { - SortingWriter::SortingWriter( std::unique_ptr writer, std::unique_ptr sortBuffer, vector_size_t maxOutputRowsConfig, - uint64_t maxOutputBytesConfig) + uint64_t maxOutputBytesConfig, + uint64_t outputTimeSliceLimitMs) : outputWriter_(std::move(writer)), maxOutputRowsConfig_(maxOutputRowsConfig), maxOutputBytesConfig_(maxOutputBytesConfig), + finishTimeSliceLimitMs_(outputTimeSliceLimitMs), sortPool_(sortBuffer->pool()), canReclaim_(sortBuffer->canSpill()), sortBuffer_(std::move(sortBuffer)) { @@ -51,19 +52,44 @@ void SortingWriter::flush() { outputWriter_->flush(); } -void SortingWriter::close() { - setState(State::kClosed); +bool SortingWriter::finish() { + const uint64_t startTimeMs = getCurrentTimeMs(); + SCOPE_EXIT { + const uint64_t flushTimeMs = getCurrentTimeMs() - startTimeMs; + if (flushTimeMs != 0) { + RECORD_HISTOGRAM_METRIC_VALUE( + kMetricHiveSortWriterFinishTimeMs, flushTimeMs); + } + }; + if (isRunning()) { + sortBuffer_->noMoreInput(); + setState(State::kFinishing); + } + if (sortBuffer_ == nullptr) { + return true; + } - sortBuffer_->noMoreInput(); const auto maxOutputBatchRows = outputBatchRows(); - RowVectorPtr output = sortBuffer_->getOutput(maxOutputBatchRows); - while (output != nullptr) { - outputWriter_->write(output); + RowVectorPtr output{nullptr}; + do { + if (getCurrentTimeMs() - startTimeMs > finishTimeSliceLimitMs_) { + return false; + } output = sortBuffer_->getOutput(maxOutputBatchRows); - } + if (output != nullptr) { + outputWriter_->write(output); + } + } while (output != nullptr); sortBuffer_.reset(); sortPool_->release(); + return true; +} + +void SortingWriter::close() { + VELOX_CHECK(isFinishing()); + setState(State::kClosed); + VELOX_CHECK_NULL(sortBuffer_); outputWriter_->close(); } @@ -86,7 +112,7 @@ uint64_t SortingWriter::reclaim( return 0; } - if (!isRunning()) { + if (!isRunning() && !isFinishing()) { LOG(WARNING) << "Can't reclaim from a not running hive sort writer pool: " << sortPool_->name() << ", state: " << state() << "used memory: " << succinctBytes(sortPool_->usedBytes()) diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index d7b70f09032d..a7edd69ed8e2 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -29,14 +29,17 @@ class SortingWriter : public Writer { std::unique_ptr writer, std::unique_ptr sortBuffer, vector_size_t maxOutputRowsConfig, - uint64_t maxOutputBytesConfig); + uint64_t maxOutputBytesConfig, + uint64_t outputTimeSliceLimitMs); ~SortingWriter() override; void write(const VectorPtr& data) override; + bool finish() override; + /// No action because we need to accumulate all data and sort before data can - /// be flushed + /// be flushed. void flush() override; void close() override; @@ -78,6 +81,7 @@ class SortingWriter : public Writer { const std::unique_ptr outputWriter_; const vector_size_t maxOutputRowsConfig_; const uint64_t maxOutputBytesConfig_; + const uint64_t finishTimeSliceLimitMs_; memory::MemoryPool* const sortPool_; const bool canReclaim_; diff --git a/velox/dwio/common/Writer.cpp b/velox/dwio/common/Writer.cpp index 52ff1848f0a0..87951cad7c59 100644 --- a/velox/dwio/common/Writer.cpp +++ b/velox/dwio/common/Writer.cpp @@ -26,7 +26,18 @@ void Writer::checkStateTransition(State oldState, State newState) { } break; case State::kRunning: - if (newState == State::kAborted || newState == State::kClosed) { + // NOTE: some physical file writer might switch from kRunning to kClosed + // directly as there is nothing to do with finish() call. + if (newState == State::kAborted || newState == State::kClosed || + newState == State::kFinishing) { + return; + } + break; + case State::kFinishing: + if (newState == State::kAborted || newState == State::kClosed || + // NOTE: the finishing state is reentry state as we could yield in the + // middle of long running finish processing. + newState == State::kFinishing) { return; } break; @@ -49,6 +60,8 @@ std::string Writer::stateString(State state) { return "INIT"; case State::kRunning: return "RUNNING"; + case State::kFinishing: + return "FINISHING"; case State::kClosed: return "CLOSED"; case State::kAborted: @@ -62,10 +75,14 @@ bool Writer::isRunning() const { return state_ == State::kRunning; } +bool Writer::isFinishing() const { + return state_ == State::kFinishing; +} + void Writer::checkRunning() const { VELOX_CHECK_EQ( - static_cast(state_), - static_cast(State::kRunning), + state_, + State::kRunning, "Writer is not running: {}", Writer::stateString(state_)); } diff --git a/velox/dwio/common/Writer.h b/velox/dwio/common/Writer.h index 011e9f156162..774aafe4c9a8 100644 --- a/velox/dwio/common/Writer.h +++ b/velox/dwio/common/Writer.h @@ -37,8 +37,9 @@ class Writer { enum class State { kInit = 0, kRunning = 1, - kAborted = 2, - kClosed = 3, + kFinishing = 2, + kAborted = 3, + kClosed = 4, }; static std::string stateString(State state); @@ -56,8 +57,22 @@ class Writer { /// Does not close the writer. virtual void flush() = 0; - /// Invokes flush and closes the writer. - /// Data can no longer be written. + /// Invokes to finish the writing before close call. For logical writer like + /// hive sorting writer which is built on top of a physical file writer, it + /// sorts the buffered data and flush them to the physical file writer. This + /// process might take very long time so we allow to yield in the middle of + /// this process in favor of the other concurrent running threads in a query + /// system. It returns false if the finish process needs to continue, + /// otherwise true. Ihis should be called repeatedly when it returns false + /// until it returns true. Data can no longer be written after the first + /// finish call. + /// + /// NOTE: this must be called before close(). + virtual bool finish() = 0; + + /// Invokes closes the writer. Data can no longer be written. + /// + /// NOTE: this must be called after the last finish() which returns true. virtual void close() = 0; /// Aborts the writing by closing the writer and dropping everything. @@ -66,6 +81,7 @@ class Writer { protected: bool isRunning() const; + bool isFinishing() const; void checkRunning() const; @@ -86,3 +102,14 @@ FOLLY_ALWAYS_INLINE std::ostream& operator<<( } } // namespace facebook::velox::dwio::common + +template <> +struct fmt::formatter + : formatter { + auto format( + facebook::velox::dwio::common::Writer::State s, + format_context& ctx) const { + return formatter::format( + facebook::velox::dwio::common::Writer::stateString(s), ctx); + } +}; diff --git a/velox/dwio/common/tests/WriterTest.cpp b/velox/dwio/common/tests/WriterTest.cpp index b35f1ba09fbf..51e68de59d08 100644 --- a/velox/dwio/common/tests/WriterTest.cpp +++ b/velox/dwio/common/tests/WriterTest.cpp @@ -16,6 +16,7 @@ #include "velox/dwio/common/Writer.h" #include +#include "velox/common/base/tests/GTestUtils.h" using namespace ::testing; @@ -25,7 +26,10 @@ TEST(WriterTest, stateString) { ASSERT_EQ(Writer::stateString(Writer::State::kInit), "INIT"); ASSERT_EQ(Writer::stateString(Writer::State::kRunning), "RUNNING"); ASSERT_EQ(Writer::stateString(Writer::State::kClosed), "CLOSED"); + ASSERT_EQ(Writer::stateString(Writer::State::kFinishing), "FINISHING"); ASSERT_EQ(Writer::stateString(Writer::State::kAborted), "ABORTED"); + VELOX_ASSERT_THROW( + Writer::stateString(static_cast(100)), "BAD STATE: 100"); } } // namespace } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 56ea96088fbc..8cc82303e10f 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -76,6 +76,10 @@ class Writer : public dwio::common::Writer { // Forces the writer to flush, does not close the writer. virtual void flush() override; + virtual bool finish() override { + return true; + } + virtual void close() override; virtual void abort() override; diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index ce98acb64fab..26969a59d52f 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -150,6 +150,10 @@ class Writer : public dwio::common::Writer { // Forces a row group boundary before the data added by next write(). void newRowGroup(int32_t numRows); + bool finish() override { + return true; + } + // Closes 'this', After close, data can no longer be added and the completed // Parquet file is flushed into 'sink' provided at construction. 'sink' stays // live until destruction of 'this'. diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index ff0cd8bba365..30844a7d5ffc 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -1086,7 +1086,10 @@ StopReason Driver::blockDriver( future.valid(), "The operator {} is blocked but blocking future is not valid", op->operatorType()); - + VELOX_CHECK_NE(blockingReason_, BlockingReason::kNotBlocked); + if (blockingReason_ == BlockingReason::kYield) { + recordYieldCount(); + } blockedOperatorId_ = blockedOperatorId; blockingState = std::make_shared( self, std::move(future), op, blockingReason_); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 38cae64d0cb7..5854ffe310f1 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -757,6 +757,16 @@ DriverThreadContext* driverThreadContext(); } // namespace facebook::velox::exec +template <> +struct fmt::formatter + : formatter { + auto format(facebook::velox::exec::BlockingReason b, format_context& ctx) + const { + return formatter::format( + facebook::velox::exec::blockingReasonToString(b), ctx); + } +}; + template <> struct fmt::formatter : formatter { diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 2ce07af95cd1..3d289621f38a 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -115,6 +115,13 @@ std::vector TableWriter::closeDataSink() { return dataSink_->close(); } +bool TableWriter::finishDataSink() { + // We only expect finish on a non-closed data sink. + VELOX_CHECK(!closed_); + VELOX_CHECK_NOT_NULL(dataSink_); + return dataSink_->finish(); +} + void TableWriter::addInput(RowVectorPtr input) { if (input->size() == 0) { return; @@ -150,6 +157,14 @@ void TableWriter::noMoreInput() { } } +BlockingReason TableWriter::isBlocked(ContinueFuture* future) { + if (blockingFuture_.valid()) { + *future = std::move(blockingFuture_); + return blockingReason_; + } + return BlockingReason::kNotBlocked; +} + RowVectorPtr TableWriter::getOutput() { // Making sure the output is read only once after the write is fully done. if (!noMoreInput_ || finished_) { @@ -165,6 +180,12 @@ RowVectorPtr TableWriter::getOutput() { pool()); } + if (!finishDataSink()) { + blockingReason_ = BlockingReason::kYield; + blockingFuture_ = ContinueFuture{folly::Unit{}}; + return nullptr; + } + finished_ = true; const std::vector fragments = closeDataSink(); updateStats(dataSink_->stats()); diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index 7d8cf245c0f9..b188019362fb 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -104,9 +104,7 @@ class TableWriter : public Operator { DriverCtx* driverCtx, const std::shared_ptr& tableWriteNode); - BlockingReason isBlocked(ContinueFuture* /* future */) override { - return BlockingReason::kNotBlocked; - } + BlockingReason isBlocked(ContinueFuture* future) override; void initialize() override; @@ -192,6 +190,8 @@ class TableWriter : public Operator { void createDataSink(); + bool finishDataSink(); + std::vector closeDataSink(); void abortDataSink(); @@ -215,6 +215,10 @@ class TableWriter : public Operator { std::vector inputMapping_; std::shared_ptr mappedType_; + // The blocking future might be set when finish data sink. + ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; + BlockingReason blockingReason_{BlockingReason::kNotBlocked}; + bool finished_{false}; bool closed_{false}; vector_size_t numWrittenRows_{0}; diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index e10b08854b01..4a0b0a4890af 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -3399,6 +3399,93 @@ DEBUG_ONLY_TEST_P(BucketSortOnlyTableWriterTest, outputBatchRows) { } } +DEBUG_ONLY_TEST_P(BucketSortOnlyTableWriterTest, yield) { + auto rowType = + ROW({"c0", "p0", "c1", "c3", "c4", "c5"}, + {VARCHAR(), BIGINT(), INTEGER(), REAL(), DOUBLE(), VARCHAR()}); + std::vector partitionKeys = {"p0"}; + + // Partition vector is constant vector. + std::vector vectors = makeBatches(1, [&](auto) { + return makeRowVector( + rowType->names(), + {makeFlatVector( + 1'000, + [&](auto row) { + return StringView::makeInline(fmt::format("str_{}", row)); + }), + makeConstant((int64_t)365, 1'000), + makeConstant((int32_t)365, 1'000), + makeFlatVector(1'000, [&](auto row) { return row + 33.23; }), + makeFlatVector(1'000, [&](auto row) { return row + 33.23; }), + makeFlatVector(1'000, [&](auto row) { + return StringView::makeInline(fmt::format("bucket_{}", row * 3)); + })}); + }); + createDuckDbTable(vectors); + + struct { + uint64_t flushTimeSliceLimitMs; + bool expectedYield; + + std::string debugString() const { + return fmt::format( + "flushTimeSliceLimitMs: {}, expectedYield: {}", + flushTimeSliceLimitMs, + expectedYield); + } + } testSettings[] = {{0, false}, {1, true}, {10'000, false}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + std::atomic_bool injectDelayOnce{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::Writer::write", + std::function([&](dwrf::Writer* /*unused*/) { + if (!injectDelayOnce.exchange(false)) { + return; + } + std::this_thread::sleep_for(std::chrono::seconds(2)); + })); + createDuckDbTable(vectors); + + auto outputDirectory = TempDirectoryPath::create(); + auto plan = createInsertPlan( + PlanBuilder().values({vectors}), + rowType, + outputDirectory->getPath(), + partitionKeys, + bucketProperty_, + compressionKind_, + 1, + connector::hive::LocationHandle::TableType::kNew, + commitStrategy_); + const int prevYieldCount = Driver::yieldCount(); + const std::shared_ptr task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(QueryConfig::kTaskWriterCount, std::to_string(1)) + .connectorSessionProperty( + kHiveConnectorId, + HiveConfig::kSortWriterFinishTimeSliceLimitMsSession, + folly::to(testData.flushTimeSliceLimitMs)) + .connectorSessionProperty( + kHiveConnectorId, + HiveConfig::kSortWriterMaxOutputRowsSession, + folly::to(100)) + .connectorSessionProperty( + kHiveConnectorId, + HiveConfig::kSortWriterMaxOutputBytesSession, + folly::to("1KB")) + .assertResults("SELECT count(*) FROM tmp"); + auto stats = task->taskStats().pipelineStats.front().operatorStats; + if (testData.expectedYield) { + ASSERT_GT(Driver::yieldCount(), prevYieldCount); + } else { + ASSERT_EQ(Driver::yieldCount(), prevYieldCount); + } + } +} + VELOX_INSTANTIATE_TEST_SUITE_P( TableWriterTest, UnpartitionedTableWriterTest,