diff --git a/.gitmodules b/.gitmodules index 70fcc9d..828955f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "3rdparty/drogon"] path = 3rdparty/drogon url = https://github.com/drogonframework/drogon.git +[submodule "3rdparty/magic_enum"] + path = 3rdparty/magic_enum + url = https://github.com/Neargye/magic_enum.git diff --git a/3rdparty/magic_enum b/3rdparty/magic_enum new file mode 160000 index 0000000..126539e --- /dev/null +++ b/3rdparty/magic_enum @@ -0,0 +1 @@ +Subproject commit 126539e13cccdc2e75ce770e94f3c26403099fa5 diff --git a/CMakeLists.txt b/CMakeLists.txt index 6668592..f6a5e43 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,8 +13,13 @@ option(BUILD_EXAMPLES "Build examples" OFF) option(BUILD_CTL "Build drogon_ctl" OFF) add_subdirectory(3rdparty/drogon) +add_subdirectory(3rdparty/magic_enum) + set(Boost_USE_STATIC_LIBS ON) find_package(Boost REQUIRED COMPONENTS program_options) + +add_subdirectory(src) + add_executable(simple_inference_server main.cpp) -target_link_libraries(simple_inference_server drogon Boost::program_options) +target_link_libraries(simple_inference_server Boost::program_options libsimple_inference_server) diff --git a/main.cpp b/main.cpp index 310496c..c5e870d 100644 --- a/main.cpp +++ b/main.cpp @@ -4,118 +4,11 @@ #include #include +#include "src/Controller.h" -using namespace drogon; - - -struct GlobalConfig -{ - std::filesystem::path invokePath; - std::filesystem::path indexPath; -} globalConfig; - -bool invokeProcessing( const std::filesystem::path& input, const std::filesystem::path& output ) -{ - auto exitCode = system( fmt::format( "{} {} {}", globalConfig.invokePath.string(), input.string(), output.string() ).c_str() ); - return exitCode == 0; -} -std::string getInputFileName( const std::string& taskId ) -{ - return taskId + ".input.zip"; -} -std::filesystem::path getInputPath( const std::string& taskId ) -{ - return std::filesystem::path{ app().getUploadPath() } / getInputFileName( taskId ); -} -std::filesystem::path getOutputPath( const std::string& taskId ) -{ - return std::filesystem::path{ app().getUploadPath() } / ( taskId + ".zip" ); -} - - -Task trackResultHandler( HttpRequestPtr req ) -{ - const auto maybeTask = req->getOptionalParameter( "task" ); - const auto maybeOutput = maybeTask.transform( getOutputPath ); - - if ( !maybeOutput || !std::filesystem::exists( *maybeOutput ) ) - { - auto resp = HttpResponse::newHttpResponse(); - resp->setBody( "The task is not available (yet)" ); - resp->setStatusCode( HttpStatusCode::k200OK ); - resp->addHeader( "Refresh", "10" ); - co_return resp; - } - - auto resp = HttpResponse::newHttpResponse(); - resp->setBody( fmt::format( "Result can be downloaded here", *maybeTask ) ); - resp->setStatusCode( HttpStatusCode::k200OK ); - co_return resp; -} - -Task getResultHandler( HttpRequestPtr req ) -{ - const auto maybeTask = req->getOptionalParameter( "task" ); - const auto maybeOutput = maybeTask.transform( getOutputPath ); - if ( !maybeOutput || !std::filesystem::exists( *maybeOutput ) ) - { - co_return HttpResponse::newNotFoundResponse(); - } - - auto resp = HttpResponse::newFileResponse( *maybeOutput, maybeOutput->filename() ); - co_return resp; -} - -Task submitTaskHandler( HttpRequestPtr req ) -{ - MultiPartParser filesUpload; - if ( filesUpload.parse(req) != 0 || filesUpload.getFiles().size() != 1 ) - { - auto resp = HttpResponse::newHttpResponse(); - resp->setBody( "Must only be one file" ); - resp->setStatusCode( k403Forbidden ); - co_return resp; - } - - auto file = filesUpload.getFiles()[0]; - if ( !file.getFileName().ends_with( ".zip" ) ) - { - auto resp = HttpResponse::newHttpResponse(); - resp->setBody( "Must be zip file" ); - resp->setStatusCode( k403Forbidden ); - co_return resp; - } - - const auto taskId = drogon::utils::getUuid(); - file.saveAs( getInputFileName( taskId ) ); - - app().getLoop()->queueInLoop( [taskId] - { - const auto inputPath = getInputPath( taskId ); - const auto outputPath = getOutputPath( taskId ); - invokeProcessing( inputPath, outputPath ); - } ); +using namespace drogon; - auto resp = HttpResponse::newHttpResponse(); - resp->setStatusCode( HttpStatusCode::k202Accepted ); - resp->setBody( fmt::format( "Your requested has been accepted. You can track it here", - taskId ) ); - co_return resp; -} - -Task rootHandler( HttpRequestPtr req ) -{ - auto resp = HttpResponse::newHttpResponse( HttpStatusCode::k200OK, CT_TEXT_HTML ); - if ( exists( globalConfig.indexPath ) ) - { - std::ifstream fin( globalConfig.indexPath ); - std::stringstream buffer; - buffer << fin.rdbuf(); - resp->setBody( buffer.str() ); - } - co_return resp; -} // Fix parsing std::filesystem::path with spaces (see https://github.com/boostorg/program_options/issues/69) namespace boost @@ -133,12 +26,13 @@ int main( int argc, char** argv ) std::string host; std::filesystem::path certPath, keyPath; int port; + Controller::Config config; po::options_description desc( "Simple Inference Server" ); desc.add_options() ( "help,h", "Display help message" ) - ( "invokePath", po::value( &globalConfig.invokePath )->required(), "Path to the script that will be invoked" ) - ( "indexPath", po::value( &globalConfig.indexPath )->default_value( {} ), "Path to the index.html" ) + ( "invokePath", po::value( &config.invokePath )->required(), "Path to the script that will be invoked" ) + ( "indexPath", po::value( &config.indexPath )->default_value( {} ), "Path to the index.html" ) ( "host", po::value( &host )->default_value( "127.0.0.1" ), "Host to bind to" ) ( "port", po::value( &port )->default_value( 7654 ), "Port to bind to" ) ( "cert", po::value( &certPath )->default_value( {} ), "Path to SSL certificate" ) @@ -163,11 +57,6 @@ int main( int argc, char** argv ) return -1; } - app().registerHandler( "/track_result", std::function{ trackResultHandler } ); - app().registerHandler( "/get_result", std::function{ getResultHandler } ); - app().registerHandler( "/submit", std::function{ submitTaskHandler }, {Post } ); - app().registerHandler( "/", std::function{ rootHandler } ); - const bool useSSL = exists( certPath ) && exists( keyPath ); if ( useSSL && !app().supportSSL() ) { @@ -185,6 +74,7 @@ int main( int argc, char** argv ) app() .setClientMaxBodySize( 1024*1024*1024 ) // 1gb .addListener( host, port, useSSL, certPath, keyPath ) + .registerController( std::make_shared( config ) ) .run(); return 0; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..bece545 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,6 @@ +project(libsimple_inference_server) + + +file(GLOB source *.cpp) +add_library(libsimple_inference_server STATIC ${source}) +target_link_libraries(libsimple_inference_server drogon magic_enum::magic_enum) diff --git a/src/Controller.cpp b/src/Controller.cpp new file mode 100644 index 0000000..23f98ae --- /dev/null +++ b/src/Controller.cpp @@ -0,0 +1,213 @@ +// +// Created by Andrey Aralov on 9/23/24. +// +#include "Controller.h" + +#include + +#include + +#include + + +using namespace drogon; + + +namespace +{ + +bool + invokeProcessing( const std::filesystem::path& invokePath, + const std::filesystem::path& input, + const std::filesystem::path& output ) +{ + auto exitCode = system( fmt::format( "{} {} {}", invokePath.string(), input.string(), output.string() ).c_str() ); + return exitCode == 0; +} + +std::string getInputFileName( const std::string& taskId ) +{ + return taskId + ".input.zip"; +} +std::filesystem::path getInputPath( const std::string& taskId ) +{ + return std::filesystem::path{ app().getUploadPath() } / getInputFileName( taskId ); +} +std::filesystem::path getOutputPath( const std::string& taskId ) +{ + return std::filesystem::path{ app().getUploadPath() } / ( taskId + ".zip" ); +} + +} + + +Controller::Error::operator HttpResponsePtr() const +{ + auto resp = HttpResponse::newHttpResponse(); + resp->setBody( msg ); + resp->setStatusCode( code ); + return resp; +} + + +Controller::Controller( const Config& config ): + config_( config ) +{} + +Controller::ExpectedOrError Controller::submit_( drogon::HttpRequestPtr req ) +{ + MultiPartParser filesUpload; + if ( filesUpload.parse(req) != 0 || filesUpload.getFiles().size() != 1 ) + return std::unexpected{ { k403Forbidden, "Must be only one file" } }; + + auto file = filesUpload.getFiles()[0]; + if ( !file.getFileName().ends_with( ".zip" ) ) + return std::unexpected{ { k403Forbidden, "Must be a zip file" } }; + + const auto jobId = drogon::utils::getUuid(); + file.saveAs( getInputFileName( jobId ) ); + + app().getLoop()->queueInLoop( [jobId, this] + { + { + Job job{ + .status = Job::InProgress, + .id = jobId + }; + std::lock_guard lock( jobsMutex_ ); + jobs_[jobId] = job; + } + + const auto inputPath = getInputPath( jobId ); + const auto outputPath = getOutputPath( jobId ); + auto result = invokeProcessing( config_.invokePath, inputPath, outputPath ); + + { + std::shared_lock lock( jobsMutex_ ); + if ( result ) + jobs_[jobId].status = Job::Success; + else + jobs_[jobId].status = Job::Failed; + } + } ); + + return jobId; +} + +Controller::ExpectedOrError Controller::getInfo_( drogon::HttpRequestPtr req ) +{ + const auto maybeJobId = req->getOptionalParameter( "job_id" ); + + if ( !maybeJobId ) + return std::unexpected{ { k404NotFound, "Job is not found" } }; + + std::shared_lock lock( jobsMutex_ ); + return jobs_[*maybeJobId]; +} + +drogon::HttpResponsePtr Controller::getResult_( drogon::HttpRequestPtr req ) +{ + const auto maybeTask = req->getOptionalParameter( "job_id" ); + const auto maybeOutput = maybeTask.transform( getOutputPath ); + if ( !maybeOutput || !std::filesystem::exists( *maybeOutput ) ) + { + return HttpResponse::newNotFoundResponse(); + } + + auto resp = HttpResponse::newFileResponse( *maybeOutput, maybeOutput->filename() ); + return resp; +} + + +Task Controller::submit( HttpRequestPtr req ) +{ + if ( auto res = submit_( req ) ) + { + auto resp = HttpResponse::newHttpResponse(); + resp->setStatusCode( HttpStatusCode::k202Accepted ); + resp->setBody( fmt::format( "Your requested has been accepted. You can track it here", + *res ) ); + co_return resp; + } + else + { + co_return res.error(); + } +} + +Task Controller::getInfo( HttpRequestPtr req ) +{ + if ( auto res = getInfo_( req ) ) + { + auto resp = HttpResponse::newHttpResponse(); + resp->setStatusCode( k200OK ); + + switch ( res->status ) + { + case Job::InProgress: + resp->setBody( "The job is running" ); + break; + case Job::Failed: + resp->setBody( "The job is failed" ); + break; + case Job::Success: + resp->setBody( fmt::format( "The job is completed. Result can be downloaded here", + res->id ) ); + break; + } + + co_return resp; + } + else + { + co_return res.error(); + } +} + +Task Controller::getResult( HttpRequestPtr req ) +{ + co_return getResult_( req ); +} + + +Task Controller::submit_api( HttpRequestPtr req ) +{ + if ( auto res = submit_( req ) ) + { + Json::Value root; + root["job_id"] = *res; + std::stringstream ss; + ss << root; + + auto resp = HttpResponse::newHttpResponse(); + resp->setStatusCode( HttpStatusCode::k202Accepted ); + resp->setBody( ss.str() ); + co_return resp; + } + else + { + co_return res.error(); + } +} + + +Task Controller::getInfo_api( HttpRequestPtr req ) +{ + if ( auto res = getInfo_( req ) ) + { + Json::Value root; + root["status"] = std::string( magic_enum::enum_name( res->status ) ); + std::stringstream ss; + ss << root; + + auto resp = HttpResponse::newHttpResponse(); + resp->setStatusCode( k200OK ); + resp->addHeader( "Content-Type", "application/json" ); + resp->setBody( ss.str() ); + co_return resp; + } + else + { + co_return res.error(); + } +} diff --git a/src/Controller.h b/src/Controller.h new file mode 100644 index 0000000..332e09b --- /dev/null +++ b/src/Controller.h @@ -0,0 +1,68 @@ +// +// Created by Andrey Aralov on 9/23/24. +// +#pragma once + +#include "Job.h" + +#include +#include +#include + +#include + + +class Controller : public drogon::HttpController +{ +public: + struct Config + { + std::filesystem::path invokePath; + std::filesystem::path indexPath; + }; + + struct Error + { + drogon::HttpStatusCode code; + std::string msg; + + operator drogon::HttpResponsePtr() const; + }; + + template + using ExpectedOrError = std::expected; + +private: + std::unordered_map jobs_; + std::shared_mutex jobsMutex_; + Config config_; + +public: + + explicit Controller( const Config& config ); + + + METHOD_LIST_BEGIN + ADD_METHOD_TO( Controller::submit, "/submit", drogon::Post ); + ADD_METHOD_TO( Controller::getInfo, "/get_job_info", drogon::Get ); + ADD_METHOD_TO( Controller::getResult, "/get_result", drogon::Get ); + + ADD_METHOD_TO( Controller::submit_api, "/api/submit", drogon::Post ); + ADD_METHOD_TO( Controller::getInfo_api, "/api/get_job_info", drogon::Get ); + ADD_METHOD_TO( Controller::getResult, "/api/get_result", drogon::Get ); + METHOD_LIST_END + + drogon::Task submit( drogon::HttpRequestPtr req ); + drogon::Task getInfo( drogon::HttpRequestPtr req ); + drogon::Task getResult( drogon::HttpRequestPtr req ); + + drogon::Task submit_api( drogon::HttpRequestPtr req ); + drogon::Task getInfo_api( drogon::HttpRequestPtr req ); + +protected: + ExpectedOrError submit_( drogon::HttpRequestPtr req ); + ExpectedOrError getInfo_( drogon::HttpRequestPtr req ); + drogon::HttpResponsePtr getResult_( drogon::HttpRequestPtr req ); +}; + + diff --git a/src/Job.cpp b/src/Job.cpp new file mode 100644 index 0000000..870218a --- /dev/null +++ b/src/Job.cpp @@ -0,0 +1,5 @@ +// +// Created by Andrey Aralov on 9/22/24. +// + +#include "Job.h" diff --git a/src/Job.h b/src/Job.h new file mode 100644 index 0000000..1f12c51 --- /dev/null +++ b/src/Job.h @@ -0,0 +1,21 @@ +// +// Created by Andrey Aralov on 9/22/24. +// +#pragma once +#include + + +using JobId = std::string; + +struct Job +{ + enum Status + { + InProgress, + Success, + Failed + }; + + Status status; + JobId id; +}; \ No newline at end of file