Skip to content

Commit a215441

Browse files
authored
Improve Consumer interface to support ZMQ based Producer/Consumer table. (sonic-net#2562)
**What I did** Improve Consumer interface to support ZMQ based Producer/Consumer table. **Why I did it** To improve route create performance, swsscommon lib will add ZMQ based Producer/Consumer table. Because currently Consumer interface only support Redis based Producer/Consumer table, so improve this interface to support ZMQ based Producer/Consumer table. ZMQ based Producer/Consumer table can find in this PR: sonic-net/sonic-swss-common#715 **How I verified it** Pass all UT. **Details if related**
1 parent 867e355 commit a215441

File tree

2 files changed

+70
-54
lines changed

2 files changed

+70
-54
lines changed

orchagent/orch.cpp

+30-31
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ vector<Selectable *> Orch::getSelectables()
7070
return selectables;
7171
}
7272

73-
void Consumer::addToSync(const KeyOpFieldsValuesTuple &entry)
73+
void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
7474
{
7575
SWSS_LOG_ENTER();
7676

@@ -157,7 +157,7 @@ void Consumer::addToSync(const KeyOpFieldsValuesTuple &entry)
157157

158158
}
159159

160-
size_t Consumer::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries)
160+
size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries)
161161
{
162162
SWSS_LOG_ENTER();
163163

@@ -194,9 +194,7 @@ size_t Consumer::refillToSync(Table* table)
194194

195195
size_t Consumer::refillToSync()
196196
{
197-
ConsumerTableBase *consumerTable = getConsumerTable();
198-
199-
auto subTable = dynamic_cast<SubscriberStateTable *>(consumerTable);
197+
auto subTable = dynamic_cast<SubscriberStateTable *>(getSelectable());
200198
if (subTable != NULL)
201199
{
202200
size_t update_size = 0;
@@ -213,35 +211,14 @@ size_t Consumer::refillToSync()
213211
else
214212
{
215213
// consumerTable is either ConsumerStateTable or ConsumerTable
216-
auto db = consumerTable->getDbConnector();
217-
string tableName = consumerTable->getTableName();
214+
auto db = getDbConnector();
215+
string tableName = getTableName();
218216
auto table = Table(db, tableName);
219217
return refillToSync(&table);
220218
}
221219
}
222220

223-
void Consumer::execute()
224-
{
225-
SWSS_LOG_ENTER();
226-
227-
size_t update_size = 0;
228-
do
229-
{
230-
std::deque<KeyOpFieldsValuesTuple> entries;
231-
getConsumerTable()->pops(entries);
232-
update_size = addToSync(entries);
233-
} while (update_size != 0);
234-
235-
drain();
236-
}
237-
238-
void Consumer::drain()
239-
{
240-
if (!m_toSync.empty())
241-
m_orch->doTask(*this);
242-
}
243-
244-
string Consumer::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
221+
string ConsumerBase::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
245222
{
246223
string s = getTableName() + getConsumerTable()->getTableNameSeparator() + kfvKey(tuple)
247224
+ "|" + kfvOp(tuple);
@@ -253,7 +230,7 @@ string Consumer::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
253230
return s;
254231
}
255232

256-
void Consumer::dumpPendingTasks(vector<string> &ts)
233+
void ConsumerBase::dumpPendingTasks(vector<string> &ts)
257234
{
258235
for (auto &tm : m_toSync)
259236
{
@@ -265,6 +242,28 @@ void Consumer::dumpPendingTasks(vector<string> &ts)
265242
}
266243
}
267244

245+
void Consumer::execute()
246+
{
247+
SWSS_LOG_ENTER();
248+
249+
size_t update_size = 0;
250+
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
251+
do
252+
{
253+
std::deque<KeyOpFieldsValuesTuple> entries;
254+
table->pops(entries);
255+
update_size = addToSync(entries);
256+
} while (update_size != 0);
257+
258+
drain();
259+
}
260+
261+
void Consumer::drain()
262+
{
263+
if (!m_toSync.empty())
264+
m_orch->doTask(*this);
265+
}
266+
268267
size_t Orch::addExistingData(const string& tableName)
269268
{
270269
auto consumer = dynamic_cast<Consumer *>(getExecutor(tableName));
@@ -586,7 +585,7 @@ void Orch::logfileReopen()
586585
}
587586
}
588587

589-
void Orch::recordTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple)
588+
void Orch::recordTuple(ConsumerBase &consumer, const KeyOpFieldsValuesTuple &tuple)
590589
{
591590
string s = consumer.dumpTuple(tuple);
592591

orchagent/orch.h

+40-23
Original file line numberDiff line numberDiff line change
@@ -132,49 +132,66 @@ class Executor : public swss::Selectable
132132
swss::Selectable *getSelectable() const { return m_selectable; }
133133
};
134134

135-
class Consumer : public Executor {
135+
class ConsumerBase : public Executor {
136+
public:
137+
ConsumerBase(swss::Selectable *selectable, Orch *orch, const std::string &name)
138+
: Executor(selectable, orch, name)
139+
{
140+
}
141+
142+
virtual swss::TableBase *getConsumerTable() const = 0;
143+
144+
std::string getTableName() const
145+
{
146+
return getConsumerTable()->getTableName();
147+
}
148+
149+
std::string dumpTuple(const swss::KeyOpFieldsValuesTuple &tuple);
150+
void dumpPendingTasks(std::vector<std::string> &ts);
151+
152+
/* Store the latest 'golden' status */
153+
// TODO: hide?
154+
SyncMap m_toSync;
155+
156+
void addToSync(const swss::KeyOpFieldsValuesTuple &entry);
157+
158+
// Returns: the number of entries added to m_toSync
159+
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);
160+
};
161+
162+
class Consumer : public ConsumerBase {
136163
public:
137164
Consumer(swss::ConsumerTableBase *select, Orch *orch, const std::string &name)
138-
: Executor(select, orch, name)
165+
: ConsumerBase(select, orch, name)
139166
{
140167
}
141168

142-
swss::ConsumerTableBase *getConsumerTable() const
169+
swss::TableBase *getConsumerTable() const override
143170
{
171+
// ConsumerTableBase is a subclass of TableBase
144172
return static_cast<swss::ConsumerTableBase *>(getSelectable());
145173
}
146174

147-
std::string getTableName() const
175+
const swss::DBConnector* getDbConnector() const
148176
{
149-
return getConsumerTable()->getTableName();
177+
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
178+
return table->getDbConnector();
150179
}
151180

152181
int getDbId() const
153182
{
154-
return getConsumerTable()->getDbConnector()->getDbId();
183+
return getDbConnector()->getDbId();
155184
}
156185

157186
std::string getDbName() const
158187
{
159-
return getConsumerTable()->getDbConnector()->getDbName();
188+
return getDbConnector()->getDbName();
160189
}
161190

162-
std::string dumpTuple(const swss::KeyOpFieldsValuesTuple &tuple);
163-
void dumpPendingTasks(std::vector<std::string> &ts);
164-
165191
size_t refillToSync();
166192
size_t refillToSync(swss::Table* table);
167-
void execute();
168-
void drain();
169-
170-
/* Store the latest 'golden' status */
171-
// TODO: hide?
172-
SyncMap m_toSync;
173-
174-
void addToSync(const swss::KeyOpFieldsValuesTuple &entry);
175-
176-
// Returns: the number of entries added to m_toSync
177-
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);
193+
void execute() override;
194+
void drain() override;
178195
};
179196

180197
typedef std::map<std::string, std::shared_ptr<Executor>> ConsumerMap;
@@ -215,12 +232,12 @@ class Orch
215232
virtual void doTask();
216233

217234
/* Run doTask against a specific executor */
218-
virtual void doTask(Consumer &consumer) = 0;
235+
virtual void doTask(Consumer &consumer) { };
219236
virtual void doTask(swss::NotificationConsumer &consumer) { }
220237
virtual void doTask(swss::SelectableTimer &timer) { }
221238

222239
/* TODO: refactor recording */
223-
static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);
240+
static void recordTuple(ConsumerBase &consumer, const swss::KeyOpFieldsValuesTuple &tuple);
224241

225242
void dumpPendingTasks(std::vector<std::string> &ts);
226243

0 commit comments

Comments
 (0)