Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add native_max_local_exchange_partition_count session property #23910

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,12 @@ The maximum size in bytes for the task's buffered output. The buffer is shared a
The maximum bytes to buffer per PartitionedOutput operator to avoid creating tiny SerializedPages.
For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator would buffer up to that number of
bytes / number of destinations for each destination before producing a SerializedPage. Default is 32MB.

``native_max_local_exchange_partition_count``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``bigint``
* **Default value:** ``4294967296``

Maximum number of partitions created by a local exchange.
Affects concurrency for pipelines containing LocalPartitionNode.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_QUERY_TRACE_NODE_IDS = "native_query_trace_node_ids";
public static final String NATIVE_QUERY_TRACE_MAX_BYTES = "native_query_trace_max_bytes";
public static final String NATIVE_QUERY_TRACE_REG_EXP = "native_query_trace_task_reg_exp";
public static final String NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT = "native_max_local_exchange_partition_count";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
Expand Down Expand Up @@ -225,6 +226,12 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
"would buffer up to that number of bytes / number of destinations for each destination before " +
"producing a SerializedPage.",
24L << 20,
!nativeExecution),
integerProperty(
NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT,
"Maximum number of partitions created by a local exchange. " +
"Affects concurrency for pipelines containing LocalPartitionNode",
null,
!nativeExecution));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ SessionProperties::SessionProperties() {
// Overrides velox default value. Set it to 1 second to be aligned with
// Presto Java.
std::to_string(1000));

addSessionProperty(
kMaxLocalExchangePartitionCount,
"Maximum number of partitions created by a local exchange."
"Affects concurrency for pipelines containing LocalPartitionNode",
BIGINT(),
false,
QueryConfig::kMaxLocalExchangePartitionCount,
std::to_string(c.maxLocalExchangePartitionCount()));
}

const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ class SessionProperties {
static constexpr const char* kMaxPartitionedOutputBufferSize =
"native_max_page_partitioning_buffer_size";

/// Maximum number of partitions created by a local exchange.
/// Affects concurrency for pipelines containing LocalPartitionNode.
static constexpr const char* kMaxLocalExchangePartitionCount =
"native_max_local_exchange_partition_count";

SessionProperties();

const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.createExternalTable;
import static com.facebook.presto.nativeworker.SymlinkManifestGeneratorUtils.cleanupSymlinkData;
import static com.facebook.presto.tpch.TpchMetadata.getPrestoType;
import static java.lang.String.format;

public abstract class AbstractTestNativeHiveExternalTableTpchQueries
extends AbstractTestNativeTpchQueries
Expand Down Expand Up @@ -114,7 +113,9 @@ public void tearDown()
for (String tableName : TPCH_TABLES) {
dropTableIfExists(javaQueryRunner, HIVE, TPCH, tableName);
}
assertUpdate(format("DROP SCHEMA IF EXISTS %s.%s", HIVE, TPCH));

// https://github.com/prestodb/presto/issues/23908
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove before land? Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented the following line to unblock. Created an issue to resolve it properly: #23908

Let me leave a comment on the issue to remove this comment once resolved.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// assertUpdate(format("DROP SCHEMA IF EXISTS %s.%s", HIVE, TPCH));

File dataDirectory = ((DistributedQueryRunner) javaQueryRunner).getCoordinator().getDataDirectory().resolve(HIVE_DATA).toFile();
Path symlinkTableDataPath = dataDirectory.toPath().getParent().resolve(SYMLINK_FOLDER);
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 286 files
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private List<PropertyMetadata<?>> fetchSessionProperties()
return propertyMetadataList;
}
catch (Exception e) {
throw new PrestoException(INVALID_ARGUMENTS, "Failed to get session properties from sidecar.");
throw new PrestoException(INVALID_ARGUMENTS, "Failed to get session properties from sidecar.", e);
}
}

Expand Down
Loading