Skip to content

Commit

Permalink
Support yield in bucket sort table write getout to prevent stuck driv…
Browse files Browse the repository at this point in the history
…er detection (#11229)

Summary:
Pull Request resolved: #11229

Support yield in the middle of sort writer get output processing to prevent stuck driver detection as well
as friendly to other concurrent running queries or threads. We found in production that the long running get
output from sort writer can trigger alerts as it does sort, potential read spilled data from remote storage
and, encode and flush to remote storage through file writer. This can take hour in case of a small bucket
table which only has 64 buckets such as only 64 threads in the whole cluster for running the query.

This PR adds finish API to data sink and file writer for table writer to do incremental sort and flush processing.
The data sink finish API call each file writer's finish API and both check the configured finish time slice limit
which are configured through a hive config. Both API returns false if finish needs continue processing or true
when finishes. Correspondingly, when table writer get output it returns null if finish data sink has more work
to do and set the ready block future and yield reason for driver framework to check and yield.

This PR also changes data sink and file writer interface with a new finish state. A new hive config
added for finish time slice limit. The driver framework adds to report the yield from a operator which
currently only reports the yield metric when the yield is triggered by the driver framework itself. A new
histogram metric is added to track the sort writer finish time distribution to monitoring

bypass-github-export-checks

Reviewed By: Yuhta, spershin, oerling

Differential Revision: D64159781

fbshipit-source-id: c13ca478a4e3c444b44e6f059cd82b76131299a2
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 12, 2024
1 parent 6cc4152 commit b00751e
Show file tree
Hide file tree
Showing 22 changed files with 507 additions and 56 deletions.
6 changes: 6 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,12 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricFileWriterEarlyFlushedRawBytes, facebook::velox::StatType::SUM);

// The distribution of the amount of time spent on hive sort writer finish
// call in range of [0, 120s] with 60 buckets. It is configured to report the
// latency at P50, P90, P99, and P100 percentiles.
DEFINE_HISTOGRAM_METRIC(
kMetricHiveSortWriterFinishTimeMs, 2'000, 0, 120'000, 50, 90, 99, 100);

// The current spilling memory usage in bytes.
DEFINE_METRIC(kMetricSpillMemoryBytes, facebook::velox::StatType::AVG);

Expand Down
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ constexpr folly::StringPiece kMetricSpillPeakMemoryBytes{
constexpr folly::StringPiece kMetricFileWriterEarlyFlushedRawBytes{
"velox.file_writer_early_flushed_raw_bytes"};

constexpr folly::StringPiece kMetricHiveSortWriterFinishTimeMs{
"velox.hive_sort_writer_finish_time_ms"};

constexpr folly::StringPiece kMetricArbitratorRequestsCount{
"velox.arbitrator_requests_count"};

Expand Down
11 changes: 9 additions & 2 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,12 @@ class DataSink {
/// TODO maybe at some point we want to make it async.
virtual void appendData(RowVectorPtr input) = 0;

/// Returns the stats of this data sink.
virtual Stats stats() const = 0;
/// Called after all data has been added via possibly multiple calls to
/// appendData() This function finishes the data procesing like sort all the
/// added data and write them to the file writer. The finish might take long
/// time so it returns false to yield in the middle of processing. The
/// function returns true if it has processed all data. This call is blocking.
virtual bool finish() = 0;

/// Called once after all data has been added via possibly multiple calls to
/// appendData(). The function returns the metadata of written data in string
Expand All @@ -181,6 +185,9 @@ class DataSink {
/// Called to abort this data sink object and we don't expect any appendData()
/// calls on an aborted data sink object.
virtual void abort() = 0;

/// Returns the stats of this data sink.
virtual Stats stats() const = 0;
};

class DataSource {
Expand Down
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ uint64_t HiveConfig::sortWriterMaxOutputBytes(
config::CapacityUnit::BYTE);
}

uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs(
const config::ConfigBase* session) const {
return session->get<uint64_t>(
kSortWriterFinishTimeSliceLimitMsSession,
config_->get<uint64_t>(kSortWriterFinishTimeSliceLimitMs, 5'000));
}

uint64_t HiveConfig::footerEstimatedSize() const {
return config_->get<uint64_t>(kFooterEstimatedSize, 1UL << 20);
}
Expand Down
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ class HiveConfig {
static constexpr const char* kSortWriterMaxOutputBytesSession =
"sort_writer_max_output_bytes";

/// Sort Writer will exit finish() method after this many milliseconds even if
/// it has not completed its work yet. Zero means no time limit.
static constexpr const char* kSortWriterFinishTimeSliceLimitMs =
"sort-writer_finish_time_slice_limit_ms";
static constexpr const char* kSortWriterFinishTimeSliceLimitMsSession =
"sort_writer_finish_time_slice_limit_ms";

static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

Expand Down Expand Up @@ -355,6 +362,9 @@ class HiveConfig {

uint64_t sortWriterMaxOutputBytes(const config::ConfigBase* session) const;

uint64_t sortWriterFinishTimeSliceLimitMs(
const config::ConfigBase* session) const;

uint64_t footerEstimatedSize() const;

uint64_t filePreloadThreshold() const;
Expand Down
71 changes: 58 additions & 13 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::connector::hive {
namespace {
#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \
memory::NonReclaimableSectionGuard nonReclaimableGuard( \
writerInfo_[(index)]->nonReclaimableSectionHolder.get())

// Returns the type of non-partition data columns.
RowTypePtr getNonPartitionTypes(
Expand Down Expand Up @@ -181,10 +184,17 @@ std::shared_ptr<memory::MemoryPool> createSortPool(
return writerPool->addLeafChild(fmt::format("{}.sort", writerPool->name()));
}

#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \
memory::NonReclaimableSectionGuard nonReclaimableGuard( \
writerInfo_[(index)]->nonReclaimableSectionHolder.get())

uint64_t getFinishTimeSliceLimitMsFromHiveConfig(
const std::shared_ptr<const HiveConfig>& config,
const config::ConfigBase* sessions) {
const uint64_t flushTimeSliceLimitMsFromConfig =
config->sortWriterFinishTimeSliceLimitMs(sessions);
// NOTE: if the flush time slice limit is set to 0, then we treat it as no
// limit.
return flushTimeSliceLimitMsFromConfig == 0
? std::numeric_limits<uint64_t>::max()
: flushTimeSliceLimitMsFromConfig;
}
} // namespace

const HiveWriterId& HiveWriterId::unpartitionedId() {
Expand Down Expand Up @@ -388,7 +398,10 @@ HiveDataSink::HiveDataSink(
: nullptr),
writerFactory_(dwio::common::getWriterFactory(
insertTableHandle_->tableStorageFormat())),
spillConfig_(connectorQueryCtx->spillConfig()) {
spillConfig_(connectorQueryCtx->spillConfig()),
sortWriterFinishTimeSliceLimitMs_(getFinishTimeSliceLimitMsFromHiveConfig(
hiveConfig_,
connectorQueryCtx->sessionProperties())) {
if (isBucketed()) {
VELOX_USER_CHECK_LT(
bucketCount_, maxBucketCount(), "bucketCount exceeds the limit");
Expand Down Expand Up @@ -482,6 +495,8 @@ std::string HiveDataSink::stateString(State state) {
switch (state) {
case State::kRunning:
return "RUNNING";
case State::kFinishing:
return "FLUSHING";
case State::kClosed:
return "CLOSED";
case State::kAborted:
Expand Down Expand Up @@ -578,23 +593,52 @@ void HiveDataSink::setState(State newState) {
void HiveDataSink::checkStateTransition(State oldState, State newState) {
switch (oldState) {
case State::kRunning:
if (newState == State::kAborted || newState == State::kClosed) {
if (newState == State::kAborted || newState == State::kFinishing) {
return;
}
break;
case State::kAborted:
case State::kFinishing:
if (newState == State::kAborted || newState == State::kClosed ||
// The finishing state is reentry state if we yield in the middle of
// finish processing if a single run takes too long.
newState == State::kFinishing) {
return;
}
[[fallthrough]];
case State::kAborted:
case State::kClosed:
[[fallthrough]];
default:
break;
}
VELOX_FAIL("Unexpected state transition from {} to {}", oldState, newState);
}

bool HiveDataSink::finish() {
// Flush is reentry state.
setState(State::kFinishing);

// As for now, only sorted writer needs flush buffered data. For non-sorted
// writer, data is directly written to the underlying file writer.
if (!sortWrite()) {
return true;
}

// TODO: we might refactor to move the data sorting logic into hive data sink.
const uint64_t startTimeMs = getCurrentTimeMs();
for (auto i = 0; i < writers_.size(); ++i) {
WRITER_NON_RECLAIMABLE_SECTION_GUARD(i);
if (!writers_[i]->finish()) {
return false;
}
if (getCurrentTimeMs() - startTimeMs > sortWriterFinishTimeSliceLimitMs_) {
return false;
}
}
return true;
}

std::vector<std::string> HiveDataSink::close() {
checkRunning();
state_ = State::kClosed;
setState(State::kClosed);
closeInternal();

std::vector<std::string> partitionUpdates;
Expand Down Expand Up @@ -629,13 +673,13 @@ std::vector<std::string> HiveDataSink::close() {
}

void HiveDataSink::abort() {
checkRunning();
state_ = State::kAborted;
setState(State::kAborted);
closeInternal();
}

void HiveDataSink::closeInternal() {
VELOX_CHECK_NE(state_, State::kRunning);
VELOX_CHECK_NE(state_, State::kFinishing);

TestValue::adjust(
"facebook::velox::connector::hive::HiveDataSink::closeInternal", this);
Expand Down Expand Up @@ -799,7 +843,8 @@ HiveDataSink::maybeCreateBucketSortWriter(
hiveConfig_->sortWriterMaxOutputRows(
connectorQueryCtx_->sessionProperties()),
hiveConfig_->sortWriterMaxOutputBytes(
connectorQueryCtx_->sessionProperties()));
connectorQueryCtx_->sessionProperties()),
sortWriterFinishTimeSliceLimitMs_);
}

HiveWriterId HiveDataSink::getWriterId(size_t row) const {
Expand Down
34 changes: 26 additions & 8 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,20 @@ class HiveDataSink : public DataSink {
/// The list of runtime stats reported by hive data sink
static constexpr const char* kEarlyFlushedRawBytes = "earlyFlushedRawBytes";

/// Defines the execution states of a hive data sink running internally.
enum class State {
/// The data sink accepts new append data in this state.
kRunning = 0,
/// The data sink flushes any buffered data to the underlying file writer
/// but no more data can be appended.
kFinishing = 1,
/// The data sink is aborted on error and no more data can be appended.
kAborted = 2,
/// The data sink is closed on error and no more data can be appended.
kClosed = 3
};
static std::string stateString(State state);

HiveDataSink(
RowTypePtr inputType,
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
Expand All @@ -443,6 +457,8 @@ class HiveDataSink : public DataSink {

void appendData(RowVectorPtr input) override;

bool finish() override;

Stats stats() const override;

std::vector<std::string> close() override;
Expand All @@ -452,12 +468,6 @@ class HiveDataSink : public DataSink {
bool canReclaim() const;

private:
enum class State { kRunning = 0, kAborted = 1, kClosed = 2 };
friend struct fmt::formatter<
facebook::velox::connector::hive::HiveDataSink::State>;

static std::string stateString(State state);

// Validates the state transition from 'oldState' to 'newState'.
void checkStateTransition(State oldState, State newState);
void setState(State newState);
Expand Down Expand Up @@ -588,6 +598,7 @@ class HiveDataSink : public DataSink {
const std::unique_ptr<core::PartitionFunction> bucketFunction_;
const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
const common::SpillConfig* const spillConfig_;
const uint64_t sortWriterFinishTimeSliceLimitMs_{0};

std::vector<column_index_t> sortColumnIndices_;
std::vector<CompareFlags> sortCompareFlags_;
Expand Down Expand Up @@ -619,15 +630,22 @@ class HiveDataSink : public DataSink {
std::vector<uint32_t> bucketIds_;
};

FOLLY_ALWAYS_INLINE std::ostream& operator<<(
std::ostream& os,
HiveDataSink::State state) {
os << HiveDataSink::stateString(state);
return os;
}
} // namespace facebook::velox::connector::hive

template <>
struct fmt::formatter<facebook::velox::connector::hive::HiveDataSink::State>
: formatter<int> {
: formatter<std::string> {
auto format(
facebook::velox::connector::hive::HiveDataSink::State s,
format_context& ctx) const {
return formatter<int>::format(static_cast<int>(s), ctx);
return formatter<std::string>::format(
facebook::velox::connector::hive::HiveDataSink::stateString(s), ctx);
}
};

Expand Down
7 changes: 7 additions & 0 deletions velox/connectors/hive/tests/HiveConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ TEST(HiveConfigTest, defaultConfig) {
ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(emptySession.get()), 1024);
ASSERT_EQ(
hiveConfig.sortWriterMaxOutputBytes(emptySession.get()), 10UL << 20);
ASSERT_EQ(
hiveConfig.sortWriterFinishTimeSliceLimitMs(emptySession.get()), 5'000);
ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(emptySession.get()), true);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), true);
ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(emptySession.get()), 1024);
Expand Down Expand Up @@ -109,6 +111,7 @@ TEST(HiveConfigTest, overrideConfig) {
{HiveConfig::kOrcWriterStringDictionaryEncodingEnabled, "false"},
{HiveConfig::kSortWriterMaxOutputRows, "100"},
{HiveConfig::kSortWriterMaxOutputBytes, "100MB"},
{HiveConfig::kSortWriterFinishTimeSliceLimitMs, "400"},
{HiveConfig::kOrcWriterLinearStripeSizeHeuristics, "false"},
{HiveConfig::kOrcWriterMinCompressionSize, "512"},
{HiveConfig::kOrcWriterCompressionLevel, "1"},
Expand Down Expand Up @@ -159,6 +162,8 @@ TEST(HiveConfigTest, overrideConfig) {
ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(emptySession.get()), 100);
ASSERT_EQ(
hiveConfig.sortWriterMaxOutputBytes(emptySession.get()), 100UL << 20);
ASSERT_EQ(
hiveConfig.sortWriterFinishTimeSliceLimitMs(emptySession.get()), 400);
ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(emptySession.get()), 512);
ASSERT_EQ(hiveConfig.orcWriterCompressionLevel(emptySession.get()), 1);
ASSERT_EQ(
Expand All @@ -180,6 +185,7 @@ TEST(HiveConfigTest, overrideSession) {
{HiveConfig::kOrcWriterStringDictionaryEncodingEnabledSession, "false"},
{HiveConfig::kSortWriterMaxOutputRowsSession, "20"},
{HiveConfig::kSortWriterMaxOutputBytesSession, "20MB"},
{HiveConfig::kSortWriterFinishTimeSliceLimitMsSession, "300"},
{HiveConfig::kPartitionPathAsLowerCaseSession, "false"},
{HiveConfig::kAllowNullPartitionKeysSession, "false"},
{HiveConfig::kIgnoreMissingFilesSession, "true"},
Expand Down Expand Up @@ -227,6 +233,7 @@ TEST(HiveConfigTest, overrideSession) {
false);
ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(session.get()), 20);
ASSERT_EQ(hiveConfig.sortWriterMaxOutputBytes(session.get()), 20UL << 20);
ASSERT_EQ(hiveConfig.sortWriterFinishTimeSliceLimitMs(session.get()), 300);
ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(session.get()), false);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(session.get()), false);
ASSERT_EQ(hiveConfig.ignoreMissingFiles(session.get()), true);
Expand Down
Loading

0 comments on commit b00751e

Please sign in to comment.