@@ -62,6 +62,8 @@ class TriggerRegistrar : public cloe::TriggerRegistrar {
62
62
return manager_.make_trigger (source_, c);
63
63
}
64
64
65
+ // TODO: Should these queue_trigger becomes inserts? Because if they are coming from
66
+ // C++ then they should be from a single thread.
65
67
void insert_trigger (const Conf& c) override { manager_.queue_trigger (source_, c); }
66
68
void insert_trigger (TriggerPtr&& t) override { manager_.queue_trigger (std::move (t)); }
67
69
@@ -168,44 +170,54 @@ void Coordinator::register_event(const std::string& key, EventFactoryPtr&& ef,
168
170
std::bind (&Coordinator::execute_trigger, this , std::placeholders::_1, std::placeholders::_2));
169
171
}
170
172
171
- void Coordinator::execute_trigger (TriggerPtr&& t, const Sync& sync) {
173
+ cloe::CallbackResult Coordinator::execute_trigger (TriggerPtr&& t, const Sync& sync) {
172
174
logger ()->debug (" Execute trigger {}" , inline_json (*t));
173
- (t->action ())(sync , *executer_registrar_);
175
+ auto result = (t->action ())(sync , *executer_registrar_);
174
176
if (!t->is_conceal ()) {
175
- history_.emplace_back (HistoryTrigger{ sync .time (), std::move (t)} );
177
+ history_.emplace_back (sync .time (), std::move (t));
176
178
}
179
+ return result;
177
180
}
178
181
179
- Duration Coordinator::process (const Sync& sync) {
182
+ size_t Coordinator::process_pending_web_triggers (const Sync& sync) {
180
183
// The only thing we need to do here is distribute the triggers from the
181
184
// input queue into their respective storage locations. We are responsible
182
185
// for thread safety here!
183
- auto now = sync . time () ;
186
+ size_t count = 0 ;
184
187
std::unique_lock guard (input_mutex_);
185
188
while (!input_queue_.empty ()) {
186
- auto tp = std::move (input_queue_.front ());
189
+ store_trigger ( std::move (input_queue_.front ()), sync );
187
190
input_queue_.pop_front ();
188
- tp->set_since (now);
191
+ count++;
192
+ }
193
+ return count;
194
+ }
189
195
190
- // Decide where to put the trigger
191
- auto key = tp->event ().name ();
192
- if (storage_.count (key) == 0 ) {
193
- // This is a programming error, since we should not be able to come this
194
- // far at all.
195
- throw std::logic_error (" cannot insert trigger with unregistered event" );
196
- }
197
- try {
198
- logger ()->debug (" Insert trigger {}" , inline_json (*tp));
199
- storage_[key]->emplace (std::move (tp), sync );
200
- } catch (TriggerError& e) {
201
- logger ()->error (" Error inserting trigger: {}" , e.what ());
202
- if (!allow_errors_) {
203
- throw ;
204
- }
196
+ void Coordinator::store_trigger (TriggerPtr&& tp, const Sync& sync) {
197
+ tp->set_since (sync .time ());
198
+
199
+ // Decide where to put the trigger
200
+ auto key = tp->event ().name ();
201
+ if (storage_.count (key) == 0 ) {
202
+ // This is a programming error, since we should not be able to come this
203
+ // far at all.
204
+ throw std::logic_error (" cannot insert trigger with unregistered event" );
205
+ }
206
+ try {
207
+ logger ()->debug (" Insert trigger {}" , inline_json (*tp));
208
+ storage_[key]->emplace (std::move (tp), sync );
209
+ } catch (TriggerError& e) {
210
+ logger ()->error (" Error inserting trigger: {}" , e.what ());
211
+ if (!allow_errors_) {
212
+ throw ;
205
213
}
206
214
}
215
+ }
207
216
208
- return now;
217
+ Duration Coordinator::process (const Sync& sync) {
218
+ auto now = sync .time ();
219
+ process_pending_web_triggers (sync );
220
+ return sync .time ();
209
221
}
210
222
211
223
namespace {
0 commit comments