Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CLS code #76

Draft
wants to merge 13 commits into
base: inmemoryfragment
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,5 @@ GTAGS
\#*\#

skyhookdm-ceph/
test-cluster/
.vscode/
4 changes: 2 additions & 2 deletions .popper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ steps:
runs: [bash, -euxc]
args:
- |
cp build/lib/libcls_cls_sdk.so* /usr/lib64/rados-classes/
cp build/lib/libcls_arrow.so* /usr/lib64/rados-classes/
scripts/micro-osd.sh test-cluster /etc/ceph
build/bin/cls_sdk_test
build/bin/cls_arrow_test
11 changes: 11 additions & 0 deletions scripts/build-and-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

mkdir -p build/
cd build/
cmake ..
make VERBOSE=1
cd ..

cp build/lib/libcls_arrow.so* /usr/lib64/rados-classes/
scripts/micro-osd.sh test-cluster /etc/ceph
build/bin/cls_arrow_test
2 changes: 1 addition & 1 deletion scripts/micro-osd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ osd data = ${OSD_DATA}
osd journal = ${OSD_DATA}.journal
osd journal size = 100
osd objectstore = memstore
osd class load list = lock log numops refcount replica_log statelog timeindex user version cls_sdk
osd class load list = lock log numops refcount replica_log statelog timeindex user version arrow
EOF

OSD_ID=$(ceph osd create)
Expand Down
23 changes: 13 additions & 10 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
set(cls_dir ${CMAKE_INSTALL_LIBDIR}/rados-classes)

# cls_sdk
add_library(cls_cls_sdk SHARED cls_sdk.cc arrow_utils.cc)
set_target_properties(cls_cls_sdk PROPERTIES
# cls_arrow
add_library(cls_arrow SHARED cls_arrow.cc cls_arrow_utils.cc)
set_target_properties(cls_arrow PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
INSTALL_RPATH ""
CXX_VISIBILITY_PRESET hidden
)
target_link_libraries(cls_cls_sdk
target_link_libraries(cls_arrow
arrow
arrow_dataset
)

install(TARGETS cls_cls_sdk DESTINATION ${cls_dir})
install(TARGETS cls_arrow DESTINATION ${cls_dir})

# cls_sdk_test
add_executable(cls_sdk_test
cls_sdk_test.cc
# cls_arrow_test
add_executable(cls_arrow_test
cls_arrow_test.cc
test_utils.cc
cls_arrow_utils.cc
)

target_link_libraries(cls_sdk_test
target_link_libraries(cls_arrow_test
arrow
arrow_dataset
${LIBRADOS_LIBRARIES}
${GTEST_MAIN_LIBRARIES}
${GTEST_INCLUDE_DIRS}
${GTEST_LIBRARIES}
)

install(TARGETS
cls_sdk_test
cls_arrow_test
RUNTIME DESTINATION bin
DESTINATION ${CMAKE_INSTALL_BINDIR}
)
170 changes: 0 additions & 170 deletions src/arrow_utils.cc

This file was deleted.

12 changes: 0 additions & 12 deletions src/arrow_utils.h

This file was deleted.

98 changes: 98 additions & 0 deletions src/cls_arrow.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include "rados/objclass.h"
#include "cls_arrow_utils.h"

#include <arrow/api.h>
#include <arrow/dataset/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>

CLS_VER(1, 0)
CLS_NAME(arrow)

cls_handle_t h_class;
cls_method_handle_t h_read;
cls_method_handle_t h_write;

static int write(cls_method_context_t hctx, ceph::buffer::list *in, ceph::buffer::list *out) {
int ret;

CLS_LOG(0, "create an object");
ret = cls_cxx_create(hctx, false);
if (ret < 0) {
CLS_ERR("ERROR: failed to create an object");
return ret;
}

CLS_LOG(0, "write data into the object");
ret = cls_cxx_write(hctx, 0, in->length(), in);
if (ret < 0) {
CLS_ERR("ERROR: failed to write to object");
return ret;
}

return 0;
}

static int read(cls_method_context_t hctx, ceph::buffer::list *in, ceph::buffer::list *out) {
int ret;
arrow::Status arrow_ret;

CLS_LOG(0, "deserializing scan request from the [in] bufferlist");
std::shared_ptr<arrow::dataset::Expression> filter;
std::shared_ptr<arrow::Schema> schema;
arrow_ret = deserialize_scan_request_from_bufferlist(&filter, &schema, *in);
if (!arrow_ret.ok()) {
CLS_ERR("ERROR: failed to extract expression and schema");
return -1;
}

CLS_LOG(0, "reading the the entire object into a bufferlist");
ceph::buffer::list bl;
ret = cls_cxx_read(hctx, 0, 0, &bl);
if (ret < 0) {
CLS_ERR("ERROR: failed to read an object");
return ret;
}

CLS_LOG(0, "reading the vector of record batches from the bufferlist");
arrow::RecordBatchVector batches;
arrow_ret = extract_batches_from_bufferlist(&batches, bl);
if (!arrow_ret.ok()) {
CLS_ERR("ERROR: failed to extract record batch vector from bufferlist");
return -1;
}

CLS_LOG(0, "applying scan operations over the vector of record batches");
std::shared_ptr<arrow::Table> result_table;
arrow_ret = scan_batches(filter, schema, batches, &result_table);
if (!arrow_ret.ok()) {
CLS_ERR("ERROR: failed to scan vector of record batches");
return -1;
}

CLS_LOG(0, "writing the resultant table into the [out] bufferlist");
ceph::buffer::list result_bl;
arrow_ret = write_table_to_bufferlist(result_table, result_bl);
if (!arrow_ret.ok()) {
CLS_ERR("ERROR: failed to write table to bufferlist");
return -1;
}
*out = result_bl;

return 0;
}

CLS_INIT(arrow)
{
CLS_LOG(0, "loading cls_arrow");

cls_register("arrow", &h_class);

cls_register_cxx_method(h_class, "read",
CLS_METHOD_RD | CLS_METHOD_WR, read,
&h_read);

cls_register_cxx_method(h_class, "write",
CLS_METHOD_RD | CLS_METHOD_WR, write,
&h_write);
}
Loading