Skip to content

Commit ac495be

Browse files
committed
fix
1 parent 2dfd958 commit ac495be

File tree

3 files changed

+28
-13
lines changed

3 files changed

+28
-13
lines changed

flex/engines/graph_db/app/kafka_wal_ingester_app.cc

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace gs {
2222
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
2323

2424
struct WalIngester {
25+
constexpr static size_t BUFFSIZ = 4096;
2526
GraphDBSession& session_;
2627
timestamp_t begin_;
2728
timestamp_t end_;
@@ -31,30 +32,32 @@ struct WalIngester {
3132
std::vector<uint8_t> states_;
3233

3334
void resize() {
34-
size_t n = data_.size();
35-
std::vector<std::string> new_data(n + 4096);
36-
std::vector<uint8_t> new_states(n + 4096, 0);
37-
size_t idx = (ingested_plus_one_ - begin_) % n;
38-
for (size_t i = 0; i < n; ++i) {
35+
size_t origin_len = data_.size();
36+
std::vector<std::string> new_data(origin_len + BUFFSIZ);
37+
std::vector<uint8_t> new_states(origin_len + BUFFSIZ, 0);
38+
size_t idx = (ingested_plus_one_ - begin_) % origin_len;
39+
for (size_t i = 0; i < origin_len; ++i) {
3940
new_data[i] = data_[idx];
4041
new_states[i] = states_[idx];
4142
if (states_[idx]) {
4243
end_ = ingested_plus_one_ + i + 1;
4344
}
4445
++idx;
45-
idx %= n;
46+
idx %= origin_len;
4647
}
4748
data_ = std::move(new_data);
4849
states_ = std::move(new_states);
4950
begin_ = ingested_plus_one_;
5051
}
52+
53+
timestamp_t last_ingested() const { return ingested_plus_one_ - 1; }
5154
WalIngester(GraphDBSession& session, timestamp_t cur)
5255
: session_(session),
5356
begin_(cur),
5457
end_(cur),
5558
ingested_plus_one_(cur),
56-
data_(4096),
57-
states_(4096, 0) {}
59+
data_(BUFFSIZ),
60+
states_(BUFFSIZ, 0) {}
5861

5962
bool empty() const { return ingested_plus_one_ == end_; }
6063

@@ -77,7 +80,8 @@ struct WalIngester {
7780
}
7881

7982
void ingest() {
80-
size_t idx = (ingested_plus_one_ - begin_) % data_.size();
83+
size_t len = data_.size();
84+
size_t idx = (ingested_plus_one_ - begin_) % len;
8185
bool flag = false;
8286
while (states_[idx] == 2 || states_[idx] == 1) {
8387
if (states_[idx] == 1) {
@@ -86,7 +90,7 @@ struct WalIngester {
8690
states_[idx] = 0;
8791
++ingested_plus_one_;
8892
++idx;
89-
idx %= data_.size();
93+
idx %= len;
9094
flag = true;
9195
}
9296
if (flag) {
@@ -187,7 +191,7 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
187191
timestamp_t cur_ts = graph.db().get_last_ingested_wal_ts() + 1;
188192
gs::WalIngester ingester(graph, cur_ts);
189193
gs::KafkaWalConsumer consumer(ingester, config, topic_name, 1);
190-
while (graph.db().kafka_wal_ingester_state()) {
194+
while (!force_stop_.load()) {
191195
consumer.poll();
192196
ingester.ingest();
193197
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -196,8 +200,16 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
196200
consumer.poll();
197201
ingester.ingest();
198202
}
203+
int64_t ts = ingester.last_ingested();
204+
ouput.put_long(ts);
205+
return true;
206+
}
207+
208+
bool KafkaWalIngesterApp::terminal() {
209+
force_stop_.store(true);
199210
return true;
200211
}
212+
201213
AppWrapper KafkaWalIngesterAppFactory::CreateApp(const GraphDB& db) {
202214
return AppWrapper(new KafkaWalIngesterApp(), NULL);
203215
}

flex/engines/graph_db/app/kafka_wal_ingester_app.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ namespace gs {
2424
// Ingest wal from kafka
2525
class KafkaWalIngesterApp : public WriteAppBase {
2626
public:
27-
KafkaWalIngesterApp() {}
27+
KafkaWalIngesterApp() : force_stop_(false) {}
2828

2929
AppType type() const override { return AppType::kBuiltIn; }
3030

3131
bool Query(GraphDBSession& graph, Decoder& input, Encoder& output) override;
32+
33+
bool terminal() override;
34+
std::atomic<bool> force_stop_{false};
3235
};
3336

3437
class KafkaWalIngesterAppFactory : public AppFactoryBase {

flex/tests/hqps/kafka_wal_ingester_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ int main(int argc, char** argv) {
9898
}
9999
LOG(INFO) << db.GetReadTransaction(0).GetVertexNum(0);
100100

101-
std::this_thread::sleep_for(std::chrono::seconds(3));
101+
std::this_thread::sleep_for(std::chrono::seconds(4));
102102
{
103103
auto txn = db2.GetReadTransaction(0);
104104
CHECK(txn.GetVertexNum(0) == 195) << "Vertex num: " << txn.GetVertexNum(0);

0 commit comments

Comments
 (0)