Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP2REDIS: Should not fail trying to load empty module #53

Merged
merged 3 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef enum {
RDBX_ERR_RESP2REDIS_FAILED_WRITE,
RDBX_ERR_RESP2REDIS_CONN_CLOSE,
RDBX_ERR_RESP2REDIS_MAX_RETRIES,
RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
} RdbxRes;

/****************************************************************
Expand Down Expand Up @@ -198,14 +199,21 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *)
* <user-defined-writer>
****************************************************************/

/* On start command pass command info. NULL otherwise. */
/* As streaming RESP protocol, when starting a new command, provide details
* about the command. Otherwise, pass NULL. This information will be used to log
* and report the command in case of a failure from Redis server. */
typedef struct RdbxRespWriterStartCmd {
/* Redis Command name (Ex: "SET", "RESTORE"). Owned by the caller. It is
* constant static string and Valid for ref behind the duration of the call. */
const char *cmd;

/* If key available as part of command. Else empty string.
* Owned by the caller. */
const char *key;

/* On restore command, size of serialized data. Otherwise, set to 0. */
size_t restoreSize;

} RdbxRespWriterStartCmd;

typedef struct RdbxRespWriter {
Expand Down
72 changes: 19 additions & 53 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) {
if (ctx->debug.flags & RFLAG_ENUM_CMD_ID) {
char keyLenStr[32], cmdIdLenStr[32], cmdIdStr[32];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SET";
startCmd.key = KEY_CMD_ID_DBG;
RdbxRespWriterStartCmd startCmd = {"SET", KEY_CMD_ID_DBG, 0};

struct iovec iov[7];
/* write SET */
Expand Down Expand Up @@ -296,9 +294,7 @@ static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t
if (ctx->keyCtx.delBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE)
extra_args++;

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RESTORE";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"RESTORE", ctx->keyCtx.key, ctx->restoreCtx.restoreSize};

/* writev RESTORE */
char cmd[64];
Expand Down Expand Up @@ -326,9 +322,7 @@ static inline RdbRes sendFirstRestoreFragModuleAux(RdbxToResp *ctx, RdbBulk frag
struct iovec iov[3];
char lenStr[32];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RESTOREMODAUX";
startCmd.key = "";
RdbxRespWriterStartCmd startCmd = {"RESTOREMODAUX", "", ctx->restoreCtx.restoreSize};

/* writev RESTOREMODAUX */
iov[0].iov_base = ctx->restoreCtx.moduleAux.cmdPrefix;
Expand All @@ -354,9 +348,7 @@ static RdbRes toRespNewDb(RdbParser *p, void *userData, int dbid) {

int cnt = ll2string(dbidStr, sizeof(dbidStr), dbid);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SELECT";
startCmd.key = "";
RdbxRespWriterStartCmd startCmd = {"SELECT", "", 0};

IOV_CONST(&iov[0], "*2\r\n$6\r\nSELECT");
IOV_LENGTH(&iov[1], cnt, cntStr);
Expand Down Expand Up @@ -394,9 +386,7 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo
struct iovec iov[4];
char keyLenStr[32];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "DEL";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"DEL", ctx->keyCtx.key, 0};

IOV_CONST(&iov[0], "*2\r\n$3\r\nDEL");
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
Expand All @@ -415,9 +405,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) {
/* key is in db. Set its expiration time */
if (ctx->keyCtx.info.expiretime != -1) {
struct iovec iov[6];
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "PEXPIREAT";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"PEXPIREAT", ctx->keyCtx.key, 0};

char keyLenStr[32], expireLenStr[32], expireStr[32];
/* PEXPIREAT */
Expand Down Expand Up @@ -448,9 +436,7 @@ static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) {

struct iovec iov[7];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SET";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"SET", ctx->keyCtx.key, 0};

/* write SET */
IOV_CONST(&iov[0], "*3\r\n$3\r\nSET");
Expand All @@ -473,9 +459,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) {
char keyLenStr[32], valLenStr[32];
int valLen = RDB_bulkLen(p, item);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RPUSH";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"RPUSH", ctx->keyCtx.key, 0};

/* write RPUSH */
IOV_CONST(&iov[0], "*3\r\n$5\r\nRPUSH");
Expand All @@ -500,9 +484,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
int fieldLen = RDB_bulkLen(p, field);
int valueLen = RDB_bulkLen(p, value);

RdbxRespWriterStartCmd hsetCmd;
hsetCmd.cmd = "HSET";
hsetCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd hsetCmd = {"HSET", ctx->keyCtx.key, 0};

/* write RPUSH */
IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET");
Expand All @@ -520,9 +502,8 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va

if (expireAt == -1) return RDB_OK;

RdbxRespWriterStartCmd hpexpireatCmd;
hpexpireatCmd.cmd = "HPEXPIREAT";
hpexpireatCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd hpexpireatCmd = {"HPEXPIREAT", ctx->keyCtx.key, 0};

/* write HPEXPIREAT */
IOV_CONST(&iov[0], "*6\r\n$10\r\nHPEXPIREAT");
/* write key */
Expand All @@ -545,9 +526,7 @@ static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) {

int valLen = RDB_bulkLen(p, member);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SADD";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"SADD", ctx->keyCtx.key, 0};

/* write RPUSH */
IOV_CONST(&iov[0], "*3\r\n$4\r\nSADD");
Expand All @@ -568,9 +547,7 @@ static RdbRes toRespZset(RdbParser *p, void *userData, RdbBulk member, double sc

int valLen = RDB_bulkLen(p, member);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "ZADD";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"ZADD", ctx->keyCtx.key, 0};

/* write ZADD */
IOV_CONST(&iov[0], "*4\r\n$4\r\nZADD");
Expand Down Expand Up @@ -615,9 +592,7 @@ static RdbRes toRespFunction(RdbParser *p, void *userData, RdbBulk func) {

int funcLen = RDB_bulkLen(p, func);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "FUNCTION";
startCmd.key = "";
RdbxRespWriterStartCmd startCmd = {"FUNCTION", "", 0};

if (ctx->conf.funcLibReplaceIfExist)
IOV_CONST(&iov[0], "*4\r\n$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n$7\r\nREPLACE");
Expand All @@ -644,9 +619,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
* for the Stream type. (We don't use the MAXLEN 0 trick from aof.c
* because of Redis Enterprise CRDT compatibility issues - Can't XSETID "back") */

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XGROUP CREATE";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"XGROUP CREATE", ctx->keyCtx.key, 0};

IOV_CONST(&iov[0], "*6\r\n$6\r\nXGROUP\r\n$6\r\nCREATE");
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
Expand All @@ -671,9 +644,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastID.ms,meta->lastID.seq);
int maxDelEntryIdLen = snprintf(maxDelEntryId, sizeof(maxDelEntryId), "%lu-%lu", meta->maxDelEntryID.ms, meta->maxDelEntryID.seq);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XSETID";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"XSETID", ctx->keyCtx.key, 0};

if ((ctx->keyCtx.info.opcode >= _RDB_TYPE_STREAM_LISTPACKS_2) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
IOV_CONST(&iov[0], "*7\r\n$6\r\nXSETID");
Expand Down Expand Up @@ -711,8 +682,7 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd

/* Start of (another) stream item? */
if ((ctx->streamCtx.xaddStartEndCounter % 2) == 0) {
startCmd.cmd = "XADD";
startCmd.key = ctx->keyCtx.key;
startCmd = (RdbxRespWriterStartCmd) {"XADD", ctx->keyCtx.key, 0};
startCmdRef = &startCmd;

/* writev XADD */
Expand Down Expand Up @@ -763,9 +733,7 @@ static RdbRes toRespStreamNewCGroup(RdbParser *p, void *userData, RdbBulk grpNam

int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastId.ms,meta->lastId.seq);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XGROUP";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = { "XGROUP", ctx->keyCtx.key, 0};

/* writev XGROUP */
if ( (meta->entriesRead>=0) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
Expand Down Expand Up @@ -845,9 +813,7 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb
return (RdbRes) RDBX_ERR_STREAM_INTEG_CHECK;
}

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XCLAIM";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"XCLAIM", ctx->keyCtx.key, 0};

/* writev XCLAIM */
IOV_CONST(&iov[iovs++], "*12\r\n$6\r\nXCLAIM");
Expand Down
15 changes: 11 additions & 4 deletions src/ext/readerResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ static RespRes readRespReplyError(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
else
ctx->errorMsg[ctx->errorMsgLen - 1] = '\0';

res = RESP_REPLY_ERR;
/* Report the error. cb return 1 to propagate. 0 to mask */
if ((ctx->errCb) && (ctx->errCb(ctx->errCbCtx, ctx->errorMsg) == 0))
return RESP_REPLY_OK;

return RESP_REPLY_ERR;
}

return res;
Expand Down Expand Up @@ -450,9 +454,12 @@ static RespRes readRespReply(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
/*** non-static functions (public) ***/

void readRespInit(RespReaderCtx *ctx) {
ctx->type = 0;
ctx->errorMsgLen = 0;
ctx->countReplies = 0;
memset(ctx, 0, sizeof(RespReaderCtx));
}

void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb) {
respReaderCtx->errCbCtx = errorCbCtx;
respReaderCtx->errCb = cb;
}

RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen) {
Expand Down
10 changes: 10 additions & 0 deletions src/ext/readerResp.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ typedef struct RespReplyBuff {
int at;
} RespReplyBuff;

/* cb to report on RESP error. Returns 1 to propagate. 0 to mask. */
typedef int (*OnRespErrorCb) (void *callerCtx, char *msg);

typedef struct {

/* PUBLIC: read-only */
Expand All @@ -33,8 +36,15 @@ typedef struct {
/* private bulk-array response state */
long long numBulksArray;

/* On RESP error callback */
void *errCbCtx;
OnRespErrorCb errCb;

} RespReaderCtx;

void readRespInit(RespReaderCtx *ctx);

/* Can register cb to decide whether to ignore given error or propagate it */
void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb);

RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen);
Loading
Loading