Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend rdbloader to support FD #12

Merged
merged 5 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion api/librdb-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ typedef enum RdbRes {
RDB_ERR_GENERAL,

RDB_ERR_FAIL_ALLOC,
RDB_ERR_INVALID_CONFIGURATION,
RDB_ERR_FAILED_CREATE_PARSER,
RDB_ERR_FAILED_OPEN_LOG_FILE,
RDB_ERR_FAILED_READ_RDB_FILE,
Expand Down Expand Up @@ -380,13 +381,21 @@ _LIBRDB_API void RDB_dontPropagate(RdbParser *p);
* Parser setters & getters
****************************************************************/

_LIBRDB_API void RDB_setMaxRawLenHandling(RdbParser *p, size_t size);
_LIBRDB_API void RDB_setDeepIntegCheck(RdbParser *p, int deep);
_LIBRDB_API size_t RDB_getBytesProcessed(RdbParser *p);
_LIBRDB_API RdbState RDB_getState(RdbParser *p);
_LIBRDB_API int RDB_getNumHandlers(RdbParser *p, RdbHandlersLevel lvl);
_LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p);

/* There could be relatively large strings stored within Redis, which are
* subsequently also present in the RDB. This is especially true for collections
* of strings. In situations like this, if the parser is configured to read
* raw data (using RDB_createHandlersRaw), it could potentially lead to memory
* problems in data path. By establishing a MaxRawSize threshold, the size of
* raw data can be restricted, and if this threshold is exceeded, the parser will
* terminate its operation. The default threshold is unlimited. */
_LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t maxSize);

/* logger */
_LIBRDB_API void RDB_setLogLevel(RdbParser *p, RdbLogLevel l);
_LIBRDB_API void RDB_setLogger(RdbParser *p, RdbLoggerCB f);
Expand Down
44 changes: 23 additions & 21 deletions api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ typedef struct RdbxReaderFileDesc RdbxReaderFileDesc;
typedef struct RdbxFilterKey RdbxFilterKey;
typedef struct RdbxToJson RdbxToJson;
typedef struct RdbxToResp RdbxToResp;
typedef struct RdbxRespWriter RdbxRespWriter;
typedef struct RdbxRespToTcpLoader RdbxRespToTcpLoader;
typedef struct RdbxRespToRedisLoader RdbxRespToRedisLoader;

/****************************************************************
* Error codes
****************************************************************/

typedef enum {
RDBX_ERR_READER_FILE_GENERAL_ERROR = _RDB_ERR_EXTENSION_FIRST,
RDBX_ERR_RESP_FAILED_ALLOC,

/* rdb2json errors */
RDBX_ERR_FAILED_OPEN_FILE,
Expand All @@ -33,20 +33,17 @@ typedef enum {
RDBX_ERR_FILTER_FAILED_COMPILE_REGEX,

/* rdb2resp errors */
RDBX_ERR_RESP_INVALID_CONN_TYPE,
RDBX_ERR_RESP_FAILED_ALLOC,
RDBX_ERR_RESP_INIT_CONN_ERROR,

/* resp writer/loader */
RDBX_ERR_RESP_WRITE,
RDBX_ERR_RESP_READ,
RDBX_ERR_RESP2TCP_CREATE_SOCKET,
RDBX_ERR_RESP2TCP_INVALID_ADDRESS,
RDBX_ERR_RESP2TCP_FAILED_CONNECT,
RDBX_ERR_RESP2TCP_FAILED_READ,
RDBX_ERR_RESP2TCP_FAILED_WRITE,
RDBX_ERR_RESP2TCP_CONN_CLOSE,
RDBX_ERR_RESP2TCP_MAX_RETRIES,
RDBX_ERR_RESP2REDIS_CREATE_SOCKET,
RDBX_ERR_RESP2REDIS_INVALID_ADDRESS,
RDBX_ERR_RESP2REDIS_FAILED_CONNECT,
RDBX_ERR_RESP2REDIS_FAILED_READ,
RDBX_ERR_RESP2REDIS_FAILED_WRITE,
RDBX_ERR_RESP2REDIS_CONN_CLOSE,
RDBX_ERR_RESP2REDIS_MAX_RETRIES,
} RdbxRes;

/****************************************************************
Expand Down Expand Up @@ -129,19 +126,20 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *)
*
* Create instance for writing RDB to RESP stream.
*
* Used by: RDBX_createRespToTcpLoader
* Used by: RDBX_createRespToRedisTcp
* RDBX_createRespToRedisFd
* RDBX_createRespFileWriter
* <user-defined-handlers>
****************************************************************/

struct RdbxRespWriter {
typedef struct RdbxRespWriter {
void *ctx;
void (*delete)(void *ctx);

/* return 0 on success. Otherwise 1 */
int (*writev) (void *ctx, const struct iovec *ioVec, int count, int startCmd, int endCmd);
int (*writev) (void *ctx, struct iovec *ioVec, int count, int startCmd, int endCmd);
int (*flush) (void *ctx);
};
} RdbxRespWriter;

_LIBRDB_API void RDBX_attachRespWriter(RdbxToResp *rdbToResp, RdbxRespWriter *writer);

Expand All @@ -158,15 +156,19 @@ _LIBRDB_API RdbxRespFileWriter *RDBX_createRespFileWriter(RdbParser *p,
/****************************************************************
* Create RESP to Redis TCP connection
*
* If provided path is NULL then write stdout
* Can configure pipeline depth of transmitted RESP commands. Set
* to 0 if to use default.
****************************************************************/
_LIBRDB_API RdbxRespToTcpLoader *RDBX_createRespToTcpLoader(RdbParser *p,
_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p,
RdbxToResp *rdbToResp,
const char* hostname,
int port,
int pipelineDepth);
const char *hostname,
int port);

_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p,
RdbxToResp *rdbToResp,
int fd);

_LIBRDB_API void RDBX_setPipelineDepth(RdbxRespToRedisLoader *r2r, int depth);

#ifdef __cplusplus
}
Expand Down
3 changes: 1 addition & 2 deletions src/cli/rdb-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ static RdbRes formatRedis(RdbParser *parser, char *input, int argc, char **argv)
if ((rdbToResp = RDBX_createHandlersToResp(parser, &conf)) == NULL)
return RDB_ERR_GENERAL;


if (RDBX_createRespToTcpLoader(parser, rdbToResp, hostname, port, pipeDepthVal) == NULL)
if (RDBX_createRespToRedisTcp(parser, rdbToResp, hostname, port) == NULL)
return RDB_ERR_GENERAL;

return RDB_OK;
Expand Down
33 changes: 25 additions & 8 deletions src/ext/handlersToJson.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ static RdbxToJson *initRdbToJsonCtx(RdbParser *p, const char *filename, RdbxToJs

/* init RdbToJson context */
RdbxToJson *ctx = RDB_alloc(p, sizeof(RdbxToJson));
memset(ctx, 0, sizeof(RdbxToJson));
ctx->filename = RDB_alloc(p, strlen(filename)+1);
strcpy(ctx->filename, filename);
ctx->outfile = f;
Expand Down Expand Up @@ -192,11 +193,11 @@ static RdbRes toJsonNewDb(RdbParser *p, void *userData, int db) {
RdbxToJson *ctx = userData;

if (ctx->state == R2J_IDLE) {
if (!ctx->conf.flatten) ouput_fprintf(ctx, "[{\n");
if (!ctx->conf.flatten) ouput_fprintf(ctx, "{\n");
} else if (ctx->state == R2J_IN_DB) {
/* output json part */
if (!ctx->conf.flatten) {
ouput_fprintf(ctx, "},{\n");
ouput_fprintf(ctx, "\n},{\n");
} else {
ouput_fprintf(ctx, ",\n");
}
Expand All @@ -213,20 +214,35 @@ static RdbRes toJsonNewDb(RdbParser *p, void *userData, int db) {
return RDB_OK;
}

static RdbRes toJsonNewRdb(RdbParser *p, void *userData, int rdbVersion) {
UNUSED(rdbVersion);
RdbxToJson *ctx = userData;

if (ctx->state != R2J_IDLE) {
RDB_reportError(p, (RdbRes) RDBX_ERR_R2J_INVALID_STATE,
"toJsonNewRdb(): Invalid state value: %d", ctx->state);
return (RdbRes) RDBX_ERR_R2J_INVALID_STATE;
}

if (!ctx->conf.flatten) ouput_fprintf(ctx, "[");

return RDB_OK;
}

static RdbRes toJsonEndRdb(RdbParser *p, void *userData) {
RdbxToJson *ctx = userData;

if (ctx->state != R2J_IN_DB) {
if (ctx->state == R2J_IDLE) {
RDB_log(p, RDB_LOG_WRN, "RDB is empty.");
} else if (ctx->state == R2J_IN_DB) {
if (!ctx->conf.flatten) ouput_fprintf(ctx, "\n}");
} else {
RDB_reportError(p, (RdbRes) RDBX_ERR_R2J_INVALID_STATE,
"toJsonEndRdb(): Invalid state value: %d", ctx->state);
return (RdbRes) RDBX_ERR_R2J_INVALID_STATE;
}

/* output json part */
if (!ctx->conf.flatten)
ouput_fprintf(ctx, "\n}]\n");
else
ouput_fprintf(ctx, "\n");
if (!ctx->conf.flatten) ouput_fprintf(ctx, "]\n");

/* update new state */
ctx->state = R2J_IDLE;
Expand Down Expand Up @@ -384,6 +400,7 @@ RdbxToJson *RDBX_createHandlersToJson(RdbParser *p, const char *filename, RdbxTo
callbacks.common.handleNewKey = toJsonNewKey;
callbacks.common.handleEndKey = toJsonEndKey;
callbacks.common.handleNewDb = toJsonNewDb;
callbacks.common.handleStartRdb = toJsonNewRdb;
callbacks.common.handleEndRdb = toJsonEndRdb;

if (ctx->conf.level == RDB_LEVEL_DATA) {
Expand Down
Loading