From 4a008845669cbb5a82f2e5e69daa0f7da443e1d4 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Tue, 29 Oct 2024 07:33:02 -0700 Subject: [PATCH 1/4] Advance Velox --- presto-native-execution/velox | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presto-native-execution/velox b/presto-native-execution/velox index eaff5006e96a..e67f11bbd068 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit eaff5006e96a17426d1c8633cf305431b2fe06d4 +Subproject commit e67f11bbd06859d19cd7686b9dfda23d14b9f3e1 From 2f685cca7844712182e4f4bd6aec144882c6c8fa Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Tue, 29 Oct 2024 08:49:41 -0700 Subject: [PATCH 2/4] Unblock TestPrestoNativeHiveExternalTableTpchQueriesParquet https://github.com/prestodb/presto/issues/23908 --- .../AbstractTestNativeHiveExternalTableTpchQueries.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeHiveExternalTableTpchQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeHiveExternalTableTpchQueries.java index dadce6334ed1..2e4e0ff6826a 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeHiveExternalTableTpchQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeHiveExternalTableTpchQueries.java @@ -41,7 +41,6 @@ import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.createExternalTable; import static com.facebook.presto.nativeworker.SymlinkManifestGeneratorUtils.cleanupSymlinkData; import static com.facebook.presto.tpch.TpchMetadata.getPrestoType; -import static java.lang.String.format; public abstract class AbstractTestNativeHiveExternalTableTpchQueries extends AbstractTestNativeTpchQueries @@ -114,7 +113,9 @@ public void tearDown() for (String tableName : TPCH_TABLES) { dropTableIfExists(javaQueryRunner, HIVE, TPCH, tableName); } - assertUpdate(format("DROP SCHEMA IF EXISTS %s.%s", HIVE, TPCH)); + + // https://github.com/prestodb/presto/issues/23908 + // assertUpdate(format("DROP SCHEMA IF EXISTS %s.%s", HIVE, TPCH)); File dataDirectory = ((DistributedQueryRunner) javaQueryRunner).getCoordinator().getDataDirectory().resolve(HIVE_DATA).toFile(); Path symlinkTableDataPath = dataDirectory.toPath().getParent().resolve(SYMLINK_FOLDER); From c51b959817546714e92c0e2d1d75abc2c99614e1 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Tue, 29 Oct 2024 09:54:44 -0700 Subject: [PATCH 3/4] Propagate underlying exception in NativeSystemSessionPropertyProvider --- .../NativeSystemSessionPropertyProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/sessionpropertyproviders/NativeSystemSessionPropertyProvider.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/sessionpropertyproviders/NativeSystemSessionPropertyProvider.java index 5d51ef08c524..424d81f4b2ef 100644 --- a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/sessionpropertyproviders/NativeSystemSessionPropertyProvider.java +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/sessionpropertyproviders/NativeSystemSessionPropertyProvider.java @@ -102,7 +102,7 @@ private List> fetchSessionProperties() return propertyMetadataList; } catch (Exception e) { - throw new PrestoException(INVALID_ARGUMENTS, "Failed to get session properties from sidecar."); + throw new PrestoException(INVALID_ARGUMENTS, "Failed to get session properties from sidecar.", e); } } From 9701aecde99cf892a751a249446cdda72de716f6 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Tue, 29 Oct 2024 08:50:08 -0700 Subject: [PATCH 4/4] Add native_max_local_exchange_partition_count session property Maps to the max_local_exchange_partition_count Velox query property introduced in https://github.com/facebookincubator/velox/pull/11292 --- .../src/main/sphinx/presto_cpp/properties-session.rst | 9 +++++++++ .../NativeWorkerSessionPropertyProvider.java | 7 +++++++ .../presto_cpp/main/SessionProperties.cpp | 9 +++++++++ .../presto_cpp/main/SessionProperties.h | 5 +++++ 4 files changed, 30 insertions(+) diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst index ebd6c4153070..10599a02bd99 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst @@ -258,3 +258,12 @@ The maximum size in bytes for the task's buffered output. The buffer is shared a The maximum bytes to buffer per PartitionedOutput operator to avoid creating tiny SerializedPages. For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator would buffer up to that number of bytes / number of destinations for each destination before producing a SerializedPage. Default is 32MB. + +``native_max_local_exchange_partition_count`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``bigint`` +* **Default value:** ``4294967296`` + +Maximum number of partitions created by a local exchange. +Affects concurrency for pipelines containing LocalPartitionNode. diff --git a/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java b/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java index ebe2c798587c..ef0b2f74f359 100644 --- a/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java @@ -60,6 +60,7 @@ public class NativeWorkerSessionPropertyProvider public static final String NATIVE_QUERY_TRACE_NODE_IDS = "native_query_trace_node_ids"; public static final String NATIVE_QUERY_TRACE_MAX_BYTES = "native_query_trace_max_bytes"; public static final String NATIVE_QUERY_TRACE_REG_EXP = "native_query_trace_task_reg_exp"; + public static final String NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT = "native_max_local_exchange_partition_count"; private final List> sessionProperties; @Inject @@ -225,6 +226,12 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) "would buffer up to that number of bytes / number of destinations for each destination before " + "producing a SerializedPage.", 24L << 20, + !nativeExecution), + integerProperty( + NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT, + "Maximum number of partitions created by a local exchange. " + + "Affects concurrency for pipelines containing LocalPartitionNode", + null, !nativeExecution)); } diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index 880d8231b83c..1b7df8ae5df4 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -344,6 +344,15 @@ SessionProperties::SessionProperties() { // Overrides velox default value. Set it to 1 second to be aligned with // Presto Java. std::to_string(1000)); + + addSessionProperty( + kMaxLocalExchangePartitionCount, + "Maximum number of partitions created by a local exchange." + "Affects concurrency for pipelines containing LocalPartitionNode", + BIGINT(), + false, + QueryConfig::kMaxLocalExchangePartitionCount, + std::to_string(c.maxLocalExchangePartitionCount())); } const std::unordered_map>& diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index b19d49f63448..d13a93e95c4b 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -215,6 +215,11 @@ class SessionProperties { static constexpr const char* kMaxPartitionedOutputBufferSize = "native_max_page_partitioning_buffer_size"; + /// Maximum number of partitions created by a local exchange. + /// Affects concurrency for pipelines containing LocalPartitionNode. + static constexpr const char* kMaxLocalExchangePartitionCount = + "native_max_local_exchange_partition_count"; + SessionProperties(); const std::unordered_map>&