Skip to content

Commit 62e8d80

Browse files
Producer/Consumer table binary support (#801)
We want to make the Producer/Consumer table can support binary messages, The native C string (char *) will be replaced to pointers and its lengths in all paths. Meanwhile, the Python interfaces of SWIG can only handle the type, str, with UTF-8. So, we need to specialize the SWIG interfaces from bytes of Python to string of C++. --------- Signed-off-by: Ze Gan <[email protected]> Co-authored-by: Qi Luo <[email protected]>
1 parent e0f394c commit 62e8d80

11 files changed

+181
-74
lines changed

common/consumerstatetable.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,16 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st
6868

6969
auto& ctx = ctx0->element[ie];
7070
assert(ctx->element[0]->type == REDIS_REPLY_STRING);
71-
std::string key = ctx->element[0]->str;
71+
std::string key(ctx->element[0]->str, ctx->element[0]->len);
7272
kfvKey(kco) = key;
7373

7474
assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
7575
auto ctx1 = ctx->element[1];
7676
for (size_t i = 0; i < ctx1->elements / 2; i++)
7777
{
7878
FieldValueTuple e;
79-
fvField(e) = ctx1->element[i * 2]->str;
80-
fvValue(e) = ctx1->element[i * 2 + 1]->str;
79+
fvField(e).assign(ctx1->element[i * 2]->str, ctx1->element[i * 2]->len);
80+
fvValue(e).assign(ctx1->element[i * 2 + 1]->str, ctx1->element[i * 2 + 1]->len);
8181
values.push_back(e);
8282
}
8383

common/luatable.cpp

+2-10
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,9 @@ bool LuaTable::get(const vector<string> &luaKeys, vector<FieldValueTuple> &value
5454
args.emplace_back(v);
5555
}
5656

57-
// Transform data structure
58-
vector<const char *> args1;
59-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
60-
6157
// Invoke redis command
6258
RedisCommand command;
63-
command.formatArgv((int)args1.size(), &args1[0], NULL);
59+
command.format(args);
6460
RedisReply r(m_db.get(), command, REDIS_REPLY_ARRAY);
6561
redisReply *reply = r.getContext();
6662

@@ -109,13 +105,9 @@ bool LuaTable::hget(const vector<string> &luaKeys, const string &field, string &
109105
args.emplace_back(v);
110106
}
111107

112-
// Transform data structure
113-
vector<const char *> args1;
114-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
115-
116108
// Invoke redis command
117109
RedisCommand command;
118-
command.formatArgv((int)args1.size(), &args1[0], NULL);
110+
command.format(args);
119111
RedisReply r(m_db.get(), command);
120112
redisReply *reply = r.getContext();
121113

common/producerstatetable.cpp

+6-30
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,9 @@ void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &v
134134
args.emplace_back(fvValue(iv));
135135
}
136136

137-
// Transform data structure
138-
vector<const char *> args1;
139-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
140-
141137
// Invoke redis command
142138
RedisCommand command;
143-
command.formatArgv((int)args1.size(), &args1[0], NULL);
139+
command.format(args);
144140
m_pipe->push(command, REDIS_REPLY_NIL);
145141
if (!m_buffered)
146142
{
@@ -171,13 +167,9 @@ void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND
171167
args.emplace_back("''");
172168
args.emplace_back("''");
173169

174-
// Transform data structure
175-
vector<const char *> args1;
176-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
177-
178170
// Invoke redis command
179171
RedisCommand command;
180-
command.formatArgv((int)args1.size(), &args1[0], NULL);
172+
command.format(args);
181173
m_pipe->push(command, REDIS_REPLY_NIL);
182174
if (!m_buffered)
183175
{
@@ -224,13 +216,9 @@ void ProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple>& values)
224216
}
225217
}
226218

227-
// Transform data structure
228-
vector<const char *> args1;
229-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
230-
231219
// Invoke redis command
232220
RedisCommand command;
233-
command.formatArgv((int)args1.size(), &args1[0], NULL);
221+
command.format(args);
234222
m_pipe->push(command, REDIS_REPLY_NIL);
235223
if (!m_buffered)
236224
{
@@ -265,13 +253,9 @@ void ProducerStateTable::del(const std::vector<std::string>& keys)
265253
}
266254
args.emplace_back("G");
267255

268-
// Transform data structure
269-
vector<const char *> args1;
270-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
271-
272256
// Invoke redis command
273257
RedisCommand command;
274-
command.formatArgv((int)args1.size(), &args1[0], NULL);
258+
command.format(args);
275259
m_pipe->push(command, REDIS_REPLY_NIL);
276260
if (!m_buffered)
277261
{
@@ -307,13 +291,9 @@ void ProducerStateTable::clear()
307291
args.emplace_back(getStateHashPrefix() + getTableName());
308292
args.emplace_back(getDelKeySetName());
309293

310-
// Transform data structure
311-
vector<const char *> args1;
312-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
313-
314294
// Invoke redis command
315295
RedisCommand cmd;
316-
cmd.formatArgv((int)args1.size(), &args1[0], NULL);
296+
cmd.format(args);
317297
m_pipe->push(cmd, REDIS_REPLY_NIL);
318298
m_pipe->flush();
319299
}
@@ -466,13 +446,9 @@ void ProducerStateTable::apply_temp_view()
466446
SWSS_LOG_DEBUG("apply_view.lua is called with following argument list: %s", ss.str().c_str());
467447
}
468448

469-
// Transform data structure
470-
vector<const char *> args1;
471-
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );
472-
473449
// Invoke redis command
474450
RedisCommand command;
475-
command.formatArgv((int)args1.size(), &args1[0], NULL);
451+
command.format(args);
476452
m_pipe->push(command, REDIS_REPLY_NIL);
477453
m_pipe->flush();
478454

common/redisapi.h

+1-9
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,8 @@ static inline std::set<std::string> runRedisScript(RedisContext &ctx, const std:
8383
args.insert(args.end(), argv.begin(), argv.end());
8484
args.push_back("''");
8585

86-
// Convert to vector of char *
87-
std::vector<const char *> c_args;
88-
transform(
89-
args.begin(),
90-
args.end(),
91-
std::back_inserter(c_args),
92-
[](const std::string& s) { return s.c_str(); } );
93-
9486
RedisCommand command;
95-
command.formatArgv(static_cast<int>(c_args.size()), c_args.data(), NULL);
87+
command.format(args);
9688

9789
std::set<std::string> ret;
9890
try

common/rediscommand.cpp

+24-11
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
#include <vector>
22
#include <hiredis/hiredis.h>
33
#include "rediscommand.h"
4+
#include "stringutility.h"
45

56
using namespace std;
67

78
namespace swss {
89

910
RedisCommand::RedisCommand()
10-
: temp(NULL)
11+
: temp(NULL),
12+
len(0)
1113
{
1214
}
1315

@@ -26,7 +28,7 @@ void RedisCommand::format(const char *fmt, ...)
2628

2729
va_list ap;
2830
va_start(ap, fmt);
29-
int len = redisvFormatCommand(&temp, fmt, ap);
31+
len = redisvFormatCommand(&temp, fmt, ap);
3032
va_end(ap);
3133
if (len == -1) {
3234
throw std::bad_alloc();
@@ -43,7 +45,7 @@ void RedisCommand::formatArgv(int argc, const char **argv, const size_t *argvlen
4345
temp = nullptr;
4446
}
4547

46-
int len = redisFormatCommandArgv(&temp, argc, argv, argvlen);
48+
len = redisFormatCommandArgv(&temp, argc, argv, argvlen);
4749
if (len == -1) {
4850
throw std::bad_alloc();
4951
}
@@ -52,11 +54,13 @@ void RedisCommand::formatArgv(int argc, const char **argv, const size_t *argvlen
5254
void RedisCommand::format(const vector<string> &commands)
5355
{
5456
vector<const char*> args;
57+
vector<size_t> lens;
5558
for (auto& command : commands)
5659
{
5760
args.push_back(command.c_str());
61+
lens.push_back(command.size());
5862
}
59-
formatArgv(static_cast<int>(args.size()), args.data(), NULL);
63+
formatArgv(static_cast<int>(args.size()), args.data(), lens.data());
6064
}
6165

6266
/* Format HSET key multiple field value command */
@@ -96,12 +100,9 @@ void RedisCommand::formatHDEL(const std::string& key, const std::vector<std::str
96100
{
97101
if (fields.empty()) throw std::invalid_argument("empty values");
98102

99-
std::vector<const char *> args = {"HDEL", key.c_str()};
100-
for (const std::string &f : fields)
101-
{
102-
args.push_back(f.c_str());
103-
}
104-
formatArgv(static_cast<int>(args.size()), args.data(), NULL);
103+
std::vector<string> args = {"HDEL", key};
104+
args.insert(args.end(), fields.begin(), fields.end());
105+
format(args);
105106
}
106107

107108
/* Format EXPIRE key field command */
@@ -122,14 +123,26 @@ void RedisCommand::formatDEL(const std::string& key)
122123
return format("DEL %s", key.c_str());
123124
}
124125

126+
int RedisCommand::appendTo(redisContext *ctx) const
127+
{
128+
return redisAppendFormattedCommand(ctx, c_str(), length());
129+
}
130+
131+
std::string RedisCommand::toPrintableString() const
132+
{
133+
return binary_to_printable(temp, len);
134+
}
135+
125136
const char *RedisCommand::c_str() const
126137
{
127138
return temp;
128139
}
129140

130141
size_t RedisCommand::length() const
131142
{
132-
return strlen(temp);
143+
if (len <= 0)
144+
return 0;
145+
return static_cast<size_t>(len);
133146
}
134147

135148
}

common/rediscommand.h

+12-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <string>
77
#include <stdexcept>
88
#include <map>
9+
#include <hiredis/hiredis.h>
910

1011
namespace swss {
1112

@@ -17,6 +18,7 @@ typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyO
1718
#define kfvOp std::get<1>
1819
#define kfvFieldsValues std::get<2>
1920

21+
2022
class RedisCommand {
2123
public:
2224
RedisCommand();
@@ -64,12 +66,18 @@ class RedisCommand {
6466
/* Format DEL key command */
6567
void formatDEL(const std::string& key);
6668

69+
int appendTo(redisContext *ctx) const;
70+
71+
std::string toPrintableString() const;
72+
73+
protected:
6774
const char *c_str() const;
6875

6976
size_t length() const;
7077

7178
private:
7279
char *temp;
80+
int len;
7381
};
7482

7583
template<typename InputIterator>
@@ -80,15 +88,15 @@ void RedisCommand::formatHSET(const std::string &key,
8088

8189
const char* cmd = "HSET";
8290

83-
std::vector<const char*> args = { cmd, key.c_str() };
91+
std::vector<std::string> args = { cmd, key.c_str() };
8492

8593
for (auto i = start; i != stop; i++)
8694
{
87-
args.push_back(fvField(*i).c_str());
88-
args.push_back(fvValue(*i).c_str());
95+
args.push_back(fvField(*i));
96+
args.push_back(fvValue(*i));
8997
}
9098

91-
formatArgv((int)args.size(), args.data(), NULL);
99+
format(args);
92100
}
93101

94102
}

common/redispipeline.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class RedisPipeline {
4949
case REDIS_REPLY_STATUS:
5050
case REDIS_REPLY_INTEGER:
5151
{
52-
int rc = redisAppendFormattedCommand(m_db->getContext(), command.c_str(), command.length());
52+
int rc = command.appendTo(m_db->getContext());
5353
if (rc != REDIS_OK)
5454
{
5555
// The only reason of error is REDIS_ERR_OOM (Out of memory)

common/redisreply.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ inline void guard(FUNC func, const char* command)
7878

7979
RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)
8080
{
81-
int rc = redisAppendFormattedCommand(ctx->getContext(), command.c_str(), command.length());
81+
int rc = command.appendTo(ctx->getContext());
8282
if (rc != REDIS_OK)
8383
{
8484
// The only reason of error is REDIS_ERR_OOM (Out of memory)
@@ -89,9 +89,9 @@ RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)
8989
rc = redisGetReply(ctx->getContext(), (void**)&m_reply);
9090
if (rc != REDIS_OK)
9191
{
92-
throw RedisError("Failed to redisGetReply with " + string(command.c_str()), ctx->getContext());
92+
throw RedisError("Failed to redisGetReply with " + command.toPrintableString(), ctx->getContext());
9393
}
94-
guard([&]{checkReply();}, command.c_str());
94+
guard([&]{checkReply();}, command.toPrintableString().c_str());
9595
}
9696

9797
RedisReply::RedisReply(RedisContext *ctx, const string& command)
@@ -109,19 +109,19 @@ RedisReply::RedisReply(RedisContext *ctx, const string& command)
109109
{
110110
throw RedisError("Failed to redisGetReply with " + command, ctx->getContext());
111111
}
112-
guard([&]{checkReply();}, command.c_str());
112+
guard([&]{checkReply();}, binary_to_printable(command.c_str(), command.length()).c_str());
113113
}
114114

115115
RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command, int expectedType)
116116
: RedisReply(ctx, command)
117117
{
118-
guard([&]{checkReplyType(expectedType);}, command.c_str());
118+
guard([&]{checkReplyType(expectedType);}, command.toPrintableString().c_str());
119119
}
120120

121121
RedisReply::RedisReply(RedisContext *ctx, const string& command, int expectedType)
122122
: RedisReply(ctx, command)
123123
{
124-
guard([&]{checkReplyType(expectedType);}, command.c_str());
124+
guard([&]{checkReplyType(expectedType);}, binary_to_printable(command.c_str(), command.length()).c_str());
125125
}
126126

127127
RedisReply::RedisReply(redisReply *reply) :

0 commit comments

Comments
 (0)