
Dump fulltext index #2336


Merged: 11 commits, Dec 7, 2024
2 changes: 2 additions & 0 deletions conf/pytest_parallel_infinity_conf.toml
@@ -16,6 +16,8 @@ log_level = "trace"

[storage]
persistence_dir = "/var/infinity/persistence"
compact_interval = "10s"
cleanup_interval = "0s"

[buffer]
buffer_manager_size = "8GB"
110 changes: 109 additions & 1 deletion python/restart_test/test_memidx.py
@@ -227,9 +227,117 @@ def check():

part3()

def test_mem_ivf_recover(self, infinity_runner : InfinityRunner):
def test_mem_indexer(self, infinity_runner : InfinityRunner):
config1 = "test/data/config/restart_test/test_memidx/1.toml"
config2 = "test/data/config/restart_test/test_memidx/2.toml"
config3 = "test/data/config/restart_test/test_memidx/3.toml"
uri = common_values.TEST_LOCAL_HOST
infinity_runner.clear()
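# Each part below restarts infinity under a different config to exercise a
# different recovery path: part1 builds the fulltext mem index and triggers
# the first dump, part2 recovers via the dump-index WAL, and part3 recovers
# via delta checkpoint plus dump-index WAL (see the comments before each part).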

decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)

@decorator1
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.create_table(
"test_mem_indexer",
{"c1" : {"type" : "int"}, "c2": {"type": "varchar"}},
)
res = table_obj.create_index(
"idx1",
index.IndexInfo(
"c2",
index.IndexType.FullText,
),
)
assert res.error_code == infinity.ErrorCode.OK

table_obj.insert([
{"c1" : 1, "c2" : "this is a test text"},
{"c1" : 2, "c2" : "this is not a test text"},
])
# trigger the dump on the 3rd record
table_obj.insert([
{"c1" : 3, "c2" : "this is indeed a test text"},
])
table_obj.insert([
{"c1" : 4, "c2" : "this is definitely not a test text"},
{"c1" : 5, "c2" : "this is nothing but a test text"},
])

part1()

# config1 allows the fulltext mem index to hold at most 2 rows before a dump is triggered
# 1. recover via the dump-index WAL & mem-index recovery
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

@decorator2
def part2(infinity_obj):
time.sleep(5)
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table("test_mem_indexer")
data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
# print(data_dict)
assert data_dict["count(star)"] == [5]

data_dict, data_type_dict = (
table_obj.output(["c1"])
.match_text('c2', 'test text', 3)
.to_result()
)
# print(data_dict["c1"])
assert data_dict["c1"] == [1, 2, 3]

data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
# print(data_dict)
assert data_dict["count(star)"] == [5]

# trigger the 2nd dump
table_obj.insert([
{"c1" : 6, "c2" : "this is the exact opposite of a test text"},
])
time.sleep(5)
table_obj.insert([
{"c1" : 7, "c2" : "what is this?"},
{"c1" : 8, "c2" : "this is what?"},
{"c1" : 9, "c2" : "not a test text!"},
{"c1" : 10, "c2" : "what a this?"},
{"c1" : 11, "c2" : "this is you!"},
])

part2()

# 2. recover via delta checkpoint & dump-index WAL & mem-index recovery
decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner)

@decorator3
def part3(infinity_obj):
time.sleep(5)
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table("test_mem_indexer")

def check(rows):
data_dict, data_type_dict = (
table_obj.output(["c1"])
.match_text('c2', 'this what', 3)
.to_result()
)
# print(data_dict["c1"])
assert data_dict["c1"] == [7, 8, 10]

data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
assert data_dict["count(star)"] == [rows]

check(11)
table_obj.insert([
{"c1" : 12, "c2" : "this is a text!"},
])
check(12)

# the 3rd dump
db_obj.drop_table("test_mem_indexer")

part3()

def test_optimize_from_different_database(self, infinity_runner: InfinityRunner):
infinity_runner.clear()
3 changes: 2 additions & 1 deletion src/storage/invertedindex/column_index_reader.cppm
@@ -22,7 +22,7 @@ import segment_posting;
import index_segment_reader;
import posting_iterator;
import index_defines;
import memory_indexer;
// import memory_indexer;
import internal_types;
import segment_index_entry;
import chunk_index_entry;
@@ -32,6 +32,7 @@ namespace infinity {
struct TableEntry;
class TermDocIterator;
class Txn;
class MemoryIndexer;

export class ColumnIndexReader {
public:
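The change in this file replaces the memory_indexer import with a forward declaration: ColumnIndexReader only names MemoryIndexer through pointers and references, so the complete type is not needed in the interface. A minimal sketch of the pattern, with the member name memory_indexer_ assumed for illustration rather than taken from the real header:

class MemoryIndexer; // forward declaration; size and layout not needed here

class ColumnIndexReaderSketch {
public:
    void SetIndexer(MemoryIndexer *indexer) { memory_indexer_ = indexer; }

private:
    MemoryIndexer *memory_indexer_ = nullptr; // pointer to incomplete type is fine
};

// Only the translation unit that dereferences the pointer needs the full
// MemoryIndexer definition (via import or include).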
33 changes: 22 additions & 11 deletions src/storage/invertedindex/column_inverter.cpp
@@ -52,7 +52,7 @@ ColumnInverter::ColumnInverter(PostingWriterProvider posting_writer_provider, Ve

void ColumnInverter::InitAnalyzer(const String &analyzer_name) {
auto [analyzer, status] = AnalyzerPool::instance().GetAnalyzer(analyzer_name);
if(!status.ok()) {
if (!status.ok()) {
Status status = Status::UnexpectedError(fmt::format("Invalid analyzer: {}", analyzer_name));
RecoverableError(status);
}
@@ -203,11 +203,13 @@ void ColumnInverter::Sort() {
16);
}

void ColumnInverter::GeneratePosting() {
MemUsageChange ColumnInverter::GeneratePosting() {
u32 last_term_num = std::numeric_limits<u32>::max();
u32 last_doc_id = INVALID_DOCID;
StringRef last_term, term;
SharedPtr<PostingWriter> posting = nullptr;
MemUsageChange ret{true, 0};
Map<StringRef, PostingWriter *> modified_writers;
// printf("GeneratePosting() begin begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_);
for (auto &i : positions_) {
if (last_term_num != i.term_num_) {
@@ -218,6 +220,9 @@ void ColumnInverter::GeneratePosting() {
}
term = GetTermFromNum(i.term_num_);
posting = posting_writer_provider_(String(term.data()));
if (modified_writers.find(term) == modified_writers.end()) {
modified_writers[term] = posting.get();
}
// printf("\nswitched-term-%d-<%s>\n", i.term_num_, term.data());
if (last_term_num != (u32)(-1)) {
assert(last_term_num < i.term_num_);
@@ -242,6 +247,12 @@ void ColumnInverter::GeneratePosting() {
// printf(" EndDocument3-%u\n", last_doc_id);
}
// printf("GeneratePosting() end begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_);
for (auto kv : modified_writers) {
PostingWriter *writer = kv.second;
ret.Add(writer->GetSizeChange());
}
LOG_TRACE(fmt::format("MemUsageChange : {}, {}", ret.is_add_, ret.mem_));
return ret;
}

void ColumnInverter::SortForOfflineDump() {
@@ -258,7 +269,7 @@ void ColumnInverter::SortForOfflineDump() {
// ----------------------------------------------------------------------------------------------------------------------------+
// Data within each group

void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer) {
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter> &buf_writer) {
// spill sort results for external merge sort
// if (positions_.empty()) {
// return;
@@ -267,19 +278,19 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
// size of this Run in bytes
u32 data_size = 0;
u64 data_size_pos = spill_file_tell;
buf_writer->Write((const char*)&data_size, sizeof(u32));
buf_writer->Write((const char *)&data_size, sizeof(u32));
spill_file_tell += sizeof(u32);

// number of tuples
u32 num_of_tuples = positions_.size();
tuple_count += num_of_tuples;
buf_writer->Write((const char*)&num_of_tuples, sizeof(u32));
buf_writer->Write((const char *)&num_of_tuples, sizeof(u32));
spill_file_tell += sizeof(u32);

// start offset for next spill
u64 next_start_offset = 0;
u64 next_start_offset_pos = spill_file_tell;
buf_writer->Write((const char*)&next_start_offset, sizeof(u64));
buf_writer->Write((const char *)&next_start_offset, sizeof(u64));
spill_file_tell += sizeof(u64);

u64 data_start_offset = spill_file_tell;
@@ -295,11 +306,11 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
}
record_length = term.size() + sizeof(docid_t) + sizeof(u32) + 1;

buf_writer->Write((const char*)&record_length, sizeof(u32));
buf_writer->Write((const char *)&record_length, sizeof(u32));
buf_writer->Write(term.data(), term.size());
buf_writer->Write((const char*)&str_null, sizeof(char));
buf_writer->Write((const char*)&(i.doc_id_), sizeof(docid_t));
buf_writer->Write((const char*)&(i.term_pos_), sizeof(u32));
buf_writer->Write((const char *)&str_null, sizeof(char));
buf_writer->Write((const char *)&(i.doc_id_), sizeof(docid_t));
buf_writer->Write((const char *)&(i.term_pos_), sizeof(u32));
}
buf_writer->Flush();
// update data size
@@ -312,4 +323,4 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
fseek(spill_file, next_start_offset, SEEK_SET);
}

} // namespace infinity
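For orientation, SpillSortResults() above writes each run as a fixed header (u32 run size in bytes, u32 tuple count, u64 start offset of the next run) followed by one record per posting: a u32 record length, the NUL-terminated term, a docid_t, and a u32 term position. Below is a hedged reader sketch for that layout, using standalone stand-in types instead of the project's u32/u64/docid_t aliases; error handling is elided:

#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

struct SpilledRecord {
    std::string term;
    uint32_t doc_id = 0;   // stand-in for docid_t
    uint32_t term_pos = 0;
};

// Read back one run written by SpillSortResults() (sketch only).
static std::vector<SpilledRecord> ReadRun(FILE *f, uint64_t &next_run_offset) {
    uint32_t data_size = 0;
    uint32_t num_tuples = 0;
    std::fread(&data_size, sizeof(data_size), 1, f);
    std::fread(&num_tuples, sizeof(num_tuples), 1, f);
    std::fread(&next_run_offset, sizeof(next_run_offset), 1, f);
    (void)data_size; // only needed when validating the run

    std::vector<SpilledRecord> records(num_tuples);
    for (auto &rec : records) {
        uint32_t record_length = 0;
        std::fread(&record_length, sizeof(record_length), 1, f);
        // record_length = term size + 1 (NUL) + sizeof(doc_id) + sizeof(term_pos)
        uint32_t term_len = record_length - 2 * sizeof(uint32_t) - 1;
        rec.term.resize(term_len);
        std::fread(rec.term.data(), 1, term_len, f);
        std::fgetc(f); // consume the terminating NUL byte
        std::fread(&rec.doc_id, sizeof(rec.doc_id), 1, f);
        std::fread(&rec.term_pos, sizeof(rec.term_pos), 1, f);
    }
    return records;
}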
5 changes: 3 additions & 2 deletions src/storage/invertedindex/column_inverter.cppm
@@ -28,6 +28,7 @@ import internal_types;
import posting_writer;
import vector_with_lock;
import buf_writer;
import mem_usage_change;

namespace infinity {

@@ -52,7 +53,7 @@ public:

void Sort();

void GeneratePosting();
MemUsageChange GeneratePosting();

u32 GetDocCount() { return doc_count_; }

@@ -74,7 +75,7 @@ public:
}
};

void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer);
void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter> &buf_writer);

private:
using TermBuffer = Vector<char>;
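MemUsageChange comes from the newly imported mem_usage_change module and is not part of this diff. From its call sites in GeneratePosting() (brace-initialized as {true, 0}, accumulated with Add(writer->GetSizeChange()), and logged through is_add_ and mem_), a plausible minimal shape is sketched below; the real definition may well differ:

#include <cstddef>

// Sketch inferred from call sites only; not the actual module contents.
struct MemUsageChange {
    bool is_add_{true};  // direction of the change
    std::size_t mem_{0}; // magnitude of the change in bytes

    // Fold another change into this one by keeping a signed running total.
    void Add(const MemUsageChange &other) {
        long long total = (is_add_ ? 1 : -1) * (long long)mem_ +
                          (other.is_add_ ? 1 : -1) * (long long)other.mem_;
        is_add_ = total >= 0;
        mem_ = (std::size_t)(total >= 0 ? total : -total);
    }
};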
4 changes: 3 additions & 1 deletion src/storage/invertedindex/format/doc_list_encoder.cppm
@@ -51,6 +51,8 @@ public:

PostingByteSlice *GetDocListBuffer() { return &doc_list_buffer_; }

inline SizeT GetSizeInBytes() const { return doc_list_buffer_.GetSizeInBytes() + doc_skiplist_writer_->GetSizeInBytes(); }

private:
void AddDocument(docid_t doc_id, docpayload_t doc_payload, tf_t tf, u32 doc_len);

@@ -78,4 +80,4 @@ private:
friend class InMemDocListDecoderTest;
};

} // namespace infinity
11 changes: 6 additions & 5 deletions src/storage/invertedindex/format/position_list_encoder.cppm
@@ -18,8 +18,7 @@ namespace infinity {

export class PositionListEncoder {
public:
PositionListEncoder(const PostingFormatOption &format_option,
const PositionListFormat *pos_list_format = nullptr);
PositionListEncoder(const PostingFormatOption &format_option, const PositionListFormat *pos_list_format = nullptr);

~PositionListEncoder();

@@ -38,17 +37,19 @@ public:

const PositionListFormat *GetPositionListFormat() const { return pos_list_format_; }

inline SizeT GetSizeInBytes() const { return pos_list_buffer_.GetSizeInBytes() + pos_skiplist_writer_->GetSizeInBytes(); }

private:
void CreatePosSkipListWriter();
void AddPosSkipListItem(u32 total_pos_count, u32 compressed_pos_size, bool need_flush);
void FlushPositionBuffer();

private:
PostingByteSlice pos_list_buffer_;
pos_t last_pos_in_cur_doc_; // 4byte
u32 total_pos_count_;       // 4byte
PostingFormatOption format_option_;
bool is_own_format_; // 1byte
UniquePtr<SkipListWriter> pos_skiplist_writer_;
const PositionListFormat *pos_list_format_;
};
2 changes: 2 additions & 0 deletions src/storage/invertedindex/format/posting_buffer.cppm
@@ -37,6 +37,8 @@ public:

u8 Size() const { return size_; }

inline SizeT GetSizeInBytes() const { return capacity_ * posting_fields_->GetTotalSize(); }

u8 GetRowCount() const { return posting_fields_->GetSize(); }

template <typename T>
4 changes: 3 additions & 1 deletion src/storage/invertedindex/format/posting_byte_slice.cppm
@@ -57,6 +57,8 @@ public:

SizeT EstimateDumpSize() const { return posting_writer_.GetSize(); }

inline SizeT GetSizeInBytes() const { return buffer_.GetSizeInBytes() + posting_writer_.GetSize(); }

protected:
SizeT DoFlush();

Expand All @@ -71,4 +73,4 @@ inline void PostingByteSlice::PushBack(u8 row, T value) {
buffer_.PushBack(row, value);
}

} // namespace infinity
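Taken together, the GetSizeInBytes() additions in this PR form one accounting chain: PostingBuffer reports capacity_ * posting_fields_->GetTotalSize(), PostingByteSlice adds its in-memory buffer to the bytes already flushed to byte slices, and the doc-list and position-list encoders add their skip-list writers on top. A toy sketch of the same roll-up pattern, with illustrative names rather than the project's:

#include <cstddef>
#include <iostream>

// Each layer reports its own memory plus that of the layers it owns,
// mirroring the GetSizeInBytes() chain added in this PR.
struct FlatBuffer {                 // plays the role of PostingBuffer
    std::size_t capacity = 0;
    std::size_t row_size = 0;
    std::size_t SizeInBytes() const { return capacity * row_size; }
};

struct SliceWriter {                // plays the role of PostingByteSlice
    FlatBuffer buffer;
    std::size_t slice_bytes = 0;    // bytes already flushed to byte slices
    std::size_t SizeInBytes() const { return buffer.SizeInBytes() + slice_bytes; }
};

struct ListEncoder {                // plays the role of Doc/PositionListEncoder
    SliceWriter list_buffer;
    std::size_t skiplist_bytes = 0; // skip-list writer's contribution
    std::size_t SizeInBytes() const { return list_buffer.SizeInBytes() + skiplist_bytes; }
};

int main() {
    ListEncoder enc{{{128, 8}, 256}, 64};
    std::cout << enc.SizeInBytes() << "\n"; // 128 * 8 + 256 + 64 = 1344
}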