Update ray coreworker library to 2.31.0 #247

Draft: wants to merge 11 commits into base: main
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "Ray"
uuid = "3f779ece-f0b6-4c4f-a81a-0cb2add9eb95"
authors = ["Beacon Biosignals, Inc"]
version = "0.0.4"
version = "0.0.5"

[deps]
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
12 changes: 5 additions & 7 deletions build/.bazelrc
@@ -1,19 +1,17 @@
# Must be first. Enables build:windows, build:linux, build:macos, build:freebsd, build:openbsd
build --enable_platform_specific_config
###############################################################################
# On Windows, provide: BAZEL_SH, and BAZEL_LLVM (if using clang-cl)
# On all platforms, provide: PYTHON3_BIN_PATH=python
###############################################################################

# For --compilation_mode=dbg, consider enabling checks in the standard library as well (below).
build --compilation_mode=opt
# Using C++ 17 on all platforms.
build:linux --host_cxxopt="-std=c++17"
build:macos --host_cxxopt="-std=c++17"
build:linux --cxxopt="-std=c++17"
build:macos --cxxopt="-std=c++17"
# This workaround is needed to prevent Bazel from compiling the same file twice (once PIC and once not).
build:linux --force_pic
build:macos --force_pic
build:linux --copt="-fPIC"
build:macos --copt="-fPIC"

# TODO(mehrdadn): Revert the "-\\.(asm|S)$" exclusion when this Bazel bug
# Ignore warnings for protobuf generated files and external projects.
build --per_file_copt="\\.pb\\.cc$@-w"
build:linux --per_file_copt="-\\.(asm|S)$,external/.*@-w,-Wno-error=implicit-function-declaration"
1 change: 1 addition & 0 deletions build/BUILD.bazel
@@ -14,6 +14,7 @@ JULIA_CORE_WORKER_COPTS = [
"-isystem external/julia",
"-Wno-unused-variable",
"-Wno-pessimizing-move",
# "-Wno-deprecated",
]

cc_binary(
22 changes: 20 additions & 2 deletions build/WORKSPACE.bazel.tpl
@@ -70,8 +70,8 @@ new_local_repository(

# The code below is copied from the Ray project WORKSPACE file and just
# modifies labels starting with `//` to be `@com_github_ray_project_ray//`
# (e.g. `sed '\|//visibility|! s|"//|"@com_github_ray_project_ray//|g' WORKSPACE`)
# https://github.com/ray-project/ray/blob/ray-2.5.1/WORKSPACE#L4-L17
# (e.g. `sed 's|"//|"@com_github_ray_project_ray//|g' WORKSPACE`)
# https://github.com/ray-project/ray/blob/ray-2.31.0/WORKSPACE#L13-L26

load("@com_github_ray_project_ray//bazel:ray_deps_setup.bzl", "ray_deps_setup")

@@ -87,3 +87,21 @@ ray_deps_build_all()
load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps")

grpc_extra_deps()


# ---

http_archive(
name = "rules_python",
sha256 = "c68bdc4fbec25de5b5493b8819cfc877c4ea299c0dcb15c244c5a00208cde311",
strip_prefix = "rules_python-0.31.0",
url = "https://github.com/bazelbuild/rules_python/releases/download/0.31.0/rules_python-0.31.0.tar.gz",
)

load("@rules_python//python:repositories.bzl", "python_register_toolchains")

python_register_toolchains(
name = "python3_9",
python_version = "3.9",
register_toolchains = False,
)
2 changes: 1 addition & 1 deletion build/ray_commit
@@ -1 +1 @@
448a83caf44108fc1bc44fa7c6c358cffcfcb0d7
7660c2d020e30b6eee07a7fc2017a636b85c4672
64 changes: 42 additions & 22 deletions build/wrapper.cc
@@ -81,6 +81,7 @@ void initialize_worker(
options.metrics_agent_port = -1;
options.startup_token = startup_token;
options.runtime_env_hash = runtime_env_hash;
// https://github.com/ray-project/ray/blob/3bdcab68d49b74411144c61df8e64e7f291f92e2/src/ray/core_worker/core_worker_options.h#L35
options.task_execution_callback =
[task_executor](
const rpc::Address &caller_address,
@@ -94,13 +95,16 @@ void initialize_worker(
const std::string &serialized_retry_exception_allowlist,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *returns,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::vector<std::pair<ObjectID, bool>> *streaming_generator_returns,
std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
std::string *application_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute,
bool is_reattempt,
bool is_streaming_generator) {
bool is_streaming_generator,
bool retry_exception,
int64_t generator_backpressure_num_objects) {

std::vector<std::shared_ptr<RayObject>> return_vec;
task_executor(ray_function,
@@ -202,7 +206,7 @@ Status get(const ObjectID object_id, const int64_t timeout_ms, std::shared_ptr<R
// Retrieve our data from the object store
std::vector<ObjectID> get_obj_ids = {object_id};
std::vector<shared_ptr<RayObject>> result_vec;
auto status = worker.Get(get_obj_ids, timeout_ms, &result_vec);
auto status = worker.Get(get_obj_ids, timeout_ms, result_vec);
*result = result_vec[0];

// TODO (maybe?): allow multiple return values
@@ -284,7 +288,7 @@ std::string JuliaGcsClient::Get(const std::string &ns, const std::string &key) {
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
std::string value;
Status status = gcs_client_->InternalKV().Get(ns, key, value);
Status status = gcs_client_->InternalKV().Get(ns, key, -1, value);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
@@ -299,7 +303,7 @@ bool JuliaGcsClient::Put(const std::string &ns,
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
bool added;
Status status = gcs_client_->InternalKV().Put(ns, key, value, overwrite, added);
Status status = gcs_client_->InternalKV().Put(ns, key, value, overwrite, -1, added);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
@@ -311,7 +315,7 @@ std::vector<std::string> JuliaGcsClient::Keys(const std::string &ns, const std::
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
std::vector<std::string> results;
Status status = gcs_client_->InternalKV().Keys(ns, prefix, results);
Status status = gcs_client_->InternalKV().Keys(ns, prefix, -1, results);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
@@ -322,7 +326,8 @@ void JuliaGcsClient::Del(const std::string &ns, const std::string &key, bool del
if (!gcs_client_) {
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
Status status = gcs_client_->InternalKV().Del(ns, key, del_by_prefix);
int num_deleted = 0;
Status status = gcs_client_->InternalKV().Del(ns, key, del_by_prefix, -1, num_deleted);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
@@ -333,7 +338,7 @@ bool JuliaGcsClient::Exists(const std::string &ns, const std::string &key) {
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
bool exists;
Status status = gcs_client_->InternalKV().Exists(ns, key, exists);
Status status = gcs_client_->InternalKV().Exists(ns, key, -1, exists);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
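
Note on the `InternalKV()` changes above: each call now passes a timeout argument ahead of the output parameter, and `Del` also fills in a deletion count that the wrapper currently discards. The sketch below illustrates that calling pattern only. `MockStatus`, `MockInternalKV`, `kNoTimeout`, `get_or_throw` and `del_or_throw` are hypothetical names invented for this example and are not Ray's API; reading `-1` as "no deadline" is also an assumption, since the diff does not say what the value means.

```cpp
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a status type; NOT Ray's `Status` class.
struct MockStatus {
    bool is_ok;
    std::string message;
    bool ok() const { return is_ok; }
    std::string ToString() const { return message; }
};

// Assumption: -1 is treated as "no explicit deadline", as in the calls above.
constexpr std::int64_t kNoTimeout = -1;

// Hypothetical in-memory store whose methods follow the argument order used
// in the diff: (ns, key, ..., timeout_ms, output parameter).
class MockInternalKV {
public:
    MockStatus Put(const std::string &ns, const std::string &key, const std::string &value,
                   bool overwrite, std::int64_t timeout_ms, bool &added) {
        (void)timeout_ms;  // The mock never blocks, so the timeout is unused.
        const auto full_key = std::make_pair(ns, key);
        added = store_.find(full_key) == store_.end();
        if (added || overwrite) store_[full_key] = value;
        return MockStatus{true, "OK"};
    }

    MockStatus Get(const std::string &ns, const std::string &key, std::int64_t timeout_ms,
                   std::string &value) const {
        (void)timeout_ms;
        const auto it = store_.find(std::make_pair(ns, key));
        if (it == store_.end()) return MockStatus{false, "NotFound: " + key};
        value = it->second;
        return MockStatus{true, "OK"};
    }

    MockStatus Del(const std::string &ns, const std::string &key, bool del_by_prefix,
                   std::int64_t timeout_ms, int &num_deleted) {
        (void)timeout_ms;
        num_deleted = 0;
        for (auto it = store_.begin(); it != store_.end();) {
            const std::string &stored = it->first.second;
            const bool match = del_by_prefix ? stored.compare(0, key.size(), key) == 0
                                             : stored == key;
            if (it->first.first == ns && match) {
                it = store_.erase(it);
                ++num_deleted;
            } else {
                ++it;
            }
        }
        return MockStatus{true, "OK"};
    }

private:
    std::map<std::pair<std::string, std::string>, std::string> store_;
};

// Same error-handling shape as the wrapper methods: throw on a non-OK status.
std::string get_or_throw(const MockInternalKV &kv, const std::string &ns, const std::string &key) {
    std::string value;
    const MockStatus status = kv.Get(ns, key, kNoTimeout, value);
    if (!status.ok()) throw std::runtime_error(status.ToString());
    return value;
}

// Unlike the wrapper's `Del`, this variant returns the deletion count.
int del_or_throw(MockInternalKV &kv, const std::string &ns, const std::string &key,
                 bool del_by_prefix) {
    int num_deleted = 0;
    const MockStatus status = kv.Del(ns, key, del_by_prefix, kNoTimeout, num_deleted);
    if (!status.ok()) throw std::runtime_error(status.ToString());
    return num_deleted;
}
```

Returning the count from `del_or_throw` is a design choice of the sketch; whether `JuliaGcsClient::Del` should expose `num_deleted` to Julia is a separate question this PR leaves open.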
@@ -442,8 +447,17 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
return keys;
});

// The version of Ray library used (matches the tag). Unable to use `set_const` on a
// `char[]` as this causes:
// ```
// C++ exception while wrapping module ray_julia_jll: Type A7_c has no Julia wrapper
// ERROR: LoadError: Type A7_c has no Julia wrapper
// ```
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/common/constants.h#L67
mod.method("RayVersion", []() { return kRayVersion; });
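
Note on the `RayVersion` workaround above: the `A7_c` in the quoted error looks like the Itanium C++ ABI spelling of `char[7]`, which matches a 6-character version string plus its NUL terminator. Returning the array from a lambda lets return-type deduction decay it to `const char *`, a type the binding layer can handle. A minimal sketch of that decay follows. `kDemoVersion`, `demo_version` and `demo_version_string` are hypothetical names, and declaring the constant as a `constexpr char[]` is an assumption inferred from the error message.

```cpp
#include <string>
#include <type_traits>

// Hypothetical stand-in for a version constant declared as a char array.
constexpr char kDemoVersion[] = "2.31.0";

// The constant is an array of 7 chars: 6 characters plus the NUL terminator.
static_assert(std::is_same<decltype(kDemoVersion), const char[7]>::value,
              "the constant has an array type, not a pointer type");

// Return-type deduction decays the array to a pointer, so calling the lambda
// yields `const char *` rather than `char[7]`.
const auto demo_version = []() { return kDemoVersion; };
static_assert(std::is_same<decltype(demo_version()), const char *>::value,
              "the lambda returns a decayed pointer");

// Copy the C string into an owning std::string, similar in spirit to what
// `unsafe_string` does with the returned pointer on the Julia side.
std::string demo_version_string() { return std::string(demo_version()); }
```

Both `static_assert`s are checked at compile time, so the block documents the type change without needing to run anything.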

// enum StatusCode
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/status.h#L81
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/common/status.h#L82
mod.add_bits<ray::StatusCode>("StatusCode", jlcxx::julia_type("CppEnum"));
mod.set_const("OK", ray::StatusCode::OK);
mod.set_const("OutOfMemory", ray::StatusCode::OutOfMemory);
@@ -467,13 +481,13 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
mod.set_const("ObjectAlreadySealed", ray::StatusCode::ObjectAlreadySealed);
mod.set_const("ObjectStoreFull", ray::StatusCode::ObjectStoreFull);
mod.set_const("TransientObjectStoreFull", ray::StatusCode::TransientObjectStoreFull);
mod.set_const("GrpcUnavailable", ray::StatusCode::GrpcUnavailable);
mod.set_const("GrpcUnknown", ray::StatusCode::GrpcUnknown);
mod.set_const("OutOfDisk", ray::StatusCode::OutOfDisk);
mod.set_const("ObjectUnknownOwner", ray::StatusCode::ObjectUnknownOwner);
mod.set_const("RpcError", ray::StatusCode::RpcError);
mod.set_const("OutOfResource", ray::StatusCode::OutOfResource);
mod.set_const("ObjectRefEndOfStream", ray::StatusCode::ObjectRefEndOfStream);
mod.set_const("AuthError", ray::StatusCode::AuthError);
mod.set_const("InvalidArgument", ray::StatusCode::InvalidArgument);

// class Status
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/status.h#L127
@@ -488,7 +502,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
// abstract Julia type called `BaseID` to assist with dispatch.
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/id.h#L106

// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/id.h#L261
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/common/id.h#L263
mod.method("ObjectIDSize", &ObjectID::Size);
mod.add_type<ObjectID>("ObjectID", jlcxx::julia_type("BaseID"))
.method("ObjectIDFromBinary", &ObjectID::FromBinary)
@@ -503,7 +517,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
.method("IsNil", &ObjectID::IsNil);


// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/id.h#L261
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/common/id.h#L108
mod.method("JobIDSize", &JobID::Size);
mod.add_type<JobID>("JobID", jlcxx::julia_type("BaseID"))
.method("JobIDFromBinary", &JobID::FromBinary)
@@ -518,7 +532,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
.method("IsNil", &JobID::IsNil)
.method("ToInt", &JobID::ToInt);

// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/id.h#L175
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/common/id.h#L177
mod.method("TaskIDSize", &TaskID::Size);
mod.add_type<TaskID>("TaskID", jlcxx::julia_type("BaseID"))
.method("TaskIDFromBinary", &TaskID::FromBinary)
@@ -531,7 +545,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
.method("Hex", &TaskID::Hex)
.method("IsNil", &TaskID::IsNil);

// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/id.h#L35
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/common/id.h#L35
mod.method("WorkerIDSize", &WorkerID::Size);
mod.add_type<WorkerID>("WorkerID", jlcxx::julia_type("BaseID"))
.method("WorkerIDFromBinary", &WorkerID::FromBinary)
@@ -561,23 +575,23 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
mod.method("initialize_worker", &initialize_worker);

// enum Language
// https://github.com/beacon-biosignals/ray/blob/ray-2.5.1%2B1/src/ray/protobuf/common.proto#L25
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/protobuf/common.proto#L25
mod.add_bits<rpc::Language>("Language", jlcxx::julia_type("CppEnum"));
mod.set_const("PYTHON", rpc::Language::PYTHON);
mod.set_const("JAVA", rpc::Language::JAVA);
mod.set_const("CPP", rpc::Language::CPP);
mod.set_const("JULIA", rpc::Language::JULIA);
mod.set_const("JULIA", rpc::Language::JULIA); // Only defined in https://github.com/beacon-biosignals/ray

// enum WorkerType
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/protobuf/common.proto#L32
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/protobuf/common.proto#L32
mod.add_bits<rpc::WorkerType>("WorkerType", jlcxx::julia_type("CppEnum"));
mod.set_const("WORKER", rpc::WorkerType::WORKER);
mod.set_const("DRIVER", rpc::WorkerType::DRIVER);
mod.set_const("SPILL_WORKER", rpc::WorkerType::SPILL_WORKER);
mod.set_const("RESTORE_WORKER", rpc::WorkerType::RESTORE_WORKER);

// enum ErrorType
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/protobuf/common.proto#L142
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/protobuf/common.proto#L181
mod.add_bits<rpc::ErrorType>("ErrorType", jlcxx::julia_type("CppEnum"));
mod.set_const("WORKER_DIED", rpc::ErrorType::WORKER_DIED);
mod.set_const("ACTOR_DIED", rpc::ErrorType::ACTOR_DIED);
@@ -603,6 +617,8 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
mod.set_const("OBJECT_FREED", rpc::ErrorType::OBJECT_FREED);
mod.set_const("OUT_OF_MEMORY", rpc::ErrorType::OUT_OF_MEMORY);
mod.set_const("NODE_DIED", rpc::ErrorType::NODE_DIED);
mod.set_const("END_OF_STREAMING_GENERATOR", rpc::ErrorType::END_OF_STREAMING_GENERATOR);
mod.set_const("ACTOR_UNAVAILABLE", rpc::ErrorType::ACTOR_UNAVAILABLE);

// Needed by FunctionDescriptorInterface
mod.add_bits<ray::rpc::FunctionDescriptor::FunctionDescriptorCase>("FunctionDescriptorCase");
@@ -634,15 +650,15 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
mod.method("CallString", &CallString);

// class RayFunction
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/core_worker/common.h#L46
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/core_worker/common.h#L46
mod.add_type<RayFunction>("RayFunction")
.constructor<>()
.constructor<Language, const FunctionDescriptor &>()
.method("GetLanguage", &RayFunction::GetLanguage)
.method("GetFunctionDescriptor", &RayFunction::GetFunctionDescriptor);

// class Buffer
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/buffer.h
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/common/buffer.h
mod.add_type<Buffer>("Buffer")
.method("Data", &Buffer::Data)
.method("Size", &Buffer::Size)
@@ -668,7 +684,11 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
// https://protobuf.dev/reference/cpp/api-docs/google.protobuf.message_lite/
mod.add_type<google::protobuf::Message>("Message")
.method("SerializeAsString", &google::protobuf::Message::SerializeAsString)
.method("ParseFromString", &google::protobuf::Message::ParseFromString);
// TODO: Work around CxxWrap.jl not currently wrapping `std::basic_string_view`
// .method("ParseFromString", &google::protobuf::Message::ParseFromString);
.method("ParseFromString", [](google::protobuf::Message &message, const std::string data) {
return message.ParseFromString(data);
});
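
Note on the `ParseFromString` workaround above: when a member function takes a parameter type the binding layer cannot wrap (here reported as `std::basic_string_view`), a lambda with a wrappable parameter type can forward to it. The sketch below shows the adapter idea in isolation. `DemoMessage` and `parse_from_string` are hypothetical and unrelated to protobuf's real classes, and the `std::string_view` parameter on the demo class is an assumption made for illustration.

```cpp
#include <string>
#include <string_view>

// Hypothetical message type; NOT protobuf's `Message`. Its parser accepts a
// `std::string_view`, standing in for a signature a binding layer cannot wrap.
class DemoMessage {
public:
    bool ParseFromString(std::string_view data) {
        if (data.empty()) return false;  // Toy rule: empty input fails to parse.
        payload_.assign(data.begin(), data.end());
        return true;
    }
    const std::string &payload() const { return payload_; }

private:
    std::string payload_;
};

// Adapter with a `std::string` parameter; the conversion to
// `std::string_view` happens implicitly at the forwarding call.
const auto parse_from_string = [](DemoMessage &message, const std::string &data) {
    return message.ParseFromString(data);
};
```

The sketch takes `data` by const reference to avoid a copy, whereas the diff takes it by value; which of the two CxxWrap.jl handles better is not something the diff settles.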

// https://protobuf.dev/reference/cpp/api-docs/google.protobuf.util.json_util/
mod.method("JsonStringToMessage", [](const std::string json, google::protobuf::Message *message) {
@@ -681,7 +701,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
});

// message Address
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/protobuf/common.proto#L86
// https://github.com/ray-project/ray/blob/ray-2.31.0/src/ray/protobuf/common.proto#L125
mod.add_type<rpc::Address>("Address", jlcxx::julia_base_type<google::protobuf::Message>())
.constructor<>()
.method("raylet_id", &rpc::Address::raylet_id)
2 changes: 1 addition & 1 deletion docs/src/developer-guide.md
@@ -7,7 +7,7 @@ For those wanting to contribute to Ray.jl this guide will assist you in creating
Building the Ray.jl project requires the following tools to be installed. This list is provided for informational purposes and typically users should follow the platform specific install sections.

- [Julia](https://julialang.org/downloads/) version ≥ `v1.8`
- Python version ≥ `v3.7`
- Python version ≥ `v3.9`
- [Python venv](https://docs.python.org/3/library/venv.html)
- [Bazelisk](https://github.com/bazelbuild/bazelisk)
- GCC / G++ ≥ `v9`
2 changes: 2 additions & 0 deletions src/Ray.jl
@@ -33,6 +33,8 @@ export ActorPlacementGroupRemoved, ActorUnschedulableError, LocalRayletDiedError
include(joinpath("ray_julia_jll", "ray_julia_jll.jl"))
using .ray_julia_jll: ray_julia_jll, ray_julia_jll as ray_jll

const RAY_VERSION = parse(VersionNumber, unsafe_string(ray_jll.RayVersion()))

include("constants.jl")
include("function_manager.jl")
include("runtime_env.jl")
10 changes: 6 additions & 4 deletions src/ray_julia_jll/common.jl
@@ -20,13 +20,13 @@ const STATUS_CODE_SYMBOLS = (:OK,
:ObjectAlreadySealed,
:ObjectStoreFull,
:TransientObjectStoreFull,
:GrpcUnavailable,
:GrpcUnknown,
:OutOfDisk,
:ObjectUnknownOwner,
:RpcError,
:OutOfResource,
:ObjectRefEndOfStream)
:ObjectRefEndOfStream,
:AuthError,
:InvalidArgument)

const LANGUAGE_SYMBOLS = (:PYTHON, :JAVA, :CPP, :JULIA)
const WORKER_TYPE_SYMBOLS = (:WORKER, :DRIVER, :SPILL_WORKER, :RESTORE_WORKER)
@@ -54,7 +54,9 @@ const ERROR_TYPE_SYMBOLS = (:WORKER_DIED,
:OUT_OF_DISK_ERROR,
:OBJECT_FREED,
:OUT_OF_MEMORY,
:NODE_DIED)
:NODE_DIED,
:END_OF_STREAMING_GENERATOR,
:ACTOR_UNAVAILABLE)

# Generate the following methods for our wrapped enum types:
# - A constructor allowing you to create a value via a `Symbol` (e.g. `StatusCode(:OK)`).