关于协议 新的 RESP3 协议引入了很多概念,包括 Map、BigNumber、Set 等。为了更好地了解协议情况,直接贴出解析入口方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 /* Parse a reply pointed to by parser->curr_location. */ int parseReply(ReplyParser *parser, void *p_ctx) { switch (parser->curr_location[0]) { case '$': return parseBulk(parser, p_ctx); case '+': return parseSimpleString(parser, p_ctx); case '-': return parseError(parser, p_ctx); case ':': return parseLong(parser, p_ctx); case '*': return parseArray(parser, p_ctx); case '~': return parseSet(parser, p_ctx); case '%': return parseMap(parser, p_ctx); case '#': return parseBool(parser, p_ctx); case ',': return parseDouble(parser, p_ctx); case '_': return parseNull(parser, p_ctx); case '(': return parseBigNumber(parser, p_ctx); case '=': return parseVerbatimString(parser, p_ctx); case '|': return parseAttributes(parser, p_ctx); default: if (parser->callbacks.error) parser->callbacks.error(p_ctx); } return C_ERR; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static const ReplyParserCallbacks DefaultParserCallbacks = { .null_callback = callReplyNull, .bulk_string_callback = callReplyBulkString, .null_bulk_string_callback = callReplyNullBulkString, .null_array_callback = callReplyNullArray, .error_callback = callReplyError, .simple_str_callback = callReplySimpleStr, .long_callback = callReplyLong, .array_callback = callReplyArray, .set_callback = callReplySet, .map_callback = callReplyMap, .double_callback = callReplyDouble, .bool_callback = callReplyBool, .big_number_callback = callReplyBigNumber, .verbatim_string_callback = callReplyVerbatimString, .attribute_callback = callReplyAttribute, .error = callReplyParseError, };
实际的场景主要在于 Module 需要解析协议进行处理,以及 Lua 需要将协议转换为 Lua 数据并执行。 从最初的代码中,可以看到解析主要依赖两块数据:parser
协议内容 此处,针对不同类型的默认解析器进行讲解,方便后续理解
parseNull 解析 协议为 _
1 2 3 4 5 6 7 8 9 10 11 12 static int parseNull(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); parser->curr_location = p + 2; /* for \r\n */ parser->callbacks.null_callback(p_ctx, proto, parser->curr_location - proto); return C_OK; } static void callReplyNull(void *ctx, const char *proto, size_t proto_len) { CallReply *rep = ctx; callReplySetSharedData(rep, REDISMODULE_REPLY_NULL, proto, proto_len, REPLY_FLAG_RESP3); }
parseBulk 解析 协议为 $
开头的数据,先解析出以 \r\n 结尾的十进制长度信息,再按该长度读取实际的内容;如果长度为 -1,则认为是空数据,与 Null 含义相同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static int parseBulk(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); long long bulklen; parser->curr_location = p + 2; /* for \r\n */ string2ll(proto+1,p-proto-1,&bulklen); if (bulklen == -1) { parser->callbacks.null_bulk_string_callback(p_ctx, proto, parser->curr_location - proto); } else { const char *str = parser->curr_location; parser->curr_location += bulklen; parser->curr_location += 2; /* for \r\n */ parser->callbacks.bulk_string_callback(p_ctx, str, bulklen, proto, parser->curr_location - proto); } return C_OK; }
parseError 解析 协议为 -
1 2 3 4 5 6 7 static int parseError(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); parser->curr_location = p + 2; // for \r\n parser->callbacks.error_callback(p_ctx, proto+1, p-proto-1, proto, parser->curr_location - proto); return C_OK; }
parseSimpleString 解析 协议为 +
1 2 3 4 5 6 7 static int parseSimpleString(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); parser->curr_location = p + 2; /* for \r\n */ parser->callbacks.simple_str_callback(p_ctx, proto+1, p-proto-1, proto, parser->curr_location - proto); return C_OK; }
parseLong 解析 协议为 :
开头的数据,并将 \r\n 之前的十进制文本解析为整数值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static int parseLong(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); parser->curr_location = p + 2; /* for \r\n */ long long val; string2ll(proto+1,p-proto-1,&val); parser->callbacks.long_callback(p_ctx, val, proto, parser->curr_location - proto); return C_OK; } static void callReplyLong(void *ctx, long long val, const char *proto, size_t proto_len) { CallReply *rep = ctx; callReplySetSharedData(rep, REDISMODULE_REPLY_INTEGER, proto, proto_len, 0); rep->val.ll = val; }
parseArray 解析 协议为 *
开头的数据,并解析出以 \r\n 结尾的元素个数信息;个数为 -1 时按空数组处理,否则最终调用callReplyParseCollection
/* Parse an array header ('*').
 *
 * Reads the element count from the header line and moves the cursor to the
 * first byte after its "\r\n". A count of -1 is reported through
 * null_array_callback; any other count is handed to array_callback, which
 * receives the parser so it can consume the elements.
 * Neither the '\r' search nor the string2ll() result is checked.
 * Always returns C_OK. */
static int parseArray(ReplyParser *parser, void *p_ctx) {
    const char *start = parser->curr_location;
    const char *cr = strchr(start + 1, '\r');
    long long count;

    string2ll(start + 1, cr - start - 1, &count);
    parser->curr_location = cr + 2; /* skip "\r\n" */

    if (count != -1) {
        parser->callbacks.array_callback(parser, p_ctx, count, start);
    } else {
        parser->callbacks.null_array_callback(p_ctx, start, parser->curr_location - start);
    }
    return C_OK;
}

/* Default array handler: marks the reply as an array and parses `len`
 * entries of one element each. */
static void callReplyArray(ReplyParser *parser, void *ctx, size_t len, const char *proto) {
    CallReply *reply = ctx;

    reply->type = REDISMODULE_REPLY_ARRAY;
    callReplyParseCollection(parser, reply, len, proto, 1);
}
parseSet 解析 协议为 ~
开头的数据,并解析出以 \r\n 结尾的元素个数信息,并最终调用callReplyParseCollection
/* Parse a set header ('~').
 *
 * Reads the element count from the header line, moves the cursor past its
 * "\r\n" and hands the count to set_callback, which receives the parser so
 * it can consume the elements.
 * Neither the '\r' search nor the string2ll() result is checked.
 * Always returns C_OK. */
static int parseSet(ReplyParser *parser, void *p_ctx) {
    const char *start = parser->curr_location;
    const char *cr = strchr(start + 1, '\r');
    long long count;

    string2ll(start + 1, cr - start - 1, &count);
    parser->curr_location = cr + 2; /* skip "\r\n" */
    parser->callbacks.set_callback(parser, p_ctx, count, start);
    return C_OK;
}

/* Default set handler: marks the reply as a set, parses `len` entries of
 * one element each, then flags the reply as RESP3. */
static void callReplySet(ReplyParser *parser, void *ctx, size_t len, const char *proto) {
    CallReply *reply = ctx;

    reply->type = REDISMODULE_REPLY_SET;
    callReplyParseCollection(parser, reply, len, proto, 1);
    reply->flags |= REPLY_FLAG_RESP3;
}
parseMap 解析 协议为 %
开头的数据,并解析出以 \r\n 结尾的条目个数信息(每个条目包含 key 与 value 两个元素),并最终调用callReplyParseCollection
/* Parse a map header ('%').
 *
 * Reads the entry count from the header line, moves the cursor past its
 * "\r\n" and hands the count to map_callback, which receives the parser so
 * it can consume the entries.
 * Neither the '\r' search nor the string2ll() result is checked.
 * Always returns C_OK. */
static int parseMap(ReplyParser *parser, void *p_ctx) {
    const char *start = parser->curr_location;
    const char *cr = strchr(start + 1, '\r');
    long long count;

    string2ll(start + 1, cr - start - 1, &count);
    parser->curr_location = cr + 2; /* skip "\r\n" */
    parser->callbacks.map_callback(parser, p_ctx, count, start);
    return C_OK;
}

/* Default map handler: marks the reply as a map, parses `len` entries of
 * two elements each, then flags the reply as RESP3. */
static void callReplyMap(ReplyParser *parser, void *ctx, size_t len, const char *proto) {
    CallReply *reply = ctx;

    reply->type = REDISMODULE_REPLY_MAP;
    callReplyParseCollection(parser, reply, len, proto, 2);
    reply->flags |= REPLY_FLAG_RESP3;
}
parseDouble 解析 协议为 ,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static int parseDouble(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); parser->curr_location = p + 2; /* for \r\n */ char buf[MAX_LONG_DOUBLE_CHARS+1]; size_t len = p-proto-1; double d; if (len <= MAX_LONG_DOUBLE_CHARS) { memcpy(buf,proto+1,len); buf[len] = '\0'; d = strtod(buf,NULL); /* We expect a valid representation. */ } else { d = 0; } parser->callbacks.double_callback(p_ctx, d, proto, parser->curr_location - proto); return C_OK; }
parseBool 解析 协议为 #
开头的数据,其后的字符为 t 或者 f,分别表示 true 与 false。
1 2 3 4 5 6 7 8 9 10 11 12 13 static int parseBool(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); parser->curr_location = p + 2; /* for \r\n */ parser->callbacks.bool_callback(p_ctx, proto[1] == 't', proto, parser->curr_location - proto); return C_OK; } static void callReplyBool(void *ctx, int val, const char *proto, size_t proto_len) { CallReply *rep = ctx; callReplySetSharedData(rep, REDISMODULE_REPLY_BOOL, proto, proto_len, REPLY_FLAG_RESP3); rep->val.ll = val; }
parseBigNumber 解析 协议为 (
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static int parseBigNumber(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); parser->curr_location = p + 2; /* for \r\n */ parser->callbacks.big_number_callback(p_ctx, proto+1, p-proto-1, proto, parser->curr_location - proto); return C_OK; } static void callReplyBigNumber(void *ctx, const char *str, size_t len, const char *proto, size_t proto_len) { CallReply *rep = ctx; callReplySetSharedData(rep, REDISMODULE_REPLY_BIG_NUMBER, proto, proto_len, REPLY_FLAG_RESP3); rep->len = len; rep->val.str = str; }
parseAttributes 解析 协议为 |
开头的数据,并解析出以 \r\n 结尾的条目个数信息(每个条目包含 key 与 value 两个元素),并最终调用callReplyParseCollection
进行递归解析协议内容,解析后,会再次调用 parseReply
/* Parse an attribute header ('|').
 *
 * Reads the entry count from the header line, moves the cursor past its
 * "\r\n" and hands the count to attribute_callback, which receives the
 * parser so it can consume the attribute entries and the reply after them.
 * Neither the '\r' search nor the string2ll() result is checked.
 * Always returns C_OK. */
static int parseAttributes(ReplyParser *parser, void *p_ctx) {
    const char *proto = parser->curr_location;
    char *p = strchr(proto+1,'\r');
    long long len;

    string2ll(proto+1,p-proto-1,&len);
    p += 2; /* skip "\r\n" */
    parser->curr_location = p;
    parser->callbacks.attribute_callback(parser, p_ctx, len, proto);
    return C_OK;
}

/* Default attribute handler.
 *
 * Allocates a separate CallReply for the attribute, parses its `len`
 * key/value entries into it, then parses the reply that follows the
 * attribute into `ctx` itself. Finally the protocol span of `ctx` is
 * widened so that it starts at the attribute header. */
static void callReplyAttribute(ReplyParser *parser, void *ctx, size_t len, const char *proto) {
    CallReply *rep = ctx;
    CallReply *attr = zcalloc(sizeof(CallReply));

    rep->attribute = attr;

    /* Continue parsing the attribute reply. */
    attr->len = len;
    attr->type = REDISMODULE_REPLY_ATTRIBUTE;
    /* private_data must be in place before the entries are parsed:
     * callReplyParseCollection copies the parent's private_data into every
     * child, and the attribute node starts out zeroed. Assigning it only
     * afterwards left all attribute entries with a NULL private_data. */
    attr->private_data = rep->private_data;
    callReplyParseCollection(parser, attr, len, proto, 2);
    attr->flags |= REPLY_FLAG_PARSED | REPLY_FLAG_RESP3;

    /* Continue parsing the reply that the attribute is attached to. */
    parseReply(parser, rep);

    /* Fix up the proto address and length: the span should start from the
     * attribute, not from the reply that follows it. */
    rep->proto = proto;
    rep->proto_len = parser->curr_location - proto;
    rep->flags |= REPLY_FLAG_RESP3;
}
parseVerbatimString 解析 协议为 =
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static int parseVerbatimString(ReplyParser *parser, void *p_ctx) { const char *proto = parser->curr_location; char *p = strchr(proto+1,'\r'); long long bulklen; parser->curr_location = p + 2; /* for \r\n */ string2ll(proto+1,p-proto-1,&bulklen); const char *format = parser->curr_location; parser->curr_location += bulklen; parser->curr_location += 2; /* for \r\n */ parser->callbacks.verbatim_string_callback(p_ctx, format, format + 4, bulklen - 4, proto, parser->curr_location - proto); return C_OK; } static void callReplyVerbatimString(void *ctx, const char *format, const char *str, size_t len, const char *proto, size_t proto_len) { CallReply *rep = ctx; callReplySetSharedData(rep, REDISMODULE_REPLY_VERBATIM_STRING, proto, proto_len, REPLY_FLAG_RESP3); rep->len = len; rep->val.verbatim_str.str = str; rep->val.verbatim_str.format = format; }
特殊源码 协议结构 单独来了解一下默认协议的表示结构体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 struct CallReply { void *private_data; sds original_proto; /* Available only for root reply. */ const char *proto; size_t proto_len; int type; /* REPLY_... */ int flags; /* REPLY_FLAG... */ size_t len; /* Length of a string, or the number elements in an array. */ union { const char *str; /* String pointer for string and error replies. This * does not need to be freed, always points inside * a reply->proto buffer of the reply object or, in * case of array elements, of parent reply objects. */ struct { const char *str; const char *format; } verbatim_str; /* Reply value for verbatim string */ long long ll; /* Reply value for integer reply. */ double d; /* Reply value for double reply. */ struct CallReply *array; /* Array of sub-reply elements. used for set, array, map, and attribute */ } val; list *deferred_error_list; /* list of errors in sds form or NULL */ struct CallReply *attribute; /* attribute reply, NULL if not exists */ };
协议信息 1 2 3 4 5 6 7 void *private_data; sds original_proto; /* Available only for root reply. */ const char *proto; size_t proto_len; int type; /* REPLY_... */ int flags; /* REPLY_FLAG... */ size_t len; /* Length of a string, or the number elements in an array. */
数据部分 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 size_t len; /* Length of a string, or the number elements in an array. */ union { const char *str; /* String pointer for string and error replies. This * does not need to be freed, always points inside * a reply->proto buffer of the reply object or, in * case of array elements, of parent reply objects. */ struct { const char *str; const char *format; } verbatim_str; /* Reply value for verbatim string */ long long ll; /* Reply value for integer reply. */ double d; /* Reply value for double reply. */ struct CallReply *array; /* Array of sub-reply elements. used for set, array, map, and attribute */ } val; struct CallReply *attribute; /* attribute reply, NULL if not exists */
其中,我们将 attribute
析构处理 1 list *deferred_error_list; /* list of errors in sds form or NULL */
1 2 3 4 5 6 7 8 9 10 11 12 void freeCallReply(CallReply *rep) { if (!(rep->flags & REPLY_FLAG_ROOT)) { return; } if (rep->flags & REPLY_FLAG_PARSED) { freeCallReplyInternal(rep); } sdsfree(rep->original_proto); if (rep->deferred_error_list) listRelease(rep->deferred_error_list); zfree(rep); }
递归逻辑 本节单独解析 callReplyParseCollection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static void callReplyParseCollection(ReplyParser *parser, CallReply *rep, size_t len, const char *proto, size_t elements_per_entry) { rep->len = len; rep->val.array = zcalloc(elements_per_entry * len * sizeof(CallReply)); for (size_t i = 0; i < len * elements_per_entry; i += elements_per_entry) { for (size_t j = 0 ; j < elements_per_entry ; ++j) { rep->val.array[i + j].private_data = rep->private_data; parseReply(parser, rep->val.array + i + j); rep->val.array[i + j].flags |= REPLY_FLAG_PARSED; if (rep->val.array[i + j].flags & REPLY_FLAG_RESP3) { /* If one of the sub-replies is RESP3, then the current reply is also RESP3. */ rep->flags |= REPLY_FLAG_RESP3; } } } rep->proto = proto; rep->proto_len = parser->curr_location - proto; }