Skip to content

Commit

Permalink
Added PUT method in /v1/info/state for transitioning the server to th…
Browse files Browse the repository at this point in the history
…e SHUTING_DOWN state for graceful shutdown
  • Loading branch information
anandamideShakyan committed Nov 6, 2024
1 parent b036356 commit 32f1458
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 1 deletion.
27 changes: 26 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
#include "presto_cpp/main/http/filters/InternalAuthenticationFilter.h"
Expand Down Expand Up @@ -371,6 +372,14 @@ void PrestoServer::run() {
json infoStateJson = convertNodeState(server->nodeState());
http::sendOkResponse(downstream, infoStateJson);
});
httpServer_->registerPut(
"/v1/info/state",
[server = this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
server->handleGracefulShutdown(body, downstream);
});
httpServer_->registerGet(
"/v1/status",
[server = this](
Expand Down Expand Up @@ -906,7 +915,6 @@ void PrestoServer::stop() {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Waiting for " << shutdownOnsetSec
<< " second(s) before proceeding with the shutdown...";

// Give coordinator some time to receive our new node state and stop sending
// any tasks.
std::this_thread::sleep_for(std::chrono::seconds(shutdownOnsetSec));
Expand Down Expand Up @@ -1421,6 +1429,23 @@ void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}

void PrestoServer::handleGracefulShutdown(
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
if (body.size() == 1 &&
folly::trimWhitespace(body[0]->moveToFbString()) == "\"SHUTTING_DOWN\"") {
LOG(INFO) << "Shutdown requested";
if (nodeState() == NodeState::kActive) {
std::thread([this]() { this->stop(); }).detach();
} else {
LOG(INFO) << "Node is inactive or shutdown is already requested";
}
http::sendOkResponse(downstream);
} else {
http::sendErrorResponse(downstream, "Bad Request", http::kHttpBadRequest);
}
}

void PrestoServer::registerSidecarEndpoints() {
VELOX_CHECK(httpServer_);
httpServer_->registerGet(
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ class PrestoServer {

void reportNodeStatus(proxygen::ResponseHandler* downstream);

void handleGracefulShutdown(
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream);

protocol::NodeStatus fetchNodeStatus();

void populateMemAndCPUInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace facebook::presto::http {
const uint16_t kHttpOk = 200;
const uint16_t kHttpAccepted = 202;
const uint16_t kHttpNoContent = 204;
const uint16_t kHttpBadRequest = 400;
const uint16_t kHttpUnauthorized = 401;
const uint16_t kHttpNotFound = 404;
const uint16_t kHttpInternalServerError = 500;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.DistributedQueryRunner;
import org.testng.annotations.Test;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import static org.testng.Assert.assertEquals;

public class TestPrestoNativeGracefulShutdown
{
@Test
public void testGracefulShutdown() throws Exception
{
QueryRunner queryRunner = createQueryRunner();
String baseUri = getServerUri(queryRunner);
int responseCode = sendShutdownRequest(baseUri, "\"SHUTTING_DOWN\"");
assertEquals(responseCode, 200, "Expected a 200 OK response for valid shutdown request");
queryRunner.close();
}

@Test
public void testInvalidShutdownRequest() throws Exception
{
QueryRunner queryRunner = createQueryRunner();
String baseUri = getServerUri(queryRunner);
int responseCode = sendShutdownRequest(baseUri, "\"INVALID_SHUTDOWN\"");
assertEquals(responseCode, 400, "Expected a 400 Bad Request response for invalid body");
queryRunner.close();
}

@Test
public void testEmptyBodyShutdownRequest() throws Exception
{
QueryRunner queryRunner = createQueryRunner();
String baseUri = getServerUri(queryRunner);
int responseCode = sendShutdownRequest(baseUri, "");
assertEquals(responseCode, 400, "Expected a 400 Bad Request response for empty body");
queryRunner.close();
}

public static int sendShutdownRequest(String shutdownUrl, String body)
{
try {
URL url = new URL(shutdownUrl);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/json");

try (OutputStream os = connection.getOutputStream()) {
byte[] input = body.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}
return connection.getResponseCode();
}
catch (Exception e) {
e.printStackTrace();
return 500;
}
}

private QueryRunner createQueryRunner() throws Exception
{
return PrestoNativeQueryRunnerUtils.createNativeQueryRunner(true);
}

private String getServerUri(QueryRunner queryRunner)
{
DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;
URI coordinatorUri = distributedQueryRunner.getCoordinator().getBaseUrl();
return coordinatorUri.toString() + "/v1/info/state";
}
}

0 comments on commit 32f1458

Please sign in to comment.