Skip to content

feat(interactive): Implement service register and primary election #4589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,125 @@ jobs:
export ENGINE_TYPE=hiactor
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} modern_graph RBO gremlin

test-service-registry:
runs-on: ubuntu-20.04
needs: build-interactive
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
steps:
- uses: actions/checkout@v4

- uses: actions/cache@v4
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-

- uses: actions/cache@v4
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
~/.cache/sccache
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Install latest libgrape-lite
if: false
run: |
git clone --single-branch https://github.com/alibaba/libgrape-lite.git /tmp/libgrape-lite
cd /tmp/libgrape-lite
mkdir -p build && cd build
cmake ..
make -j$(nproc)
make install

- name: Download Artifacts
uses: actions/download-artifact@v4
with:
name: interactive_build-${{ github.sha }}

- name: Setup tmate session
if: false
uses: mxschmitt/action-tmate@v3

- name: Extract build artifacts
run: |
cd ${GITHUB_WORKSPACE}
tar zxf build.tar.gz -C flex && rm build.tar.gz

- name: Rebuild with ENABLE_SERVICE_REGISTER=ON
run: |
cd ${GITHUB_WORKSPACE}/flex
git submodule update --init
mkdir build && cd build
cmake .. -DENABLE_SERVICE_REGISTER=ON
sudo make -j4

- name: Test service registry
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph
TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace
run: |
rm -rf ${TMP_INTERACTIVE_WORKSPACE}
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/graph.yaml
BULK_LOAD_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/bulk_load.yaml
mkdir -p ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/
cp ${SCHEMA_FILE} ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/graph.yaml
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/

# download etcd
ETCD_VER=v3.4.13
# choose either URL
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
cd /tmp/etcd-download-test

./etcd &

cd ${GITHUB_WORKSPACE}/flex/tests/hqps
cat >> ./interactive_config_test.yaml <<EOF
master:
service_registry:
type: etcd
endpoint: http://localhost:2379
ttl: 5
EOF

GLOG_v=10 ${GITHUB_WORKSPACE}/flex/build/bin/interactive_server -c ./interactive_config_test.yaml \
-g ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/graph.yaml --data-path ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/ \
-w ${TMP_INTERACTIVE_WORKSPACE} &
sleep 5

cd /tmp/etcd-download-test
# get the result of /default/interactive/service/modern_graph/primary
primary=$(./etcdctl get --prefix /default/interactive/service/modern_graph/primary)
if [ -z "$primary" ]; then
echo "Failed to get primary from etcd"
exit 1
fi
instance_list=$(./etcdctl get --prefix /default/interactive/service/modern_graph/instance_list)
# check that the cypher (7687) and procedure (10000) endpoints are both in the instance list
(echo $instance_list | grep cypher | grep 7687) || exit 1
(echo $instance_list | grep procedure | grep 10000) || exit 1

- name: Clean up
  # Run even when an earlier step failed, so the background etcd and
  # interactive_server processes and the downloaded files never linger.
  if: always()
  run: |
    # pkill exits non-zero when no process matches; that must not fail the cleanup.
    pkill -f etcd || true
    pkill -f interactive_server || true
    rm -rf /tmp/etcd-download-test


test-build-flex:
runs-on: ubuntu-22.04

Expand Down
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,9 @@
[submodule "flex/third_party/aliyun-oss-cpp-sdk"]
path = flex/third_party/aliyun-oss-cpp-sdk
url = https://github.com/aliyun/aliyun-oss-cpp-sdk.git
[submodule "flex/third_party/etcd-cpp-apiv3"]
path = flex/third_party/etcd-cpp-apiv3
url = https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
[submodule "flex/third_party/cpprestsdk"]
path = flex/third_party/cpprestsdk
url = https://github.com/microsoft/cpprestsdk.git
26 changes: 26 additions & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build op
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
option(BUILD_WITH_OSS "Whether to build with oss support" OFF) # Whether to build with oss support, default is OFF
option(ENABLE_SERVICE_REGISTER "Whether to enable service register" OFF) # Whether to enable service register, default is OFF

#print options
message(STATUS "Build test: ${BUILD_TEST}")
Expand All @@ -31,6 +32,7 @@ message(STATUS "Use pthash indexer : ${USE_PTHASH}")

include(CheckLibraryExists)
include(GNUInstallDirs)
include(CheckCXXCompilerFlag)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
include_directories(${CMAKE_CURRENT_SOURCE_DIR})

Expand Down Expand Up @@ -95,6 +97,12 @@ endif()
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")

# Probe whether the C++ compiler accepts -Wno-format-truncation (the result is
# stored in COMPILER_SUPPORTS_WNO_FORMAT_TRUNCATION) and, only if it does,
# append the flag to CMAKE_CXX_FLAGS.
CHECK_CXX_COMPILER_FLAG("-Wno-format-truncation" COMPILER_SUPPORTS_WNO_FORMAT_TRUNCATION)
if(COMPILER_SUPPORTS_WNO_FORMAT_TRUNCATION)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format-truncation")
endif()

find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})

Expand Down Expand Up @@ -200,6 +208,24 @@ if (BUILD_DOC)
endif(DOXYGEN_FOUND)
endif()

# Optional service registration support (option ENABLE_SERVICE_REGISTER).
# Builds the bundled cpprestsdk and etcd-cpp-apiv3 submodules, exposes their
# include directories, and defines the ENABLE_SERVICE_REGISTER macro for the
# C++ sources.
if (ENABLE_SERVICE_REGISTER)
  # cmake/BuildEtcdCpp.cmake add_subdirectory()s both submodules, so fail early
  # with an actionable message if either of them has not been fetched.
  foreach(submodule etcd-cpp-apiv3 cpprestsdk)
    if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/${submodule}/CMakeLists.txt)
      message(FATAL_ERROR "${submodule} submodule not found, please run `git submodule update --init --recursive` to fetch the submodule")
    endif()
  endforeach()

  include("cmake/BuildEtcdCpp.cmake")

  include_directories(SYSTEM ${CPPREST_INCLUDE_DIR})
  message(STATUS "Include directory: ${CPPREST_INCLUDE_DIR}")
  # ETCD_CPP_INCLUDE_DIR is a list (sources plus generated proto headers).
  foreach(dir ${ETCD_CPP_INCLUDE_DIR})
    message(STATUS "Include directory: ${dir}")
    include_directories(SYSTEM ${dir})
  endforeach()

  # Added after the third-party subdirectories so the macro only applies to
  # targets defined from here on.
  add_definitions(-DENABLE_SERVICE_REGISTER)
endif()

find_package(RapidJSON REQUIRED)
if (DEFINED RapidJSON_INCLUDE_DIRS) # rapidjson > 1.1.0
include_directories(${RapidJSON_INCLUDE_DIRS})
Expand Down
35 changes: 35 additions & 0 deletions flex/cmake/BuildEtcdCpp.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is adapted from https://github.com/v6d-io/v6d/blob/main/cmake/BuildEtcdCpp.cmake
#
# Builds the bundled cpprestsdk and etcd-cpp-apiv3 submodules and sets:
#   CPPREST_INCLUDE_DIR   - cpprestsdk public headers
#   CPPREST_LIB           - cpprestsdk library target name
#   ETCD_CPP_LIBRARIES    - etcd-cpp-apiv3 library target name
#   ETCD_CPP_INCLUDE_DIR  - list of etcd-cpp-apiv3 source and generated proto include dirs

include(CheckCXXCompilerFlag)

# build cpprestsdk
set(WERROR OFF CACHE BOOL "Treat warnings as errors")
set(BUILD_TESTS OFF CACHE BOOL "Build tests.")
set(BUILD_SAMPLES OFF CACHE BOOL "Build sample applications.")
set(CPPREST_EXCLUDE_WEBSOCKETS ON CACHE BOOL "Exclude websockets functionality..")
add_subdirectory(third_party/cpprestsdk)
set(CPPREST_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/third_party/cpprestsdk/Release/include)
set(CPPREST_LIB cpprest)

# disable a warning message inside cpprestsdk on Mac with llvm/clang
# The flag variable is probed here so this file does not rely on the including
# project having set W_NO_UNUSED_BUT_SET_PARAMETER beforehand; the probe result
# is cached, so an existing value is reused.
check_cxx_compiler_flag(-Wno-unused-but-set-parameter W_NO_UNUSED_BUT_SET_PARAMETER)
if(W_NO_UNUSED_BUT_SET_PARAMETER)
  target_compile_options(cpprest PRIVATE -Wno-unused-but-set-parameter)
endif()

# build etcd-cpp-apiv3
add_subdirectory(third_party/etcd-cpp-apiv3)
set(ETCD_CPP_LIBRARIES etcd-cpp-api)
set(ETCD_CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/third_party/etcd-cpp-apiv3/
                         ${PROJECT_BINARY_DIR}/third_party/etcd-cpp-apiv3/proto/gen
                         ${PROJECT_BINARY_DIR}/third_party/etcd-cpp-apiv3/proto/gen/proto)
21 changes: 19 additions & 2 deletions flex/engines/http_server/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
find_package (Hiactor)
if (Hiactor_FOUND)
include (${Hiactor_CODEGEN_CMAKE_FILE})

# the included_paths depends on ENABLE_SERVICE_REGISTER
set (server_actor_autogen_include_paths ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_BINARY_DIR}/../../utils/)
if (ENABLE_SERVICE_REGISTER)
# append
set (server_actor_autogen_include_paths ${server_actor_autogen_include_paths},${CPPREST_INCLUDE_DIR},${ETCD_CPP_INCLUDE_DIR})
endif ()
if (BUILD_WITH_OSS)
set (server_actor_autogen_include_paths ${server_actor_autogen_include_paths},${OSS_CPP_SDK_INCLUDE_DIR})
endif ()
hiactor_codegen (server_actor_autogen server_actor_autogen_files
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/
INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_BINARY_DIR}/../../utils/)
INCLUDE_PATHS ${server_actor_autogen_include_paths})


# get all .cc files in current directory, except for generated/
Expand All @@ -14,6 +22,12 @@ if (Hiactor_FOUND)
add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files})
add_dependencies(flex_server flex_utils) # Make sure flex_utils is built before flex_server
add_dependencies(flex_server server_actor_autogen)
if (BUILD_WITH_OSS)
add_dependencies(server_actor_autogen cpp-sdk)
endif()
if (ENABLE_SERVICE_REGISTER)
add_dependencies(server_actor_autogen ${ETCD_CPP_LIBRARIES} ${CPPREST_LIB})
endif()
target_compile_options (flex_server
PUBLIC
-Wno-attributes)
Expand All @@ -38,5 +52,8 @@ if (Hiactor_FOUND)
target_link_libraries(flex_server otel)
endif()
target_link_libraries(flex_server flex_metadata_store)
if (ENABLE_SERVICE_REGISTER)
target_link_libraries(flex_server ${ETCD_CPP_LIBRARIES})
endif()
install_without_export_flex_target(flex_server)
endif ()
4 changes: 2 additions & 2 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ seastar::future<admin_query_result> admin_actor::run_create_graph(
// query_param is the graph name
seastar::future<admin_query_result> admin_actor::run_get_graph_schema(
query_param&& query_param) {
LOG(INFO) << "Get Graph schema for graph_id: " << query_param.content;
VLOG(10) << "Get Graph schema for graph_id: " << query_param.content;
auto schema_res = metadata_store_->GetGraphMeta(query_param.content);

if (schema_res.ok()) {
Expand All @@ -395,7 +395,7 @@ seastar::future<admin_query_result> admin_actor::run_get_graph_schema(
// Get the metadata of a graph.
seastar::future<admin_query_result> admin_actor::run_get_graph_meta(
query_param&& query_param) {
LOG(INFO) << "Get Graph meta for graph_id: " << query_param.content;
VLOG(10) << "Get Graph meta for graph_id: " << query_param.content;
auto meta_res = metadata_store_->GetGraphMeta(query_param.content);

if (meta_res.ok()) {
Expand Down
71 changes: 70 additions & 1 deletion flex/engines/http_server/graph_db_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ bool check_port_occupied(uint16_t port) {
}

ServiceConfig::ServiceConfig()
: bolt_port(DEFAULT_BOLT_PORT),
: namespace_(DEFAULT_NAMESPACE),
master_instance_name(DEFAULT_INSTANCE_NAME),
bolt_port(DEFAULT_BOLT_PORT),
admin_port(DEFAULT_ADMIN_PORT),
query_port(DEFAULT_QUERY_PORT),
shard_num(DEFAULT_SHARD_NUM),
Expand Down Expand Up @@ -127,6 +129,16 @@ void GraphDBService::init(const ServiceConfig& config) {
config.admin_port, config.get_exclusive_shard_id(),
config.admin_svc_max_content_length);
}
#ifdef ENABLE_SERVICE_REGISTER
LOG(INFO) << "Service registry endpoint: "
<< config.service_registry_endpoint;
if (!config.service_registry_endpoint.empty()) {
service_register_ = std::make_unique<ServiceRegister>(
config.service_registry_endpoint, config.namespace_,
config.master_instance_name, [this]() { return get_service_info(); },
config.service_registry_ttl);
}
#endif

initialized_.store(true);
service_config_ = config;
Expand Down Expand Up @@ -207,6 +219,11 @@ GraphDBService::~GraphDBService() {
if (metadata_store_) {
metadata_store_->Close();
}
#ifdef ENABLE_SERVICE_REGISTER
if (service_register_) {
service_register_->Stop();
}
#endif
}

const ServiceConfig& GraphDBService::get_service_config() const {
Expand Down Expand Up @@ -264,6 +281,14 @@ void GraphDBService::run_and_wait_for_exit() {
if (admin_hdl_) {
admin_hdl_->start();
}
#ifdef ENABLE_SERVICE_REGISTER
if (service_register_) {
LOG(INFO) << "Start service register thread";
service_register_->Start();
} else {
LOG(INFO) << "Service register is not started!";
}
#endif
if (service_config_.start_compiler) {
if (!start_compiler_subprocess()) {
LOG(FATAL) << "Failed to start compiler subprocess! exiting...";
Expand All @@ -278,6 +303,11 @@ void GraphDBService::run_and_wait_for_exit() {
if (admin_hdl_) {
admin_hdl_->stop();
}
#ifdef ENABLE_SERVICE_REGISTER
if (service_register_) {
service_register_->Stop();
}
#endif
actor_sys_->terminate();
}

Expand Down Expand Up @@ -333,6 +363,45 @@ bool GraphDBService::check_compiler_ready() const {
return true;
}

#ifdef ENABLE_SERVICE_REGISTER
std::pair<bool, AllServiceRegisterPayload> GraphDBService::get_service_info() {
auto ip = gs::get_local_ip();
AllServiceRegisterPayload payload;
if (!is_running()) {
LOG(INFO) << "Service is not running, skip service register.";
return std::make_pair(false, payload);
}

if (metadata_store_) {
auto cur_running_graph = metadata_store_->GetRunningGraph();
if (!cur_running_graph.ok()) {
LOG(ERROR) << "Failed to get running graph: "
<< cur_running_graph.status().error_message();
return std::make_pair(false, payload);
}
payload.graph_id = cur_running_graph.value();
} else {
// Try to get from current graph_db
auto& db = gs::GraphDB::get();
LOG(INFO) << "Get service info from current graph db: "
<< db.schema().GetGraphId();
payload.graph_id = db.schema().GetGraphId();
}

auto procedure_endpoint =
ip + ":" + std::to_string(service_config_.query_port);
auto cypher_endpoint = ip + ":" + std::to_string(service_config_.bolt_port);
ServiceMetrics procedure_metrics("0"); // TODO: get snapshot id
payload.services.emplace(std::make_pair(
"procedure",
ServiceRegisterPayload(procedure_endpoint, procedure_metrics)));
payload.services.emplace(std::make_pair(
"cypher", ServiceRegisterPayload(cypher_endpoint, procedure_metrics)));

return std::make_pair(true, payload);
}
#endif

bool GraphDBService::start_compiler_subprocess(
const std::string& graph_schema_path) {
if (!service_config_.start_compiler) {
Expand Down
Loading
Loading