Skip to content

Impl scan filter #1250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dingo-store-proto
87 changes: 87 additions & 0 deletions src/client_v2/document_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

#include <cstdint>

#include "client_v2/coordinator.h"
#include "client_v2/pretty.h"
#include "common/helper.h"
#include "fmt/format.h"

const int kBatchSize = 3;

namespace client_v2 {

void SetUpDocumentIndexSubCommands(CLI::App& app) {
Expand All @@ -28,6 +31,7 @@ void SetUpDocumentIndexSubCommands(CLI::App& app) {
SetUpDocumentBatchAdd(app);
SetUpDocumentDelete(app);
SetUpDocumentSearch(app);
SetUpDocumentSearchAll(app);
SetUpDocumentBatchQuery(app);
SetUpDocumentScanQuery(app);
SetUpDocumentGetMaxId(app);
Expand Down Expand Up @@ -324,6 +328,69 @@ void SendDocumentSearch(DocumentSearchOptions const& opt) {
Pretty::Show(response);
}

void SendDocumentSearchAll(DocumentSearchOptions const& opt) {
// dingodb::pb::document::DocumentSearchRequest request;
// dingodb::pb::document::DocumentSearchResponse response;

if (opt.query_string.empty()) {
std::cout << "query_string is empty" << std::endl;
return;
}

auto response = SendSearchAllByStreamMode(opt);
std::cout << "search all documents response:" << response.DebugString() << std::endl;
Pretty::Show(response);
}

dingodb::pb::document::DocumentSearchAllResponse SendSearchAllByStreamMode(DocumentSearchOptions const& opt) {
dingodb::pb::document::DocumentSearchAllRequest request;
dingodb::pb::document::DocumentSearchAllResponse response;
auto* parameter = request.mutable_parameter();
parameter->set_query_string(opt.query_string);
parameter->set_without_scalar_data(opt.without_scalar);
parameter->set_query_unlimited(true);
request.mutable_stream_meta()->set_limit(kBatchSize);
if (opt.doc_id > 0) {
parameter->add_document_ids(opt.doc_id);
}
*(request.mutable_context()) = RegionRouter::GetInstance().GenConext(opt.region_id);

for (;;) {
dingodb::pb::document::DocumentSearchAllResponse sub_response;
// maybe current store interaction is not store node, so need reset.
InteractionManager::GetInstance().ResetStoreInteraction();
auto status = InteractionManager::GetInstance().SendRequestWithContext("DocumentService", "DocumentSearchAll",
request, sub_response);
std::cout << "search all request: " << request.DebugString() << ", sub_response: " << sub_response.DebugString()
<< std::endl;
if (!status.ok()) {
response.mutable_error()->set_errcode(dingodb::pb::error::Errno(status.error_code()));
response.mutable_error()->set_errmsg(status.error_str());
break;
}

if (sub_response.error().errcode() != dingodb::pb::error::OK) {
*response.mutable_error() = sub_response.error();
break;
}

// set request stream id
if (!sub_response.stream_meta().stream_id().empty()) {
request.mutable_stream_meta()->set_stream_id(sub_response.stream_meta().stream_id());
}

// copy data
for (int i = 0; i < sub_response.document_with_scores_size(); ++i) {
response.add_document_with_scores()->Swap(&sub_response.mutable_document_with_scores()->at(i));
}
if (!sub_response.stream_meta().has_more()) {
break;
}
}

return response;
}

void SendDocumentBatchQuery(DocumentBatchQueryOptions const& opt) {
dingodb::pb::document::DocumentBatchQueryRequest request;
dingodb::pb::document::DocumentBatchQueryResponse response;
Expand Down Expand Up @@ -645,6 +712,26 @@ void RunDocumentSearch(DocumentSearchOptions const& opt) {
client_v2::SendDocumentSearch(opt);
}

void SetUpDocumentSearchAll(CLI::App& app) {
auto opt = std::make_shared<DocumentSearchOptions>();
auto* cmd = app.add_subcommand("DocumentSearchAll", "Document search all documents")->group("Document Commands");
cmd->add_option("--coor_url", opt->coor_url, "Coordinator url, default:file://./coor_list");
cmd->add_option("--region_id", opt->region_id, "Request parameter region id")->required();
cmd->add_option("--query_string", opt->query_string, "Request parameter query_string")->required();
cmd->add_option("--without_scalar", opt->without_scalar, "Request parameter without_scalar")
->default_val(false)
->default_str("false");
cmd->add_option("--doc_id", opt->doc_id, "Request parameter alive id");
cmd->callback([opt]() { RunDocumentSearchAll(*opt); });
}

void RunDocumentSearchAll(DocumentSearchOptions const& opt) {
if (!SetUpStore(opt.coor_url, {}, opt.region_id)) {
exit(-1);
}
client_v2::SendDocumentSearchAll(opt);
}

void SetUpDocumentBatchQuery(CLI::App& app) {
auto opt = std::make_shared<DocumentBatchQueryOptions>();
auto* cmd = app.add_subcommand("DocumentBatchQuery", "Document batch query")->group("Document Commands");
Expand Down
5 changes: 5 additions & 0 deletions src/client_v2/document_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ struct DocumentSearchOptions {
void SetUpDocumentSearch(CLI::App &app);
void RunDocumentSearch(DocumentSearchOptions const &opt);

void SetUpDocumentSearchAll(CLI::App &app);
void RunDocumentSearchAll(DocumentSearchOptions const &opt);

struct DocumentBatchQueryOptions {
std::string coor_url;
int64_t region_id;
Expand Down Expand Up @@ -156,13 +159,15 @@ void SendDocumentAdd(DocumentAddOptions const &opt);
void SendDocumentBatchAdd(DocumentAddOptions const &opt);
void SendDocumentDelete(DocumentDeleteOptions const &opt);
void SendDocumentSearch(DocumentSearchOptions const &opt);
void SendDocumentSearchAll(DocumentSearchOptions const &opt);
void SendDocumentBatchQuery(DocumentBatchQueryOptions const &opt);
void SendDocumentGetMaxId(DocumentGetMaxIdOptions const &opt);
void SendDocumentGetMinId(DocumentGetMinIdOptions const &opt);
void SendDocumentScanQuery(DocumentScanQueryOptions const &opt);
void SendDocumentCount(DocumentCountOptions const &opt);
void SendDocumentGetRegionMetrics(DocumentGetRegionMetricsOptions const &opt);

dingodb::pb::document::DocumentSearchAllResponse SendSearchAllByStreamMode(DocumentSearchOptions const &opt);
} // namespace client_v2

#endif // DINGODB_CLIENT_DOCUMENT_INDEX_H_
24 changes: 24 additions & 0 deletions src/client_v2/pretty.cc
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,30 @@ void Pretty::Show(dingodb::pb::document::DocumentSearchResponse& response) {

PrintTable(rows);
}

void Pretty::Show(dingodb::pb::document::DocumentSearchAllResponse& response) {
if (ShowError(response.error())) {
return;
}
if (response.document_with_scores_size() == 0) {
std::cout << "Not search document ." << std::endl;
return;
}
std::vector<std::vector<ftxui::Element>> rows = {{
ftxui::paragraph("DocumentId"),
ftxui::paragraph("Score"),
}};
for (auto const& document_with_score : response.document_with_scores()) {
std::vector<ftxui::Element> row = {
ftxui::paragraph(fmt::format("{}", document_with_score.document_with_id().id())),
ftxui::paragraph(fmt::format("{}", document_with_score.score())),
};
rows.push_back(row);
}

PrintTable(rows);
}

void Pretty::Show(dingodb::pb::document::DocumentBatchQueryResponse& response) {
if (ShowError(response.error())) {
return;
Expand Down
1 change: 1 addition & 0 deletions src/client_v2/pretty.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Pretty {

static void Show(dingodb::pb::meta::CreateIndexResponse &response);
static void Show(dingodb::pb::document::DocumentSearchResponse &response);
static void Show(dingodb::pb::document::DocumentSearchAllResponse &response);
static void Show(dingodb::pb::document::DocumentBatchQueryResponse &response);
static void Show(dingodb::pb::document::DocumentGetBorderIdResponse &response);
static void Show(dingodb::pb::document::DocumentScanQueryResponse &response);
Expand Down
30 changes: 17 additions & 13 deletions src/document/document_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ butil::Status DocumentIndex::Delete(const std::vector<int64_t>& delete_ids) {
}

butil::Status DocumentIndex::Search(uint32_t topk, const std::string& query_string, bool use_range_filter,
int64_t start_id, int64_t end_id, bool use_id_filter,
int64_t start_id, int64_t end_id, bool use_id_filter, bool query_unlimited,
const std::vector<uint64_t>& alive_ids,
const std::vector<std::string>& column_names,
std::vector<pb::common::DocumentWithScore>& results) {
Expand All @@ -376,7 +376,7 @@ butil::Status DocumentIndex::Search(uint32_t topk, const std::string& query_stri
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, err_msg);
}

if (topk == 0) {
if (!query_unlimited && topk == 0) {
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, "topk must be greater than 0");
}

Expand All @@ -393,8 +393,9 @@ butil::Status DocumentIndex::Search(uint32_t topk, const std::string& query_stri
// }
// }

auto search_result = ffi_bm25_search_with_column_names(index_path_, query_string, topk, alive_ids, use_id_filter,
use_range_filter, start_id, end_id, column_names);
auto search_result =
ffi_bm25_search_with_column_names(index_path_, query_string, topk, alive_ids, use_id_filter, use_range_filter,
start_id, end_id, column_names, query_unlimited);

if (search_result.error_code == 0) {
for (const auto& row_id_with_score : search_result.result) {
Expand Down Expand Up @@ -1096,15 +1097,16 @@ butil::Status DocumentIndexWrapper::Search(const pb::common::Range& region_range
if (sibling_document_index != nullptr) {
DINGO_LOG(INFO) << fmt::format("[document_index.wrapper][id({})] search document in sibling document index.", Id());
std::vector<pb::common::DocumentWithScore> results_1;
auto status = sibling_document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX,
use_id_filter, alive_ids, column_names, results_1);
auto status =
sibling_document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, use_id_filter,
parameter.query_unlimited(), alive_ids, column_names, results_1);
if (!status.ok()) {
return status;
}

std::vector<pb::common::DocumentWithScore> results_2;
status = document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, use_id_filter,
alive_ids, column_names, results_2);
parameter.query_unlimited(), alive_ids, column_names, results_2);
if (!status.ok()) {
return status;
}
Expand All @@ -1120,21 +1122,23 @@ butil::Status DocumentIndexWrapper::Search(const pb::common::Range& region_range

DINGO_LOG(INFO) << fmt::format(
"[document_index.wrapper][id({})] search document in document index with range_filter, range({}) "
"query_string({}) top_n({}) min_document_id({}) max_document_id({})",
"query_string({}) top_n({}, query_unlimited({})) min_document_id({}) max_document_id({})",
Id(), DocumentCodec::DebugRange(false, region_range), parameter.query_string(), parameter.top_n(),
min_document_id, max_document_id);
parameter.query_unlimited(), min_document_id, max_document_id);

// use range filter
return document_index->Search(parameter.top_n(), parameter.query_string(), true, min_document_id, max_document_id,
use_id_filter, alive_ids, column_names, results);
use_id_filter, parameter.query_unlimited(), alive_ids, column_names, results);
}

DINGO_LOG(INFO) << fmt::format(
"[document_index.wrapper][id({})] search document in document index, range({}) query_string({}) top_n({})", Id(),
DocumentCodec::DebugRange(false, region_range), parameter.query_string(), parameter.top_n());
"[document_index.wrapper][id({})] search document in document index, range({}) query_string({}) top_n({}), "
"query_unlimited({})",
Id(), DocumentCodec::DebugRange(false, region_range), parameter.query_string(), parameter.top_n(),
parameter.query_unlimited());

return document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, use_id_filter,
alive_ids, column_names, results);
parameter.query_unlimited(), alive_ids, column_names, results);
}

// For document index, all node need to hold the index, so this function always return true.
Expand Down
3 changes: 2 additions & 1 deletion src/document/document_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class DocumentIndex {
butil::Status Load(const std::string& path);

butil::Status Search(uint32_t topk, const std::string& query_string, bool use_range_filter, int64_t start_id,
int64_t end_id, bool use_id_filter, const std::vector<uint64_t>& alive_ids,
int64_t end_id, bool use_id_filter, bool query_unlimited,
const std::vector<uint64_t>& alive_ids,
const std::vector<std::string>& column_names,
std::vector<pb::common::DocumentWithScore>& results);

Expand Down
39 changes: 39 additions & 0 deletions src/document/document_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

namespace dingodb {

DECLARE_int64(stream_message_max_limit_size);

butil::Status DocumentReader::QueryDocumentWithId(int64_t ts, const pb::common::Range& region_range,
int64_t partition_id, int64_t document_id, bool with_scalar_data,
bool with_table_data, std::vector<std::string>& selected_scalar_keys,
Expand Down Expand Up @@ -129,6 +131,28 @@ butil::Status DocumentReader::DocumentSearch(std::shared_ptr<Engine::DocumentRea
return butil::Status();
}

butil::Status DocumentReader::DocumentSearchAll(std::shared_ptr<Engine::DocumentReader::Context> ctx, bool& has_more,
std::vector<pb::common::DocumentWithScore>& results) {
auto status = butil::Status();
auto stream = ctx->stream;
auto stream_state =
std::dynamic_pointer_cast<DocumentSearchAllStreamState>(stream->GetOrNewStreamState([&]() -> StreamStatePtr {
std::vector<pb::common::DocumentWithScore> all_results;
status = SearchDocument(ctx->ts, ctx->partition_id, ctx->document_index, ctx->region_range, ctx->parameter,
all_results);
if (!status.ok()) {
return nullptr;
}
return DocumentSearchAllStreamState::New(std::move(all_results));
}));
if (!status.ok()) {
DINGO_LOG(ERROR) << "Document search all failed: " << Helper::PrintStatus(status);
return status;
}
has_more = stream_state->Batch(stream->Limit(), results);
return status;
}

butil::Status DocumentReader::DocumentBatchQuery(std::shared_ptr<Engine::DocumentReader::Context> ctx,
std::vector<pb::common::DocumentWithId>& document_with_ids) {
for (auto document_id : ctx->document_ids) {
Expand Down Expand Up @@ -355,4 +379,19 @@ butil::Status DocumentReader::ScanDocumentId(std::shared_ptr<Engine::DocumentRea
return butil::Status::OK();
}

bool DocumentSearchAllStreamState::Batch(int32_t limit, std::vector<pb::common::DocumentWithScore>& results) {
results.reserve(limit);
size_t count = 0;
size_t total_bytes = 0;
bool has_more;
while (current_ != end_ && count < limit && total_bytes < FLAGS_stream_message_max_limit_size) {
results.push_back(*current_);
total_bytes += current_->ByteSizeLong();
++current_;
++count;
}
has_more = (current_ != end_);
return has_more;
}

} // namespace dingodb
25 changes: 25 additions & 0 deletions src/document/document_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "butil/status.h"
#include "common/stream.h"
#include "engine/engine.h"
#include "engine/raw_engine.h"
#include "mvcc/reader.h"
Expand All @@ -41,6 +42,8 @@ class DocumentReader {

butil::Status DocumentSearch(std::shared_ptr<Engine::DocumentReader::Context> ctx,
std::vector<pb::common::DocumentWithScore>& results);
butil::Status DocumentSearchAll(std::shared_ptr<Engine::DocumentReader::Context> ctx, bool& has_more,
std::vector<pb::common::DocumentWithScore>& results);

butil::Status DocumentBatchQuery(std::shared_ptr<Engine::DocumentReader::Context> ctx,
std::vector<pb::common::DocumentWithId>& document_with_ids);
Expand Down Expand Up @@ -72,6 +75,28 @@ class DocumentReader {
mvcc::ReaderPtr reader_;
};

class DocumentSearchAllStreamState;
using DocumentSearchAllStreamStatePtr = std::shared_ptr<DocumentSearchAllStreamState>;

class DocumentSearchAllStreamState : public StreamState {
public:
DocumentSearchAllStreamState(std::vector<pb::common::DocumentWithScore>& vec) {
results_ = vec;
current_ = results_.begin();
end_ = results_.end();
}
~DocumentSearchAllStreamState() override = default;
bool Batch(int32_t limit, std::vector<pb::common::DocumentWithScore>& results);
static DocumentSearchAllStreamStatePtr New(std::vector<pb::common::DocumentWithScore> vec) {
return std::make_shared<DocumentSearchAllStreamState>(vec);
}

private:
std::vector<pb::common::DocumentWithScore>::iterator current_;
std::vector<pb::common::DocumentWithScore>::iterator end_;
std::vector<pb::common::DocumentWithScore> results_;
};

} // namespace dingodb

#endif // DINGODB_DOCUMENT_READER_H_
Loading
Loading