feat(interactive): Impl kafka wal writer and wal parser #4518

Open · wants to merge 18 commits into base: main
Changes from 13 commits
22 changes: 22 additions & 0 deletions .github/workflows/interactive.yml
Expand Up @@ -367,6 +367,28 @@ jobs:
export ENGINE_TYPE=hiactor
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} graph_algo RBO

- name: Run kafka wal test
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
run: |
git submodule update --init
cd ${GITHUB_WORKSPACE}/flex/build
cmake .. -DBUILD_KAFKA_WAL_WRITER_PARSER=ON && make -j$(nproc)
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -zxf kafka_2.13-3.9.0.tgz
cd kafka_2.13-3.9.0
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
bin/kafka-server-start.sh config/kraft/reconfig-server.properties &

Comment on lines +382 to +383

Copilot AI Mar 31, 2025
Consider adding a delay (or a readiness check) after starting the Kafka server to ensure it is fully initialized before subsequent commands are executed.

Suggested change
bin/kafka-server-start.sh config/kraft/reconfig-server.properties &
bin/kafka-server-start.sh config/kraft/reconfig-server.properties &
# Wait for Kafka server to be ready
while ! bin/kafka-topics.sh --list --bootstrap-server localhost:9092; do
echo "Waiting for Kafka to be ready..."
sleep 5
done


bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
../tests/hqps/kafka_test localhost:9092 kafka-test
bin/kafka-topics.sh --delete --topic kafka-test --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
../tests/hqps/kafka_wal_ingester_test .. localhost:9092 kafka-test
bin/kafka-topics.sh --delete --topic kafka-test --bootstrap-server localhost:9092
ps aux | grep kafka | grep -v grep | awk '{print $2}' | xargs kill -9

Comment on lines +390 to +391

Copilot AI Mar 31, 2025

The process termination command may kill unrelated processes. Consider refining the filter to specifically target the Kafka process started by the workflow.

Suggested change
ps aux | grep kafka | grep -v grep | awk '{print $2}' | xargs kill -9
kill -9 $(cat kafka_pid.txt)


- name: Run Gremlin test on modern graph
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
Expand Down
37 changes: 37 additions & 0 deletions docs/flex/interactive/development/dev_and_test.md
Expand Up @@ -262,3 +262,40 @@ In Interactive's execution engine, transactions such as `ReadTransaction`, `Upda
2. If a transaction returns `false` during the `commit()` process, the error occurred prior to applying the WAL to the graph data. This type of failure could arise during the construction of the WAL or during its writing phase.

3. It is important to note that errors can still occur when replaying the WAL to the graph database. Replaying might fail due to limitations in resources or due to unforeseen bugs. **However,** any errors encountered during this stage will be handled via exceptions or may result in process failure. Currently, there is no established mechanism to handle such failures. Future improvements should focus on implementing failover strategies, potentially allowing the GraphDB to continue replaying the WAL until it succeeds.

## Persisting WAL to Kafka

A Kafka-based WAL storage is also provided. The setup below follows the [kafka-quick-start](https://kafka.apache.org/quickstart) guide.

### Install Kafka

```bash
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -zxf kafka_2.13-3.9.0.tgz
cd kafka_2.13-3.9.0
```

### Start Kafka with KRaft

```bash
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
bin/kafka-server-start.sh config/kraft/reconfig-server.properties
```

### Create topic

```bash
bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
# describe the topic
bin/kafka-topics.sh --describe --topic kafka-test --bootstrap-server localhost:9092
```

### Test KafkaWalWriter and KafkaWalParser

```bash
cd flex && mkdir build
cd build && cmake .. -DBUILD_TEST=ON -DBUILD_KAFKA_WAL_WRITER_PARSER=ON && make -j
# run the kafka test
./tests/hqps/kafka_test localhost:9092 kafka-test
```
17 changes: 17 additions & 0 deletions flex/CMakeLists.txt
Expand Up @@ -17,6 +17,8 @@ option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF

option(BUILD_KAFKA_WAL_WRITER_PARSER "Whether to build kafka wal writer and wal parser" OFF) # Whether to build kafka wal writer and wal parser, default is OFF
option(BUILD_WITH_OSS "Whether to build with oss support" OFF) # Whether to build with oss support, default is OFF

#print options
Expand Down Expand Up @@ -129,6 +131,21 @@ if (NOT yaml-cpp_FOUND)
message(FATAL_ERROR "yaml-cpp not found, please install the yaml-cpp library")
endif ()

#find CppKafka-------------------------------------------------------------------
if (BUILD_KAFKA_WAL_WRITER_PARSER)
find_package(CppKafka)
if (NOT CppKafka_FOUND)
message(FATAL_ERROR "cppkafka not found, please install the cppkafka library")
else()
include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS})
message(STATUS "cpp kafka include dir: ${CppKafka_INCLUDE_DIRS}")
set(CppKafka_LIBRARIES CppKafka::cppkafka)
add_definitions(-DBUILD_KAFKA_WAL_WRITER_PARSER)
message(STATUS "cppkafka found")
endif ()
endif()


#find boost----------------------------------------------------------------------
find_package(Boost REQUIRED COMPONENTS system filesystem
# required by folly
Expand Down
8 changes: 8 additions & 0 deletions flex/engines/graph_db/CMakeLists.txt
Expand Up @@ -4,11 +4,19 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")

if (NOT BUILD_KAFKA_WAL_WRITER_PARSER)
list(REMOVE_ITEM GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_writer.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_parser.cc")
endif()

add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
target_link_libraries(flex_graph_db flex_rt_mutable_graph ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(flex_graph_db runtime_execute)
if (BUILD_KAFKA_WAL_WRITER_PARSER)
target_link_libraries(flex_graph_db ${CppKafka_LIBRARIES})
endif()
install_flex_target(flex_graph_db)

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
Expand Down
205 changes: 205 additions & 0 deletions flex/engines/graph_db/app/kafka_wal_ingester_app.cc
@@ -0,0 +1,205 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
#include "cppkafka/cppkafka.h"
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/database/wal/kafka_wal_parser.h"

namespace gs {
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER

struct WalIngester {
GraphDBSession& session_;
timestamp_t begin_;
timestamp_t end_;
timestamp_t ingested_plus_one_;
std::vector<std::string> data_;
// 0: not exist, 1: exist, 2: ingested
std::vector<uint8_t> states_;

void resize() {
size_t n = data_.size();
std::vector<std::string> new_data(n + 4096);
std::vector<uint8_t> new_states(n + 4096, 0);
size_t idx = (ingested_plus_one_ - begin_) % n;
for (size_t i = 0; i < n; ++i) {
new_data[i] = data_[idx];
new_states[i] = states_[idx];
if (states_[idx]) {
end_ = ingested_plus_one_ + i + 1;
}
++idx;
idx %= n;
}
data_ = std::move(new_data);
states_ = std::move(new_states);
begin_ = ingested_plus_one_;
}
WalIngester(GraphDBSession& session, timestamp_t cur)
: session_(session),
begin_(cur),
end_(cur),
ingested_plus_one_(cur),
data_(4096),
states_(4096, 0) {}

bool empty() const { return ingested_plus_one_ == end_; }

void ingest_impl(const std::string& data) {
auto header = reinterpret_cast<const WalHeader*>(data.data());
if (header->type == 0) {
InsertTransaction::IngestWal(
session_.graph(), header->timestamp,
const_cast<char*>(data.data()) + sizeof(WalHeader), header->length,
session_.allocator());
} else {
auto txn = session_.GetUpdateTransaction();
auto header = reinterpret_cast<const WalHeader*>(data.data());
txn.IngestWal(session_.graph(), session_.db().work_dir(),
header->timestamp,
const_cast<char*>(data.data()) + sizeof(WalHeader),
header->length, session_.allocator());
txn.Commit();
}
}

void ingest() {
size_t idx = (ingested_plus_one_ - begin_) % data_.size();
bool flag = false;
while (states_[idx] == 2 || states_[idx] == 1) {
if (states_[idx] == 1) {
ingest_impl(data_[idx]);
}
states_[idx] = 0;
++ingested_plus_one_;
++idx;
idx %= data_.size();
flag = true;
}
if (flag) {
session_.commit(ingested_plus_one_);
}
}
void push(const std::string& data) {
auto header = reinterpret_cast<const WalHeader*>(data.data());
if (header->timestamp < begin_) {
LOG(ERROR) << "Invalid timestamp: " << header->timestamp;
return;
}
size_t index;
size_t n = data_.size();
if (header->timestamp < end_) {
index = (header->timestamp - begin_) % n;
} else if (header->timestamp - begin_ < n) {
index = header->timestamp - begin_;
end_ = header->timestamp + 1;
} else {
ingest();
while (header->timestamp - ingested_plus_one_ + 1 > states_.size()) {
resize();
}
index = (header->timestamp - begin_) % states_.size();
end_ = header->timestamp + 1;
}
if (header->length == 0) {
states_[index] = 2;
} else if (header->type == 0) {
ingest_impl(data);
states_[index] = 2;
} else {
states_[index] = 1;
data_[index] = data;
}
}
};

class KafkaWalConsumer {
public:
static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
std::chrono::milliseconds(100);

// always track all partitions, reading from the beginning
KafkaWalConsumer(WalIngester& ingester, cppkafka::Configuration config,
const std::string& topic_name, int32_t thread_num)
: ingester_(ingester) {
auto topic_partitions = get_all_topic_partitions(config, topic_name, false);
consumers_.reserve(topic_partitions.size());
for (size_t i = 0; i < topic_partitions.size(); ++i) {
consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config));
consumers_.back()->assign({topic_partitions[i]});
}
}

void poll() {
for (auto& consumer : consumers_) {
auto msgs = consumer->poll_batch(1024);
for (const auto& msg : msgs) {
if (msg) {
if (msg.get_error()) {
if (!msg.is_eof()) {
LOG(INFO) << "[+] Received error notification: "
<< msg.get_error();
}
} else {
std::string payload = msg.get_payload();
ingester_.push(payload);
consumer->commit(msg);
}
}
}
}
}

private:
std::vector<std::unique_ptr<cppkafka::Consumer>> consumers_;
WalIngester& ingester_;
};

void Ingest(const std::string& data, GraphDBSession& session) {}

bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
Encoder& output) {
cppkafka::Configuration config;
std::string topic_name;
while (!input.empty()) {
auto key = input.get_string();
auto value = input.get_string();
if (key == "topic_name") {
topic_name = value;
} else {
config.set(std::string(key), std::string(value));
}
}
LOG(INFO) << "Kafka brokers: " << config.get("metadata.broker.list");
timestamp_t cur_ts = graph.db().get_last_ingested_wal_ts() + 1;
gs::WalIngester ingester(graph, cur_ts);
gs::KafkaWalConsumer consumer(ingester, config, topic_name, 1);
while (graph.db().kafka_wal_ingester_state()) {
consumer.poll();
ingester.ingest();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
while (!ingester.empty()) {
consumer.poll();
ingester.ingest();
}
return true;
}
AppWrapper KafkaWalIngesterAppFactory::CreateApp(const GraphDB& db) {
return AppWrapper(new KafkaWalIngesterApp(), NULL);
}
#endif
} // namespace gs
45 changes: 45 additions & 0 deletions flex/engines/graph_db/app/kafka_wal_ingester_app.h
@@ -0,0 +1,45 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_KAFKA_WAL_INGESTER_APP_H_
#define ENGINES_KAFKA_WAL_INGESTER_APP_H_

#include "flex/engines/graph_db/app/app_base.h"
#include "flex/engines/graph_db/database/graph_db_session.h"

namespace gs {
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
// Ingest wal from kafka
class KafkaWalIngesterApp : public WriteAppBase {
public:
KafkaWalIngesterApp() {}

AppType type() const override { return AppType::kBuiltIn; }

bool Query(GraphDBSession& graph, Decoder& input, Encoder& output) override;
};

class KafkaWalIngesterAppFactory : public AppFactoryBase {
public:
KafkaWalIngesterAppFactory() = default;
~KafkaWalIngesterAppFactory() = default;

AppWrapper CreateApp(const GraphDB& db) override;
};
#endif

} // namespace gs

#endif // ENGINES_KAFKA_WAL_INGESTER_APP_H_