From 32f1458266a3fb3080e4ee82aeea09f6f22f86aa Mon Sep 17 00:00:00 2001 From: Shakyan Kushwaha Date: Sun, 25 Aug 2024 17:30:21 +0530 Subject: [PATCH] Added PUT method in /v1/info/state for transitioning the server to the SHUTING_DOWN state for graceful shutdown --- .../presto_cpp/main/PrestoServer.cpp | 27 +++++- .../presto_cpp/main/PrestoServer.h | 4 + .../presto_cpp/main/http/HttpConstants.h | 1 + .../TestPrestoNativeGracefulShutdown.java | 92 +++++++++++++++++++ 4 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeGracefulShutdown.java diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 0cdd3830599a..b3c417dfba5b 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -26,6 +26,7 @@ #include "presto_cpp/main/common/ConfigReader.h" #include "presto_cpp/main/common/Counters.h" #include "presto_cpp/main/common/Utils.h" +#include "presto_cpp/main/http/HttpConstants.h" #include "presto_cpp/main/http/filters/AccessLogFilter.h" #include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h" #include "presto_cpp/main/http/filters/InternalAuthenticationFilter.h" @@ -371,6 +372,14 @@ void PrestoServer::run() { json infoStateJson = convertNodeState(server->nodeState()); http::sendOkResponse(downstream, infoStateJson); }); + httpServer_->registerPut( + "/v1/info/state", + [server = this]( + proxygen::HTTPMessage* /*message*/, + const std::vector>& body, + proxygen::ResponseHandler* downstream) { + server->handleGracefulShutdown(body, downstream); + }); httpServer_->registerGet( "/v1/status", [server = this]( @@ -906,7 +915,6 @@ void PrestoServer::stop() { PRESTO_SHUTDOWN_LOG(INFO) << "Waiting for " << shutdownOnsetSec << " second(s) before proceeding with the shutdown..."; - // Give coordinator some time to receive our new node state and stop sending // any tasks. std::this_thread::sleep_for(std::chrono::seconds(shutdownOnsetSec)); @@ -1421,6 +1429,23 @@ void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) { http::sendOkResponse(downstream, json(fetchNodeStatus())); } +void PrestoServer::handleGracefulShutdown( + const std::vector>& body, + proxygen::ResponseHandler* downstream) { + if (body.size() == 1 && + folly::trimWhitespace(body[0]->moveToFbString()) == "\"SHUTTING_DOWN\"") { + LOG(INFO) << "Shutdown requested"; + if (nodeState() == NodeState::kActive) { + std::thread([this]() { this->stop(); }).detach(); + } else { + LOG(INFO) << "Node is inactive or shutdown is already requested"; + } + http::sendOkResponse(downstream); + } else { + http::sendErrorResponse(downstream, "Bad Request", http::kHttpBadRequest); + } +} + void PrestoServer::registerSidecarEndpoints() { VELOX_CHECK(httpServer_); httpServer_->registerGet( diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index 3082b9aa1881..b6d6ad50659c 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -211,6 +211,10 @@ class PrestoServer { void reportNodeStatus(proxygen::ResponseHandler* downstream); + void handleGracefulShutdown( + const std::vector>& body, + proxygen::ResponseHandler* downstream); + protocol::NodeStatus fetchNodeStatus(); void populateMemAndCPUInfo(); diff --git a/presto-native-execution/presto_cpp/main/http/HttpConstants.h b/presto-native-execution/presto_cpp/main/http/HttpConstants.h index 55c1d41266c9..2949725f27cf 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpConstants.h +++ b/presto-native-execution/presto_cpp/main/http/HttpConstants.h @@ -17,6 +17,7 @@ namespace facebook::presto::http { const uint16_t kHttpOk = 200; const uint16_t kHttpAccepted = 202; const uint16_t kHttpNoContent = 204; +const uint16_t kHttpBadRequest = 400; const uint16_t kHttpUnauthorized = 401; const uint16_t kHttpNotFound = 404; const uint16_t kHttpInternalServerError = 500; diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeGracefulShutdown.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeGracefulShutdown.java new file mode 100644 index 000000000000..2490426caefa --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeGracefulShutdown.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.DistributedQueryRunner; +import org.testng.annotations.Test; + +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.nio.charset.StandardCharsets; + +import static org.testng.Assert.assertEquals; + +public class TestPrestoNativeGracefulShutdown +{ + @Test + public void testGracefulShutdown() throws Exception + { + QueryRunner queryRunner = createQueryRunner(); + String baseUri = getServerUri(queryRunner); + int responseCode = sendShutdownRequest(baseUri, "\"SHUTTING_DOWN\""); + assertEquals(responseCode, 200, "Expected a 200 OK response for valid shutdown request"); + queryRunner.close(); + } + + @Test + public void testInvalidShutdownRequest() throws Exception + { + QueryRunner queryRunner = createQueryRunner(); + String baseUri = getServerUri(queryRunner); + int responseCode = sendShutdownRequest(baseUri, "\"INVALID_SHUTDOWN\""); + assertEquals(responseCode, 400, "Expected a 400 Bad Request response for invalid body"); + queryRunner.close(); + } + + @Test + public void testEmptyBodyShutdownRequest() throws Exception + { + QueryRunner queryRunner = createQueryRunner(); + String baseUri = getServerUri(queryRunner); + int responseCode = sendShutdownRequest(baseUri, ""); + assertEquals(responseCode, 400, "Expected a 400 Bad Request response for empty body"); + queryRunner.close(); + } + + public static int sendShutdownRequest(String shutdownUrl, String body) + { + try { + URL url = new URL(shutdownUrl); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("PUT"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/json"); + + try (OutputStream os = connection.getOutputStream()) { + byte[] input = body.getBytes(StandardCharsets.UTF_8); + os.write(input, 0, input.length); + } + return connection.getResponseCode(); + } + catch (Exception e) { + e.printStackTrace(); + return 500; + } + } + + private QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner(true); + } + + private String getServerUri(QueryRunner queryRunner) + { + DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner; + URI coordinatorUri = distributedQueryRunner.getCoordinator().getBaseUrl(); + return coordinatorUri.toString() + "/v1/info/state"; + } +}