feat(interactive): Impl kafka wal writer and wal parser #4518

Open

wants to merge 18 commits into base: main

Changes from 10 commits
21 changes: 21 additions & 0 deletions .github/workflows/interactive.yml
@@ -367,6 +367,27 @@ jobs:
export ENGINE_TYPE=hiactor
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} graph_algo RBO

- name: Run kafka wal test
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
run: |
cd ${GITHUB_WORKSPACE}/flex/build
cmake .. -DBUILD_KAFKA_WAL_WRITER_PARSER=ON && make -j$(nproc)
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -zxf kafka_2.13-3.9.0.tgz
cd kafka_2.13-3.9.0
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
bin/kafka-server-start.sh config/kraft/reconfig-server.properties &

Comment on lines +382 to +383 (Copilot AI, Mar 31, 2025):

Consider adding a delay (or a readiness check) after starting the Kafka server to ensure it is fully initialized before subsequent commands are executed.

Suggested change:

    bin/kafka-server-start.sh config/kraft/reconfig-server.properties &
    # Wait for Kafka server to be ready
    while ! bin/kafka-topics.sh --list --bootstrap-server localhost:9092; do
      echo "Waiting for Kafka to be ready..."
      sleep 5
    done

bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
../tests/hqps/kafka_test localhost:9092 kafka-test
bin/kafka-topics.sh --delete --topic kafka-test --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
../tests/hqps/kafka_wal_ingester_test .. localhost:9092 kafka-test
bin/kafka-topics.sh --delete --topic kafka-test --bootstrap-server localhost:9092
ps aux | grep kafka | grep -v grep | awk '{print $2}' | xargs kill -9

Comment on lines +390 to +391 (Copilot AI, Mar 31, 2025):

The process termination command may kill unrelated processes. Consider refining the filter to specifically target the Kafka process started by the workflow.

Suggested change: replace

    ps aux | grep kafka | grep -v grep | awk '{print $2}' | xargs kill -9

with

    kill -9 $(cat kafka_pid.txt)

- name: Run Gremlin test on modern graph
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
37 changes: 37 additions & 0 deletions docs/flex/interactive/development/dev_and_test.md
@@ -262,3 +262,40 @@ In Interactive's execution engine, transactions such as `ReadTransaction`, `Upda
2. If a transaction returns `false` during the `commit()` process, the error occurred prior to applying the WAL to the graph data. This type of failure could arise during the construction of the WAL or during its writing phase.

3. It is important to note that errors can still occur when replaying the WAL to the graph database. Replaying might fail due to limitations in resources or due to unforeseen bugs. **However,** any errors encountered during this stage will be handled via exceptions or may result in process failure. Currently, there is no established mechanism to handle such failures. Future improvements should focus on implementing failover strategies, potentially allowing the GraphDB to continue replaying the WAL until it succeeds.

## Persisting WAL to Kafka

Kafka-based WAL storage is also provided. The setup below follows the [Kafka quickstart](https://kafka.apache.org/quickstart).

### Install Kafka

```bash
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -zxf kafka_2.13-3.9.0.tgz
cd kafka_2.13-3.9.0
```

### Start Kafka with KRaft

```bash
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
bin/kafka-server-start.sh config/kraft/reconfig-server.properties
```
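The quickstart runs the broker in the foreground; CI scripts (like the workflow in this PR) background it with `&`, in which case it is worth waiting until the broker answers before creating topics, as suggested in the review comments. A minimal sketch of such a readiness wait; the `wait_until` helper, its retry budget, and the `WAIT_INTERVAL` variable are illustrative, not part of Kafka or this PR:

```shell
# wait_until CMD [MAX_TRIES]: retry CMD until it succeeds or the retry
# budget (default 30 attempts) is spent; returns 1 on timeout.
wait_until() {
  wu_cmd=$1
  wu_max=${2:-30}
  wu_tries=0
  until eval "$wu_cmd" >/dev/null 2>&1; do
    wu_tries=$((wu_tries + 1))
    if [ "$wu_tries" -ge "$wu_max" ]; then
      echo "timed out waiting for: $wu_cmd" >&2
      return 1
    fi
    echo "Waiting for Kafka to be ready..."
    sleep "${WAIT_INTERVAL:-5}"
  done
}

# Against a live broker, e.g.:
# wait_until "bin/kafka-topics.sh --list --bootstrap-server localhost:9092"
```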

### Create topic

```bash
bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
# describe the topic
bin/kafka-topics.sh --describe --topic kafka-test --bootstrap-server localhost:9092
```
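To confirm the topic accepts traffic before running the WAL tests, a produce/consume round trip with the console tools shipped in the same Kafka distribution can help. This is a sketch: the `smoke_test_topic` helper and the `KAFKA_BIN` override are illustrative, not part of this PR, and the flags assume the standard console consumer:

```shell
# smoke_test_topic TOPIC BROKER: write one message and read it back.
# KAFKA_BIN defaults to bin/, i.e. run from the kafka_2.13-3.9.0 directory.
smoke_test_topic() {
  st_topic=$1
  st_broker=$2
  # Produce a single message to the topic.
  printf 'hello-wal\n' | "${KAFKA_BIN:-bin}/kafka-console-producer.sh" \
    --topic "$st_topic" --bootstrap-server "$st_broker"
  # Read it back from the beginning, bounded by a message count and timeout.
  "${KAFKA_BIN:-bin}/kafka-console-consumer.sh" \
    --topic "$st_topic" --bootstrap-server "$st_broker" \
    --from-beginning --max-messages 1 --timeout-ms 5000
}

# Usage: smoke_test_topic kafka-test localhost:9092
```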

### Test KafkaWalWriter and KafkaWalParser

```bash
cd flex && mkdir build
cd build && cmake .. -DBUILD_TEST=ON -DBUILD_KAFKA_WAL_WRITER_PARSER=ON && make -j
# run the kafka test
./tests/hqps/kafka_test localhost:9092 kafka-test
```
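The CI workflow in this PR tears down by deleting the topic and killing every process matching `kafka`; a gentler sketch using the stop script shipped with Kafka follows (the `cleanup_kafka` helper and the `KAFKA_BIN` override are illustrative, not part of this PR):

```shell
# cleanup_kafka TOPIC BROKER: delete the test topic so reruns start clean,
# then stop the broker via Kafka's own stop script instead of kill -9.
cleanup_kafka() {
  ck_topic=$1
  ck_broker=$2
  "${KAFKA_BIN:-bin}/kafka-topics.sh" --delete \
    --topic "$ck_topic" --bootstrap-server "$ck_broker"
  "${KAFKA_BIN:-bin}/kafka-server-stop.sh"
}

# Usage: cleanup_kafka kafka-test localhost:9092
```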
17 changes: 17 additions & 0 deletions flex/CMakeLists.txt
@@ -17,6 +17,8 @@ option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF

option(BUILD_KAFKA_WAL_WRITER_PARSER "Whether to build kafka wal writer and wal parser" OFF) # Whether to build kafka wal writer and wal parser, default is OFF
option(BUILD_WITH_OSS "Whether to build with oss support" OFF) # Whether to build with oss support, default is OFF

#print options
@@ -129,6 +131,21 @@ if (NOT yaml-cpp_FOUND)
message(FATAL_ERROR "yaml-cpp not found, please install the yaml-cpp library")
endif ()

#find CppKafka-------------------------------------------------------------------
if (BUILD_KAFKA_WAL_WRITER_PARSER)
find_package(CppKafka)
if (NOT CppKafka_FOUND)
message(FATAL_ERROR "cppkafka not found, please install cppkafka library")
else()
include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS})
message(STATUS "cpp kafka include dir: ${CppKafka_INCLUDE_DIRS}")
set(CppKafka_LIBRARIES CppKafka::cppkafka)
add_definitions(-DBUILD_KAFKA_WAL_WRITER_PARSER)
message(STATUS "cppkafka found")
endif ()
endif()


#find boost----------------------------------------------------------------------
find_package(Boost REQUIRED COMPONENTS system filesystem
# required by folly
8 changes: 8 additions & 0 deletions flex/engines/graph_db/CMakeLists.txt
@@ -4,11 +4,19 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")

if (NOT BUILD_KAFKA_WAL_WRITER_PARSER)
list(REMOVE_ITEM GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_writer.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_parser.cc")
endif()

add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
target_link_libraries(flex_graph_db flex_rt_mutable_graph ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(flex_graph_db runtime_execute)
if (BUILD_KAFKA_WAL_WRITER_PARSER)
target_link_libraries(flex_graph_db ${CppKafka_LIBRARIES})
endif()
install_flex_target(flex_graph_db)

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
122 changes: 122 additions & 0 deletions flex/engines/graph_db/app/kafka_wal_ingester_app.cc
@@ -0,0 +1,122 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
#include "cppkafka/cppkafka.h"
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/database/wal/kafka_wal_parser.h"

namespace gs {
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER

class KafkaWalConsumer {
public:
struct CustomComparator {
inline bool operator()(const std::string& lhs, const std::string& rhs) {
const WalHeader* header1 = reinterpret_cast<const WalHeader*>(lhs.data());
const WalHeader* header2 = reinterpret_cast<const WalHeader*>(rhs.data());
return header1->timestamp > header2->timestamp;
}
};
static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
std::chrono::milliseconds(100);

// Always track all partitions, consuming from the beginning
KafkaWalConsumer(cppkafka::Configuration config,
const std::string& topic_name, int32_t thread_num) {
auto topic_partitions = get_all_topic_partitions(config, topic_name, false);
consumers_.reserve(topic_partitions.size());
for (size_t i = 0; i < topic_partitions.size(); ++i) {
consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config));
consumers_.back()->assign({topic_partitions[i]});
}
}

std::string poll() {
for (auto& consumer : consumers_) {
auto msg = consumer->poll();
if (msg) {
if (msg.get_error()) {
if (!msg.is_eof()) {
LOG(INFO) << "[+] Received error notification: " << msg.get_error();
}
} else {
std::string payload = msg.get_payload();
message_queue_.push(payload);
consumer->commit(msg);
}
}
}
if (message_queue_.empty()) {
return "";
}
std::string payload = message_queue_.top();
message_queue_.pop();
return payload;
}

private:
std::vector<std::unique_ptr<cppkafka::Consumer>> consumers_;
std::priority_queue<std::string, std::vector<std::string>, CustomComparator>
message_queue_;
};

bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
Encoder& output) {
cppkafka::Configuration config;
std::string topic_name;
while (!input.empty()) {
auto key = input.get_string();
auto value = input.get_string();
if (key == "topic_name") {
topic_name = value;
} else {
config.set(std::string(key), std::string(value));
}
}
LOG(INFO) << "Kafka brokers: " << config.get("metadata.broker.list");

gs::KafkaWalConsumer consumer(config, topic_name, 1);
// TODO: how to make it stop
while (graph.db().kafka_wal_ingester_state()) {
auto res = consumer.poll();
if (res.empty()) {
std::this_thread::sleep_for(gs::KafkaWalConsumer::POLL_TIMEOUT);
continue;
}
auto header = reinterpret_cast<const WalHeader*>(res.data());
if (header->type == 0) {
auto txn = graph.GetInsertTransaction();
txn.IngestWal(graph.graph(), txn.timestamp(),
const_cast<char*>(res.data()) + sizeof(WalHeader),
header->length, txn.allocator());
txn.Commit();
} else if (header->type == 1) {
auto txn = graph.GetUpdateTransaction();
txn.IngestWal(graph.graph(), graph.db().work_dir(), txn.timestamp(),
const_cast<char*>(res.data()) + sizeof(WalHeader),
header->length, txn.allocator());
txn.Commit();
} else {
LOG(ERROR) << "Unknown wal type: " << header->type;
}
}
return true;
}
AppWrapper KafkaWalIngesterAppFactory::CreateApp(const GraphDB& db) {
return AppWrapper(new KafkaWalIngesterApp(), NULL);
}
#endif
} // namespace gs
45 changes: 45 additions & 0 deletions flex/engines/graph_db/app/kafka_wal_ingester_app.h
@@ -0,0 +1,45 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_KAFKA_WAL_INGESTER_APP_H_
#define ENGINES_KAFKA_WAL_INGESTER_APP_H_

#include "flex/engines/graph_db/app/app_base.h"
#include "flex/engines/graph_db/database/graph_db_session.h"

namespace gs {
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
// Ingest wal from kafka
class KafkaWalIngesterApp : public WriteAppBase {
public:
KafkaWalIngesterApp() {}

AppType type() const override { return AppType::kBuiltIn; }

bool Query(GraphDBSession& graph, Decoder& input, Encoder& output) override;
};

class KafkaWalIngesterAppFactory : public AppFactoryBase {
public:
KafkaWalIngesterAppFactory() = default;
~KafkaWalIngesterAppFactory() = default;

AppWrapper CreateApp(const GraphDB& db) override;
};
#endif

} // namespace gs

#endif // ENGINES_KAFKA_WAL_INGESTER_APP_H_
55 changes: 54 additions & 1 deletion flex/engines/graph_db/database/graph_db.cc
@@ -28,6 +28,10 @@
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
#include "flex/utils/yaml_utils.h"

#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
#endif

#include "flex/third_party/httplib.h"

namespace gs {
@@ -52,7 +56,8 @@ struct SessionLocalContext {
char _padding2[4096 - sizeof(GraphDBSession) % 4096];
};

GraphDB::GraphDB() = default;
GraphDB::GraphDB()
: monitor_thread_running_(false), compact_thread_running_(false) {}
GraphDB::~GraphDB() {
if (compact_thread_running_) {
compact_thread_running_ = false;
@@ -274,6 +279,54 @@ void GraphDB::Close() {
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
}

#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
bool GraphDB::kafka_wal_ingester_state() const {
return kafka_wal_ingester_thread_running_.load();
}

void GraphDB::start_kafka_wal_ingester(const cppkafka::Configuration& config,
const std::string& topic_name) {
if (kafka_wal_ingester_thread_running_) {
kafka_wal_ingester_thread_running_ = false;
if (kafka_wal_ingester_thread_.joinable()) {
kafka_wal_ingester_thread_.join();
}
}
kafka_wal_ingester_thread_running_ = true;

kafka_wal_ingester_thread_ = std::thread([&]() {
std::vector<char> buffer;
gs::Encoder encoder(buffer);
encoder.put_string("topic_name");
encoder.put_string(topic_name);
if (config.has_property("metadata.broker.list")) {
encoder.put_string("metadata.broker.list");
encoder.put_string(config.get("metadata.broker.list"));
}
if (config.has_property("group.id")) {
encoder.put_string("group.id");
encoder.put_string(config.get("group.id"));
}
if (config.has_property("enable.auto.commit")) {
encoder.put_string("enable.auto.commit");
encoder.put_string(config.get("enable.auto.commit"));
}
if (config.has_property("auto.offset.reset")) {
encoder.put_string("auto.offset.reset");
encoder.put_string(config.get("auto.offset.reset"));
}
gs::Decoder decoder(buffer.data(), buffer.size());
KafkaWalIngesterApp().Query(GetSession(0), decoder, encoder);
});
}

void GraphDB::stop_kafka_wal_ingester() {
kafka_wal_ingester_thread_running_ = false;
kafka_wal_ingester_thread_.join();
}

#endif

ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
return contexts_[thread_id].session.GetReadTransaction();
}