Skip to content

Commit

Permalink
#14 refactor DolphinDB interface functions
Browse files Browse the repository at this point in the history
  - detect potential problems with variadic function calls
  - avoided potential memory leak
  • Loading branch information
Freddie Wu committed Aug 17, 2023
1 parent 80e0a80 commit 3029d23
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 161 deletions.
297 changes: 171 additions & 126 deletions kdb/src/kdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
#include "Util.h"
#include "kdb.h"

#define PLUGIN_NAME "[PLUGIN::KDB]"

using namespace std;

#define PLUGIN_NAME "[PLUGIN::KDB]"
////////////////////////////////////////////////////////////////////////////////
// Constants

const char PATH_SEP = '/';

// parameters for gzip process
Expand Down Expand Up @@ -50,28 +54,104 @@ const int KDB_DATE_GAP = 10957;
const long long KDB_DATETIME_GAP = KDB_DATE_GAP * SEC_PER_DAY * 1000LL;
const long long KDB_NANOTIMESTAMP_GAP = KDB_DATETIME_GAP * 1000000LL;

static Mutex LOCK_KDB;
////////////////////////////////////////////////////////////////////////////////
// DolphinDB Interops
namespace /*anonymous*/ {

Mutex LOCK_KDB;

int arg2Int(const ConstantSP& arg, const char* argName = nullptr,
const string& usage = "", const char* caller = nullptr
) {
assert(arg.get());
if(arg->getType() != DT_INT) {
const auto ref = (caller == nullptr) ? __FUNCTION__ : caller;
const auto var = (argName == nullptr) ? "Arg" : argName;
throw IllegalArgumentException(ref,
usage + PLUGIN_NAME ": " + var + " should be an integer.");
}
return arg->getInt();
}

/**
 * Read a DolphinDB argument as a C++ string.
 *
 * @param arg      Argument to convert; must hold a non-null object.
 * @param argName  Name used for the argument in the error text ("Arg" if null).
 * @param usage    Usage text placed in front of the error message.
 * @param caller   Function name reported as the error origin; this helper's
 *                 own name is used when null.
 * @return arg->getString() when the argument's type is DT_STRING.
 * @throws IllegalArgumentException when the argument's type is not DT_STRING.
 */
string arg2String(const ConstantSP& arg, const char* argName = nullptr,
    const string& usage = "", const char* caller = nullptr
) {
    assert(arg.get());
    if(arg->getType() == DT_STRING) {
        return arg->getString();
    }

    // Fall back to generic labels when the caller supplied none.
    const char* const origin = caller ? caller : __FUNCTION__;
    const char* const label = argName ? argName : "Arg";
    string message = usage;
    message += PLUGIN_NAME ": ";
    message += label;
    message += " should be a string.";
    throw IllegalArgumentException(origin, message);
}

ConstantSP safeOp(const ConstantSP &arg, std::function<ConstantSP(Connection *)> &&f) {
if(arg->getType() == DT_RESOURCE) {
/**
 * Recover the Connection pointer carried by a DolphinDB resource argument.
 *
 * @param arg     Argument to inspect; must hold a non-null object.
 * @param usage   Usage text placed in front of any error message.
 * @param caller  Function name reported as the error origin; this helper's
 *                own name is used when null.
 * @return The pointer stored in the resource's long value. It may be null:
 *         callers in this file reset the value to 0 after deleting the
 *         Connection, so null means "already closed".
 * @throws IllegalArgumentException when arg is not a DT_RESOURCE, or when its
 *         description does not contain Connection::marker.
 */
Connection* arg2Connection(const ConstantSP& arg,
    const string& usage = "", const char* caller = nullptr
) {
    assert(arg.get());
    const char* const ref = (caller == nullptr) ? __FUNCTION__ : caller;
    if(arg->getType() != DT_RESOURCE) {
        throw IllegalArgumentException(ref,
            usage + PLUGIN_NAME ": Invalid connection object.");
    }

    // A genuine handle's description contains the marker, so the handle is
    // rejected only when the marker is ABSENT (find() returns npos).
    const string desc = arg->getString();
    if(desc.find(Connection::marker) == string::npos) {
        throw IllegalArgumentException(ref,
            usage + PLUGIN_NAME ": Invalid kdb+ connection object.");
    }

    return reinterpret_cast<Connection*>(arg->getLong());
}

/**
 * Normalize a path in place and return a reference to the same string.
 *
 * On WINDOWS builds every backslash is first replaced with PATH_SEP.
 * All trailing PATH_SEP characters are then removed; a path made up only
 * of separators becomes empty.
 */
string& normalizePath(string& path) {
#   ifdef WINDOWS
    // Replace backward slash with forward slash
    path = Util::replace(path, '\\', PATH_SEP);
#   endif
    // Keep everything up to the last non-separator character.
    const string::size_type lastKept = path.find_last_not_of(PATH_SEP);
    path.erase(lastKept == string::npos ? 0 : lastKept + 1);
    return path;
}

/**
 * Run an operation against the Connection carried by a resource argument.
 *
 * @param arg  Resource argument, validated through arg2Connection().
 * @param f    Operation invoked with the recovered, non-null Connection.
 * @return Whatever f returns.
 * @throws IllegalArgumentException when arg2Connection() rejects the
 *         argument, or when the recovered pointer is null.
 */
ConstantSP safeOp(const ConstantSP &arg, std::function<ConstantSP(Connection *)> &&f) {
    Connection* const handle = arg2Connection(arg, "", __FUNCTION__);
    if(handle == nullptr) {
        throw IllegalArgumentException(__FUNCTION__,
            PLUGIN_NAME ": Connection object already closed.");
    }
    return f(handle);
}
throw IllegalArgumentException(__FUNCTION__, PLUGIN_NAME ": Invalid connection object.");
}

// Deleter for K objects: passes a non-null K to r0(); a null K is ignored.
// NOTE(review): r0() is presumably the kdb+ C API reference-count release -- confirm against k.h.
void KDeleter::operator()(K k) const {
if(k) r0(k);
}
/**
 * onClose callback for a kdb+ connection resource.
 *
 * Deletes the Connection referenced by args[0], if any, and resets the
 * stored pointer value to 0 so that it is not deleted a second time.
 *
 * @param heap  Unused.
 * @param args  args[0] is the connection resource being closed.
 * @throws RuntimeException when args[0] is not a valid kdb+ connection
 *         resource (the IllegalArgumentException raised by
 *         arg2Connection() is converted).
 */
void kdbConnectionOnClose(Heap *heap, vector<ConstantSP> &args) {
    assert(!args.empty());

    // Use unique_ptr<> to manage conn until it is reset.
    unique_ptr<Connection> conn;
    try {
        // arg2Connection(arg, usage, caller): pass an explicit empty usage so
        // that the function name lands in `caller` rather than in `usage`.
        conn.reset(arg2Connection(args[0], "", __FUNCTION__));
    } catch(const IllegalArgumentException& iae) {
        throw RuntimeException(iae.what());
    }

    if(conn) {
        conn.reset();
        args[0]->setLong(0);
    }
}

}//namespace /*anonymous*/

////////////////////////////////////////////////////////////////////////////////
// kdb+ Data Parsers
namespace /*anonymous*/ {

/**
 * View a std::string's character buffer as a kdb+ S value.
 *
 * Only the const qualifier is cast away; no copy is made, so the result
 * points into `str` and is owned by it.
 */
S KDB_S(const string &str) {
    const char* const raw = str.c_str();
    return const_cast<S>(raw);
}

/**
 * Check whether a K object looks like a usable list.
 *
 * @return true only when k is non-null, its count n is non-negative, its
 *         type t is at least KDB_LIST, and kG(k) is non-null.
 */
bool KDB_validList(const K k) {
    if(!k) {
        return false;
    }
    if(k->n < 0 || k->t < KDB_LIST) {
        return false;
    }
    return static_cast<bool>(kG(k));
}
Expand Down Expand Up @@ -340,14 +420,15 @@ namespace /*anonymous*/ {

}//namespace /*anonymous*/

static void kdbConnectionOnClose(Heap *heap, vector<ConstantSP> &args) {
const auto conn = reinterpret_cast<Connection*>(args[0]->getLong());
if(conn != nullptr) {
delete conn;
args[0]->setLong(0);
}
////////////////////////////////////////////////////////////////////////////////
// Class Implementations

// Deleter for K objects: a null K is ignored, anything else is handed to r0().
void KDeleter::operator()(K k) const {
    if(!k) {
        return;
    }
    r0(k);
}

// Tag text for kdb+ connection resources: kdbConnect() puts it at the start of
// the resource description and arg2Connection() searches the description for it.
const char* const Connection::marker = "kdb+ connection";

Connection::Connection(const string& hostStr, const int port, const string& usernamePassword): host_(hostStr), port_(port) {
const auto handle = khpunc(KDB_S(hostStr), port, KDB_S(usernamePassword), KDB_TIMEOUT, KDB_CAPABILITY);

Expand Down Expand Up @@ -506,6 +587,9 @@ string Connection::str() const {
return host_ + ":" + to_string(port_);
}

////////////////////////////////////////////////////////////////////////////////
// kdb+ File Loading

/*
* we can't get endian info from binary file
* the machine that persisted kdb+ file and the machine that load file into DolphinDB
Expand Down Expand Up @@ -1287,8 +1371,8 @@ void run() override {
using GetColRunnableSP = SmartPointer<GetColRunnable>;

TableSP loadSplayedTable(string tablePath, vector<string>& symList, string symName) {
if(tablePath.size() > 0 && tablePath[tablePath.size()-1] != '/') {
tablePath.push_back('/');
if(tablePath.size() > 0 && tablePath[tablePath.size()-1] != PATH_SEP) {
tablePath.push_back(PATH_SEP);
}
// read .d file, get column names
string dotD = tablePath + ".d";
Expand Down Expand Up @@ -1395,144 +1479,105 @@ TableSP loadSplayedTable(string tablePath, vector<string>& symList, string symNa
return table;
}

////////////////////////////////////////////////////////////////////////////////
// DolphinDB Interface

/**
 * connect(host, port[, usernamePassword]): open a kdb+ connection and wrap
 * it in a DolphinDB resource.
 *
 * @param heap  Supplies the session that the resource is created for.
 * @param args  args[0] host (string), args[1] port (int), optional args[2]
 *              usernamePassword (string).
 * @return A resource whose long value is the Connection pointer, whose
 *         description starts with Connection::marker, and whose onClose
 *         procedure is kdbConnectionOnClose.
 * @throws IllegalArgumentException when an argument has the wrong type.
 */
ConstantSP kdbConnect(Heap *heap, vector<ConstantSP> &args){
    const string usage = "Usage: connect(host, port, usernamePassword). ";
    assert(args.size() >= 3-1);

    const string hostStr = arg2String(args[0], "host", usage, __FUNCTION__);
    const int port = arg2Int(args[1], "port", usage, __FUNCTION__);

    string usrStr = "";
    if(args.size() >= 3) {
        usrStr = arg2String(args[2], "usernamePassword", usage, __FUNCTION__);
    }

    // Use unique_ptr<> to manage cup until Util::createResource() takes over.
    // (make_unique<>() is only available from C++14 on, hence the explicit new.)
    unique_ptr<Connection> cup{new Connection(hostStr, port, usrStr)};
    const string desc = Connection::marker + (" to [" + cup->str() + ']');
    FunctionDefSP onClose{
        Util::createSystemProcedure("kdb+ connection onClose()", &kdbConnectionOnClose, 1, 1)
    };
    static_assert(sizeof(long long) >= sizeof(Connection*),
        "ensure enough space to store the pointer");
    // FIXME: Still not quite safe!
    //   If Util::createResource() throws after cup.release(), the Connection
    //   is leaked because nothing owns it any more.
    return Util::createResource(
        reinterpret_cast<long long>(cup.release()), desc, onClose, heap->currentSession());
}

/**
 * loadTable(handle, tablePath[, symPath]): load a table through an open
 * kdb+ connection.
 *
 * @param heap  Unused.
 * @param args  args[0] connection resource, args[1] tablePath (string),
 *              optional args[2] symPath (string).
 * @return The result of Connection::getTable(tablePath, symFilePath).
 * @throws IllegalArgumentException when a path argument is not a string or
 *         the handle is invalid or already closed.
 */
ConstantSP kdbLoadTable(Heap *heap, vector<ConstantSP> &args){
    const string usage = "Usage: loadTable(handle, tablePath, symPath). ";
    assert(args.size() >= 3-1);

    // The paths are deliberately not checked for existence here: with
    // loadTable the files may not be on the same machine as DolphinDB.
    string tablePath = arg2String(args[1], "tablePath", usage, __FUNCTION__);
    normalizePath(tablePath);

    string symFilePath = "";
    if(args.size() >= 3) {
        symFilePath = arg2String(args[2], "symPath", usage, __FUNCTION__);
    }
    normalizePath(symFilePath);

    return safeOp(args[0],
        [&](Connection *conn) { return conn->getTable(tablePath, symFilePath); }
    );
}

/**
 * loadFile(tablePath[, symPath]): load a table from kdb+ files on disk.
 *
 * @param heap  Unused.
 * @param args  args[0] tablePath (string), optional args[1] symPath (string).
 * @return The table produced by loadSplayedTable().
 * @throws IllegalArgumentException when an argument is not a string, when
 *         tablePath does not exist, or when a non-empty symPath is missing
 *         or is a directory.
 */
ConstantSP kdbLoadFile(Heap *heap, vector<ConstantSP> &args){
    const string usage = "Usage: loadFile(tablePath, symPath). ";
    assert(args.size() >= 2-1);

    string tablePath = arg2String(args[0], "tablePath", usage, __FUNCTION__);
    normalizePath(tablePath);
    if(!(Util::exists(tablePath) || Util::existsDir(tablePath))) {
        throw IllegalArgumentException(__FUNCTION__,
            usage + PLUGIN_NAME ": tablePath [" + tablePath + "] does not exist.");
    }

    string symFilePath = "";
    if(args.size() >= 2) {
        symFilePath = arg2String(args[1], "symPath", usage, __FUNCTION__);
    }
    normalizePath(symFilePath);
    if(!(symFilePath.empty() || Util::exists(symFilePath))) {
        // Tell a directory given by mistake apart from a missing file.
        const char* extra = Util::existsDir(symFilePath)
            ? "should be a file, not a directory"
            : "does not exist";
        throw IllegalArgumentException(__FUNCTION__,
            usage + PLUGIN_NAME ": symPath [" + symFilePath + "] " + extra + '.');
    }

    vector<string> symList;
    if(!symFilePath.empty()) {
        symList = loadSym(symFilePath);
    }

    // The sym name is the last component of the sym file path.
    vector<string> fields;
    Util::split(symFilePath, PATH_SEP, fields);
    string symName = "";
    if(!fields.empty()) {
        symName = fields.back();
    }

    return loadSplayedTable(tablePath, symList, symName);
}

ConstantSP kdbClose(Heap *heap, vector<ConstantSP> &args){
const auto usage = string("Usage: close(handle). ");
if (args[0]->getType() == DT_RESOURCE) {
string desc = args[0]->getString();
if(desc.find("kdb+ connection") == desc.npos) {
throw IllegalArgumentException(__FUNCTION__, "[PLUGIN::KDB]: Invalid kdb+ connection object.");
}
Connection* conn = (Connection*)(args[0]->getLong());
if(conn!= nullptr) {
delete conn;
args[0]->setLong(0);
} else {
throw IllegalArgumentException(__FUNCTION__, "[PLUGIN::KDB]: Invalid connection object.");
}
} else {
throw IllegalArgumentException(__FUNCTION__, "[PLUGIN::KDB]: Invalid connection object.");
/**
 * close(handle): destroy the Connection behind a kdb+ connection resource.
 *
 * A handle whose stored pointer is already null is left untouched.
 *
 * @param heap  Unused.
 * @param args  args[0] is the connection resource to close.
 * @return A new Void object.
 * @throws IllegalArgumentException when args[0] is not a valid kdb+
 *         connection resource.
 */
ConstantSP kdbClose(Heap *heap, vector<ConstantSP> &args) {
    const string usage = "Usage: close(handle). ";
    assert(args.size() >= 1);

    if(Connection* const target = arg2Connection(args[0], usage, __FUNCTION__)) {
        delete target;
        // Clear the stored pointer so it is not deleted again later.
        args[0]->setLong(0);
    }
    return new Void();
}
Loading

0 comments on commit 3029d23

Please sign in to comment.