Skip to content

Commit

Permalink
Add support to HASH and SET datatypes
Browse files Browse the repository at this point in the history
- Support plain hash, ziplist hash, listpack hash, zipmap hash, plain set,
  intset and listpack set. 
- Added corresponding tests for json, resp and redis formaters.
- Data integrity doesn't check for duplication of elements that was read from 
  RDB (if it is important to client, then it is the one to implement it).
- As list data-type also uses ziplist and listpack - some refactor was made at 
  parserRaw.c.
- Copied implementation of zipmap from Redis to deps/redis/zipmap.c.
- Added comments to API.
  • Loading branch information
moticless authored Aug 13, 2023
1 parent 0a58bb5 commit 129bef5
Show file tree
Hide file tree
Showing 56 changed files with 2,923 additions and 479 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ of callbacks, it is the duty of the application to configure for each RDB object
what level it is needed to get handled by calling `RDB_handleByLevel()`. Otherwise, the
parser will resolve it by parsing and calling handlers that are registered at lowest level.

As for the common callbacks to all levels (which includes `handleNewRdb`, `handleNewDb`,
As for the common callbacks to all levels (which includes `handleStartRdb`, `handleNewDb`,
`handleEndRdb`, `handleDbSize` and `handleAuxField`) if registered at different
levels then all of them will be called, one by one, starting from handlers that are
registered at the lowest level.
Expand Down
164 changes: 126 additions & 38 deletions api/librdb-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,18 @@ typedef enum RdbRes {
RDB_ERR_NOT_SUPPORTED_RDB_ENCODING_TYPE,
RDB_ERR_UNKNOWN_RDB_ENCODING_TYPE,
RDB_ERR_QUICK_LIST_INTEG_CHECK,
RDB_ERR_ZIP_LIST_INTEG_CHECK,
RDB_ERR_LIST_ZL_INTEG_CHECK,
RDB_ERR_SET_IS_INTEG_CHECK,
RDB_ERR_SET_LP_INTEG_CHECK,
RDB_ERR_HASH_LP_INTEG_CHECK,
RDB_ERR_HASH_ZM_INTEG_CHECK,
RDB_ERR_SSTYPE_INTEG_CHECK,
RDB_ERR_STRING_INVALID_STATE,
RDB_ERR_PLAIN_HASH_INVALID_STATE,
RDB_ERR_PLAIN_LIST_INVALID_STATE,
RDB_ERR_PLAIN_SET_INVALID_STATE,
RDB_ERR_QUICK_LIST_INVALID_STATE,
RDB_ERR_ZIP_LIST_INVALID_STATE,
RDB_ERR_SSTYPE_INVALID_STATE,
RDB_ERR_INVALID_BULK_ALLOC_TYPE,
RDB_ERR_INVALID_BULK_CLONE_REQUEST,
RDB_ERR_INVALID_BULK_LENGTH_REQUEST,
Expand Down Expand Up @@ -109,10 +116,10 @@ typedef enum RdbHandlersLevel {
} RdbHandlersLevel;

typedef enum RdbLogLevel {
RDB_LOG_ERROR,
RDB_LOG_WARNING,
RDB_LOG_INFO,
RDB_LOG_DEBUG
RDB_LOG_ERR,
RDB_LOG_WRN,
RDB_LOG_INF,
RDB_LOG_DBG
} RdbLogLevel;

/* for explanation, read "Memory management" section below */
Expand Down Expand Up @@ -159,75 +166,144 @@ typedef struct RdbStreamConsumerMeta {
} RdbStreamConsumerMeta;

/* misc function pointer typedefs */
typedef RdbStatus (*RdbReaderFunc) (RdbParser *p, void *readerData, void *buf, size_t len);
typedef RdbStatus (*RdbReaderFunc) (void *readerData, void *buf, size_t len);
typedef void (*RdbFreeFunc) (RdbParser *p, void *obj);
typedef void (*RdbLoggerCB) (RdbLogLevel l, const char *msg);

/****************************************************************
* Handlers callbacks struct
* Common Callbacks for Registration (handlers Level 0/1/2)
*
* Common Callbacks to all levels. If registered at different levels then all
* of them will be called, one by one, starting from handlers that are
* registered at the lowest level.
****************************************************************/

/* TODO: Pass "RdbParser *p" for each cb? User can hold it as well in "userData" */

/* avoid nested structures */
#define HANDLERS_COMMON_CALLBACKS \
RdbRes (*handleNewRdb)(RdbParser *p, void *userData, int rdbVersion); \
RdbRes (*handleNewDb)(RdbParser *p, void *userData, int db); \
RdbRes (*handleEndRdb)(RdbParser *p, void *userData); \
RdbRes (*handleDbSize)(RdbParser *p, void *userData, uint64_t db_size, uint64_t exp_size); \
RdbRes (*handleAuxField)(RdbParser *p, void *userData, RdbBulk auxkey, RdbBulk auxval); \
RdbRes (*handleNewKey)(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo *info); \
/* Definition of the common callback functions for handling various RDB parsing events */
#define HANDLERS_COMMON_CALLBACKS \
/* Callback once RDB header was parsed */ \
RdbRes (*handleStartRdb)(RdbParser *p, void *userData, int rdbVersion); \
/* Callback to indicate the completion of the parsing process for the dump file */ \
RdbRes (*handleEndRdb)(RdbParser *p, void *userData); \
/* Callback to indicate start parsing of database `dbnum` */ \
RdbRes (*handleNewDb)(RdbParser *p, void *userData, int dbnum); \
/* Callback per db before the keys, with the key count and the total voletaile key count */ \
RdbRes (*handleDbSize)(RdbParser *p, void *userData, uint64_t db_size, uint64_t exp_size); \
/* Callback in the beginning of the RDB with various keys and values. exists since redis 3.2 (RDB v7) */ \
RdbRes (*handleAuxField)(RdbParser *p, void *userData, RdbBulk auxkey, RdbBulk auxval); \
/* Callback on each new key along with additional info, such as, expire-time, LRU, etc. */ \
RdbRes (*handleNewKey)(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo *info); \
/* Callback on each completion of a key. Useful To finalize handled key. */ \
RdbRes (*handleEndKey)(RdbParser *p, void *userData);

/****************************************************************
* Rdb raw Callbacks for Registration (handlers Level 0)
*
* This level of callback registration is adequate mainly when it is required
* to RESTORE from RDB source and play it against live Redis server.
****************************************************************/

typedef struct RdbHandlersRawCallbacks {
HANDLERS_COMMON_CALLBACKS
/* Callback on start of serializing value of a new key */
RdbRes (*handleBegin)(RdbParser *p, void *userData, size_t size);
/* Callback on one or more chunks of serialized value of a key */
RdbRes (*handleFrag)(RdbParser *p, void *userData, RdbBulk frag);
/* Callback on end of serializing value of a key */
RdbRes (*handleEnd)(RdbParser *p, void *userData);

/*** TODO: RdbHandlersRawCallbacks: handleBeginModuleAux ***/
RdbRes (*handleBeginModuleAux)(RdbParser *p, void *userData, RdbBulk name, int encver, int when);

} RdbHandlersRawCallbacks;

/****************************************************************
* Rdb data-structures Callbacks for Registration (handlers Level 1)
*
* This level of callback registration is adequate mainly when it is required to
* analyze memory consumption, or other reason for inspection of "low-level"
* data structures of RDB file.
****************************************************************/

typedef struct RdbHandlersStructCallbacks {
HANDLERS_COMMON_CALLBACKS
RdbRes (*handleStringValue)(RdbParser *p, void *userData, RdbBulk str);
RdbRes (*handleListLP)(RdbParser *p, void *userData, RdbBulk listLP);
RdbRes (*handleListZL)(RdbParser *p, void *userData, RdbBulk listZL);
RdbRes (*handleListNode)(RdbParser *p, void *userData, RdbBulk node);

/*** TODO: RdbHandlersStructCallbacks: handlerHashListPack, handleSetIntset, handleZsetListPack, handleFunction ***/
RdbRes (*handlerHashLP)(RdbParser *p, void *userData, RdbBulk hashLp);
RdbRes (*handleSetIntset)(RdbParser *p, void *userData, RdbBulk intSet);
RdbRes (*handleSetLP)(RdbParser *p, void *userData, RdbBulk setLP);
RdbRes (*handleSetZL)(RdbParser *p, void *userData, RdbBulk setZL);
RdbRes (*handleZsetLP)(RdbParser *p, void *userData, RdbBulk zsetLP);
/* Callback to handle a string value of a key */
RdbRes (*handleString)(RdbParser *p, void *userData, RdbBulk str);

/* Callback to handle an item from a plain-list. Or plain node of RDB_TYPE_LIST_QUICKLIST_2 */
RdbRes (*handleListPlain)(RdbParser *p, void *userData, RdbBulk node);
/* Callback to handle a ziplist-based list value */
RdbRes (*handleListZL)(RdbParser *p, void *userData, RdbBulk ziplist);
/* Callback to handle a listpack-based list value */
RdbRes (*handleListLP)(RdbParser *p, void *userData, RdbBulk listpack);

/* Callback to handle a field-value pair within a plain-hash */
RdbRes (*handleHashPlain)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value);
/* Callback to handle a ziplist-based hash value */
RdbRes (*handleHashZL)(RdbParser *p, void *userData, RdbBulk ziplist);
/* Callback to handle a listpack-based hash value */
RdbRes (*handleHashLP)(RdbParser *p, void *userData, RdbBulk listpack);
/* Callback to handle a zipmap-based hash value */
RdbRes (*handleHashZM)(RdbParser *p, void *userData, RdbBulk zipmap);

/* Callback to handle an item from a plain-set */
RdbRes (*handleSetPlain)(RdbParser *p, void *userData, RdbBulk item);
/* Callback to handle an intset-based set value */
RdbRes (*handleSetIS)(RdbParser *p, void *userData, RdbBulk intset);
/* Callback to handle a listpack-based set value */
RdbRes (*handleSetLP)(RdbParser *p, void *userData, RdbBulk listpack);

/*** TODO: RdbHandlersStructCallbacks: ***/

/* Callback to handle a ziplist-based sorted set value */
RdbRes (*handleZsetZL)(RdbParser *p, void *userData, RdbBulk ziplist);
/* Callback to handle a listpack-based sorted set value */
RdbRes (*handleZsetLP)(RdbParser *p, void *userData, RdbBulk listpack);
/* Callback to handle function code */
RdbRes (*handleFunction)(RdbParser *p, void *userData, RdbBulk func);

/*** TODO: RdbHandlersStructCallbacks: stream stuff ... ***/

/* Callback to handle a stream key with listpack value */
RdbRes (*handleStreamLP)(RdbParser *p, void *userData, RdbBulk nodekey, RdbBulk streamLP);

} RdbHandlersStructCallbacks;

/****************************************************************
* Redis Data Type Callbacks for Registration (handlers Level 2)
*
* This level of callback registration is adequate when the focus is solely on
* the logical data types within the database, such as to export the data to
* another framework or investigate the content of the data.
****************************************************************/

typedef struct RdbHandlersDataCallbacks {
HANDLERS_COMMON_CALLBACKS
/* Callback to handle the value of a string key */
RdbRes (*handleStringValue)(RdbParser *p, void *userData, RdbBulk str);
RdbRes (*handleListElement)(RdbParser *p, void *userData, RdbBulk str);
/* Callback to handle an item from a list */
RdbRes (*handleListItem)(RdbParser *p, void *userData, RdbBulk item);
/* Callback to handle a field-value pair within a hash */
RdbRes (*handleHashField)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value);
/* Callback to handle a member within a set */
RdbRes (*handleSetMember)(RdbParser *p, void *userData, RdbBulk member);

/*** TODO: RdbHandlersDataCallbacks: handleZsetElement ***/

/*** TODO: RdbHandlersDataCallbacks: handleHashElement, handleSetElement, handleZsetElement ***/
RdbRes (*handleHashElement)(RdbParser *p, void *userData, RdbBulk field, RdbBulk elm, uint64_t totalNumElm);
RdbRes (*handleSetElement)(RdbParser *p, void *userData, RdbBulk elm, uint64_t totalNumElm);
RdbRes (*handleZsetElement)(RdbParser *p, void *userData, RdbBulk elm, double score, uint64_t totalNumElm);
/* Callback to handle a member within a sorted set along with its score */
RdbRes (*handleZsetMember)(RdbParser *p, void *userData, RdbBulk member, double score);

/*** TODO: RdbHandlersDataCallbacks: stream stuff ... ***/

/* Callback to handle metadata associated with a stream */
RdbRes (*handleStreamMetadata)(RdbParser *p, void *userData, RdbStreamMeta *meta);
RdbRes (*handleStreamElement)(RdbParser *p, void *userData, RdbStreamID *id, RdbBulk field, RdbBulk value, uint64_t totalNumElm);

/* Callback to handle an item within a stream along with its field and value */
RdbRes (*handleStreamItem)(RdbParser *p, void *userData, RdbStreamID *id, RdbBulk field, RdbBulk value);
/* Callback to handle the creation of a new consumer group within a stream */
RdbRes (*handleStreamNewCGroup)(RdbParser *p, void *userData, RdbBulk grpName, RdbStreamGroupMeta *meta);
/* Callback to handle a pending entry within a consumer group */
RdbRes (*handleStreamCGroupPendingEntry)(RdbParser *p, void *userData, RdbStreamPendingEntry *pendingEntry);

/* Callback to handle the creation of a new consumer within a stream */
RdbRes (*handleStreamNewConsumer)(RdbParser *p, void *userData, RdbBulk consName, RdbStreamConsumerMeta *meta);
/* Callback to handle a pending entry within a consumer */
RdbRes (*handleStreamConsumerPendingEntry)(RdbParser *p, void *userData, RdbStreamPendingEntry *pendingEntry);

} RdbHandlersDataCallbacks;
Expand All @@ -250,6 +326,13 @@ _LIBRDB_API RdbStatus RDB_parseBuff(RdbParser *p,

/****************************************************************
* Create Reader
*
* The built-in readers should be sufficient for most purposes. However, if they
* do not meet your specific needs, you can use the `RDB_createReaderRdb()`
* helper function to create a custom reader with its own reader function. The
* built-in reader file (readerFile.c) can serve as a code reference for this
* purpose.
*
* Used by: RDBX_createReaderFile
* <user-defined-reader>
****************************************************************/
Expand Down Expand Up @@ -309,6 +392,11 @@ _LIBRDB_API void RDB_setLogLevel(RdbParser *p, RdbLogLevel l);
_LIBRDB_API void RDB_setLogger(RdbParser *p, RdbLoggerCB f);
_LIBRDB_API void RDB_log(RdbParser *p, RdbLogLevel lvl, const char *format, ...);

/* Following function returns a hint for the total number of items in the current
* parsed key context - to assist with memory allocation or other optimizations.
* If hint is not available, then return -1. */
_LIBRDB_API int64_t RDB_getNumItemsHint(RdbParser *p);

/****************************************************************
* Pause the Parser
*
Expand Down
4 changes: 2 additions & 2 deletions api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ typedef struct RdbxToRespConf {
/* TODO: support the option of expire, del, select db */

/* If supportRestore, then data-types will be translated to RESTORE with
* raw data instead of data-types commands. This is a performance, version
* (specific) aligned, optimization */
* raw data instead of data-types commands. This is a performance optimization
* that requires to be version aligned. */
int supportRestore;

/* TODO: support rdb2resp del key before write */
Expand Down
Loading

0 comments on commit 129bef5

Please sign in to comment.