Skip to content

Commit a3e3338

Browse files
committed
fix
1 parent 2dfd958 commit a3e3338

15 files changed

+314
-180
lines changed

flex/engines/graph_db/app/kafka_wal_ingester_app.cc

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616
#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
1717
#include "cppkafka/cppkafka.h"
1818
#include "flex/engines/graph_db/database/graph_db.h"
19-
#include "flex/engines/graph_db/database/wal/kafka_wal_parser.h"
19+
#include "flex/engines/graph_db/database/wal/kafka_wal_utils.h"
2020

2121
namespace gs {
2222
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
2323

2424
struct WalIngester {
25+
constexpr static size_t BUFFSIZ = 4096;
2526
GraphDBSession& session_;
2627
timestamp_t begin_;
2728
timestamp_t end_;
@@ -31,30 +32,32 @@ struct WalIngester {
3132
std::vector<uint8_t> states_;
3233

3334
void resize() {
34-
size_t n = data_.size();
35-
std::vector<std::string> new_data(n + 4096);
36-
std::vector<uint8_t> new_states(n + 4096, 0);
37-
size_t idx = (ingested_plus_one_ - begin_) % n;
38-
for (size_t i = 0; i < n; ++i) {
35+
size_t origin_len = data_.size();
36+
std::vector<std::string> new_data(origin_len + BUFFSIZ);
37+
std::vector<uint8_t> new_states(origin_len + BUFFSIZ, 0);
38+
size_t idx = (ingested_plus_one_ - begin_) % origin_len;
39+
for (size_t i = 0; i < origin_len; ++i) {
3940
new_data[i] = data_[idx];
4041
new_states[i] = states_[idx];
4142
if (states_[idx]) {
4243
end_ = ingested_plus_one_ + i + 1;
4344
}
4445
++idx;
45-
idx %= n;
46+
idx %= origin_len;
4647
}
4748
data_ = std::move(new_data);
4849
states_ = std::move(new_states);
4950
begin_ = ingested_plus_one_;
5051
}
52+
53+
timestamp_t last_ingested() const { return ingested_plus_one_ - 1; }
5154
WalIngester(GraphDBSession& session, timestamp_t cur)
5255
: session_(session),
5356
begin_(cur),
5457
end_(cur),
5558
ingested_plus_one_(cur),
56-
data_(4096),
57-
states_(4096, 0) {}
59+
data_(BUFFSIZ),
60+
states_(BUFFSIZ, 0) {}
5861

5962
bool empty() const { return ingested_plus_one_ == end_; }
6063

@@ -77,7 +80,8 @@ struct WalIngester {
7780
}
7881

7982
void ingest() {
80-
size_t idx = (ingested_plus_one_ - begin_) % data_.size();
83+
size_t len = data_.size();
84+
size_t idx = (ingested_plus_one_ - begin_) % len;
8185
bool flag = false;
8286
while (states_[idx] == 2 || states_[idx] == 1) {
8387
if (states_[idx] == 1) {
@@ -86,7 +90,7 @@ struct WalIngester {
8690
states_[idx] = 0;
8791
++ingested_plus_one_;
8892
++idx;
89-
idx %= data_.size();
93+
idx %= len;
9094
flag = true;
9195
}
9296
if (flag) {
@@ -180,14 +184,15 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
180184
if (key == "topic_name") {
181185
topic_name = value;
182186
} else {
187+
LOG(INFO) << "Kafka config: " << key << " = " << value;
183188
config.set(std::string(key), std::string(value));
184189
}
185190
}
186191
LOG(INFO) << "Kafka brokers: " << config.get("metadata.broker.list");
187192
timestamp_t cur_ts = graph.db().get_last_ingested_wal_ts() + 1;
188193
gs::WalIngester ingester(graph, cur_ts);
189194
gs::KafkaWalConsumer consumer(ingester, config, topic_name, 1);
190-
while (graph.db().kafka_wal_ingester_state()) {
195+
while (!force_stop_.load()) {
191196
consumer.poll();
192197
ingester.ingest();
193198
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -196,8 +201,16 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
196201
consumer.poll();
197202
ingester.ingest();
198203
}
204+
int64_t ts = ingester.last_ingested();
205+
output.put_long(ts);
206+
return true;
207+
}
208+
209+
bool KafkaWalIngesterApp::terminal() {
210+
force_stop_.store(true);
199211
return true;
200212
}
213+
201214
AppWrapper KafkaWalIngesterAppFactory::CreateApp(const GraphDB& db) {
202215
return AppWrapper(new KafkaWalIngesterApp(), NULL);
203216
}

flex/engines/graph_db/app/kafka_wal_ingester_app.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ namespace gs {
2424
// Ingest wal from kafka
2525
class KafkaWalIngesterApp : public WriteAppBase {
2626
public:
27-
KafkaWalIngesterApp() {}
27+
KafkaWalIngesterApp() : force_stop_(false) {}
2828

2929
AppType type() const override { return AppType::kBuiltIn; }
3030

3131
bool Query(GraphDBSession& graph, Decoder& input, Encoder& output) override;
32+
33+
bool terminal();
34+
std::atomic<bool> force_stop_{false};
3235
};
3336

3437
class KafkaWalIngesterAppFactory : public AppFactoryBase {

flex/engines/graph_db/database/graph_db.cc

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@
2828
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
2929
#include "flex/utils/yaml_utils.h"
3030

31-
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
32-
#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
33-
#endif
34-
3531
#include "flex/third_party/httplib.h"
3632

3733
namespace gs {
@@ -58,7 +54,6 @@ struct SessionLocalContext {
5854

5955
GraphDB::GraphDB()
6056
: monitor_thread_running_(false),
61-
kafka_wal_ingester_thread_running_(false),
6257
last_ingested_wal_ts_(0),
6358
compact_thread_running_(false) {}
6459
GraphDB::~GraphDB() {
@@ -282,54 +277,6 @@ void GraphDB::Close() {
282277
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
283278
}
284279

285-
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
286-
bool GraphDB::kafka_wal_ingester_state() const {
287-
return kafka_wal_ingester_thread_running_.load();
288-
}
289-
290-
void GraphDB::start_kafka_wal_ingester(const cppkafka::Configuration& config,
291-
const std::string& topic_name) {
292-
if (kafka_wal_ingester_thread_running_) {
293-
kafka_wal_ingester_thread_running_ = false;
294-
if (kafka_wal_ingester_thread_.joinable()) {
295-
kafka_wal_ingester_thread_.join();
296-
}
297-
}
298-
kafka_wal_ingester_thread_running_ = true;
299-
300-
kafka_wal_ingester_thread_ = std::thread([&]() {
301-
std::vector<char> buffer;
302-
gs::Encoder encoder(buffer);
303-
encoder.put_string("topic_name");
304-
encoder.put_string(topic_name);
305-
if (config.has_property("metadata.broker.list")) {
306-
encoder.put_string("metadata.broker.list");
307-
encoder.put_string(config.get("metadata.broker.list"));
308-
}
309-
if (config.has_property("group.id")) {
310-
encoder.put_string("group.id");
311-
encoder.put_string(config.get("group.id"));
312-
}
313-
if (config.has_property("enable.auto.commit")) {
314-
encoder.put_string("enable.auto.commit");
315-
encoder.put_string(config.get("enable.auto.commit"));
316-
}
317-
if (config.has_property("auto.offset.reset")) {
318-
encoder.put_string("auto.offset.reset");
319-
encoder.put_string(config.get("auto.offset.reset"));
320-
}
321-
gs::Decoder decoder(buffer.data(), buffer.size());
322-
KafkaWalIngesterApp().Query(GetSession(0), decoder, encoder);
323-
});
324-
}
325-
326-
void GraphDB::stop_kafka_wal_ingester() {
327-
kafka_wal_ingester_thread_running_ = false;
328-
kafka_wal_ingester_thread_.join();
329-
}
330-
331-
#endif
332-
333280
ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
334281
return contexts_[thread_id].session.GetReadTransaction();
335282
}

flex/engines/graph_db/database/graph_db.h

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@
3333
#include "flex/storages/rt_mutable_graph/loader/loader_factory.h"
3434
#include "flex/storages/rt_mutable_graph/loading_config.h"
3535
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
36-
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
37-
#include "cppkafka/cppkafka.h"
38-
#endif
3936

4037
namespace gs {
4138

@@ -172,15 +169,6 @@ class GraphDB {
172169

173170
inline const GraphDBConfig& config() const { return config_; }
174171

175-
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
176-
bool kafka_wal_ingester_state() const;
177-
178-
void start_kafka_wal_ingester(const cppkafka::Configuration& config,
179-
const std::string& topic_name);
180-
181-
void stop_kafka_wal_ingester();
182-
#endif
183-
184172
uint64_t get_last_ingested_wal_ts() const { return last_ingested_wal_ts_; }
185173
void set_last_ingested_wal_ts(uint64_t ts) { last_ingested_wal_ts_ = ts; }
186174

@@ -219,8 +207,6 @@ class GraphDB {
219207
std::thread monitor_thread_;
220208
bool monitor_thread_running_;
221209

222-
std::thread kafka_wal_ingester_thread_;
223-
std::atomic<bool> kafka_wal_ingester_thread_running_;
224210
uint64_t last_ingested_wal_ts_;
225211

226212
timestamp_t last_compaction_ts_;

flex/engines/graph_db/database/wal/kafka_wal_parser.cc

Lines changed: 14 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,81 +14,31 @@
1414
*/
1515

1616
#include "flex/engines/graph_db/database/wal/kafka_wal_parser.h"
17+
#include "flex/engines/graph_db/database/wal/kafka_wal_utils.h"
1718
#include "flex/engines/graph_db/database/wal/wal.h"
1819

1920
namespace gs {
2021

21-
std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
22-
const cppkafka::Configuration& config, const std::string& topic_name,
23-
bool from_beginning) {
24-
std::vector<cppkafka::TopicPartition> partitions;
25-
cppkafka::Consumer consumer(config); // tmp consumer
26-
LOG(INFO) << config.get("metadata.broker.list");
27-
LOG(INFO) << config.get("group.id");
28-
29-
LOG(INFO) << "Get metadata for topic " << topic_name;
30-
auto meta_vector = consumer.get_metadata().get_topics({topic_name});
31-
if (meta_vector.empty()) {
32-
LOG(WARNING) << "Failed to get metadata for topic " << topic_name
33-
<< ", maybe the topic does not exist";
34-
return {};
35-
}
36-
auto metadata = meta_vector.front().get_partitions();
37-
for (const auto& partition : metadata) {
38-
if (from_beginning) {
39-
partitions.push_back(
40-
cppkafka::TopicPartition(topic_name, partition.get_id(), 0));
41-
} else {
42-
partitions.push_back(cppkafka::TopicPartition(
43-
topic_name, partition.get_id())); // from the beginning
44-
}
45-
}
46-
return partitions;
47-
}
48-
4922
std::unique_ptr<IWalParser> KafkaWalParser::Make(const std::string& uri) {
5023
// uri should be like
5124
// "kafka://localhost:9092,localhost:9093/my_topic?group.id=my_consumer_group"
52-
const std::string prefix = "kafka://";
53-
if (uri.find(prefix) != 0) {
54-
LOG(FATAL) << "Invalid uri: " << uri;
55-
}
56-
57-
std::string hosts_part = uri.substr(prefix.length());
58-
size_t query_pos = hosts_part.find('/');
59-
std::string hosts;
60-
std::string query;
61-
cppkafka::Configuration config;
62-
63-
if (query_pos != std::string::npos) {
64-
hosts = hosts_part.substr(0, query_pos);
65-
query = hosts_part.substr(query_pos + 1);
66-
} else {
67-
LOG(FATAL) << "Invalid uri: " << uri;
25+
auto res = parse_uri(uri);
26+
if (!res) {
27+
LOG(FATAL) << "Failed to parse uri: " << uri;
28+
return nullptr;
6829
}
69-
config.set("metadata.broker.list", hosts);
70-
size_t top_pos = query.find('?');
30+
gs::Decoder decoder(res.value().data(), res.value().size());
7131
std::string topic_name;
72-
if (top_pos != std::string::npos) {
73-
topic_name = query.substr(0, top_pos);
74-
query = query.substr(top_pos + 1);
75-
} else {
76-
LOG(FATAL) << "Invalid uri: " << uri;
77-
}
78-
std::istringstream query_stream(query);
79-
std::string pair;
80-
while (std::getline(query_stream, pair, '&')) {
81-
size_t eq_pos = pair.find('=');
82-
if (eq_pos != std::string::npos) {
83-
std::string key = pair.substr(0, eq_pos);
84-
std::string value = pair.substr(eq_pos + 1);
85-
if (key == "group.id") {
86-
config.set("group.id", value);
87-
}
32+
cppkafka::Configuration config;
33+
while (!decoder.empty()) {
34+
auto key = decoder.get_string();
35+
auto value = decoder.get_string();
36+
if (key == "topic_name") {
37+
topic_name = value;
38+
} else {
39+
config.set(std::string(key), std::string(value));
8840
}
8941
}
90-
config.set("enable.auto.commit", false);
91-
9242
auto parser = std::unique_ptr<IWalParser>(new KafkaWalParser(config));
9343
parser->open(topic_name);
9444
return parser;

flex/engines/graph_db/database/wal/kafka_wal_parser.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,6 @@
2222

2323
namespace gs {
2424

25-
/*
26-
* Get all partitions of the given topic.
27-
*/
28-
std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
29-
const cppkafka::Configuration& config, const std::string& topic_name,
30-
bool from_beginning = true);
31-
3225
class KafkaWalParser : public IWalParser {
3326
public:
3427
static constexpr const std::chrono::milliseconds POLL_TIMEOUT =

0 commit comments

Comments
 (0)