Skip to content

Commit 9d36a77

Browse files
vsianwritinwaters
andcommitted
Dump fulltext index (infiniflow#2336)
### What problem does this PR solve? Dump `MemoryIndexer` when the memory usage of the index exceeds the threshold. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] New Feature (non-breaking change which adds functionality) - [x] Test cases --------- Co-authored-by: writinwaters <[email protected]>
1 parent c33a073 commit 9d36a77

22 files changed

+341
-55
lines changed

conf/pytest_parallel_infinity_conf.toml

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ log_level = "trace"
1616

1717
[storage]
1818
persistence_dir = "/var/infinity/persistence"
19+
compact_interval = "10s"
20+
cleanup_interval = "0s"
1921

2022
[buffer]
2123
buffer_manager_size = "8GB"

python/restart_test/test_memidx.py

+109-1
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,117 @@ def check():
227227

228228
part3()
229229

230-
def test_mem_ivf_recover(self, infinity_runner : InfinityRunner):
230+
def test_mem_indexer(self, infinity_runner : InfinityRunner):
231+
config1 = "test/data/config/restart_test/test_memidx/1.toml"
232+
config2 = "test/data/config/restart_test/test_memidx/2.toml"
233+
config3 = "test/data/config/restart_test/test_memidx/3.toml"
234+
uri = common_values.TEST_LOCAL_HOST
231235
infinity_runner.clear()
232236

237+
decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)
238+
239+
@decorator1
240+
def part1(infinity_obj):
241+
db_obj = infinity_obj.get_database("default_db")
242+
table_obj = db_obj.create_table(
243+
"test_mem_indexer",
244+
{"c1" : {"type" : "int"}, "c2": {"type": "varchar"}},
245+
)
246+
res = table_obj.create_index(
247+
"idx1",
248+
index.IndexInfo(
249+
"c2",
250+
index.IndexType.FullText,
251+
),
252+
)
253+
assert res.error_code == infinity.ErrorCode.OK
254+
255+
table_obj.insert([
256+
{"c1" : 1, "c2" : "this is a test text"},
257+
{"c1" : 2, "c2" : "this is not a test text"},
258+
])
259+
# trigger the dump in 3rd record
260+
table_obj.insert([
261+
{"c1" : 3, "c2" : "this is indeed a test text"},
262+
])
263+
table_obj.insert([
264+
{"c1" : 4, "c2" : "this is definitely not a test text"},
265+
{"c1" : 5, "c2" : "this is nothing but a test text"},
266+
])
267+
268+
part1()
269+
270+
# config1 can hold 2 rows of identical fulltext mem index before dump
271+
# 1. recover by dumpindex wal & memindex recovery
272+
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)
273+
274+
@decorator2
275+
def part2(infinity_obj):
276+
time.sleep(5)
277+
db_obj = infinity_obj.get_database("default_db")
278+
table_obj = db_obj.get_table("test_mem_indexer")
279+
data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
280+
# print(data_dict)
281+
assert data_dict["count(star)"] == [5]
282+
283+
data_dict, data_type_dict = (
284+
table_obj.output(["c1"])
285+
.match_text('c2', 'test text', 3)
286+
.to_result()
287+
)
288+
# print(data_dict["c1"])
289+
assert data_dict["c1"] == [1, 2, 3]
290+
291+
data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
292+
# print(data_dict)
293+
assert data_dict["count(star)"] == [5]
294+
295+
# the 2nd dump
296+
table_obj.insert([
297+
{"c1" : 6, "c2" : "this is the exact opposite of a test text"},
298+
])
299+
time.sleep(5)
300+
table_obj.insert([
301+
{"c1" : 7, "c2" : "what is this?"},
302+
{"c1" : 8, "c2" : "this is what?"},
303+
{"c1" : 9, "c2" : "not a test text!"},
304+
{"c1" : 10, "c2" : "what a this?"},
305+
{"c1" : 11, "c2" : "this is you!"},
306+
])
307+
308+
part2()
309+
310+
# 2. recover by delta ckp & dumpindex wal & memindex recovery
311+
decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner)
312+
313+
@decorator3
314+
def part3(infinity_obj):
315+
time.sleep(5)
316+
db_obj = infinity_obj.get_database("default_db")
317+
table_obj = db_obj.get_table("test_mem_indexer")
318+
319+
def check(rows):
320+
data_dict, data_type_dict = (
321+
table_obj.output(["c1"])
322+
.match_text('c2', 'this what', 3)
323+
.to_result()
324+
)
325+
# print(data_dict["c1"])
326+
assert data_dict["c1"] == [7, 8, 10]
327+
328+
data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
329+
assert data_dict["count(star)"] == [rows]
330+
331+
check(11)
332+
table_obj.insert([
333+
{"c1" : 12, "c2" : "this is a text!"},
334+
])
335+
check(12)
336+
337+
# the 3rd dump
338+
db_obj.drop_table("test_mem_indexer")
339+
340+
part3()
233341

234342
def test_optimize_from_different_database(self, infinity_runner: InfinityRunner):
235343
infinity_runner.clear()

src/storage/invertedindex/column_index_reader.cppm

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import segment_posting;
2222
import index_segment_reader;
2323
import posting_iterator;
2424
import index_defines;
25-
import memory_indexer;
25+
// import memory_indexer;
2626
import internal_types;
2727
import segment_index_entry;
2828
import chunk_index_entry;
@@ -32,6 +32,7 @@ namespace infinity {
3232
struct TableEntry;
3333
class TermDocIterator;
3434
class Txn;
35+
class MemoryIndexer;
3536

3637
export class ColumnIndexReader {
3738
public:

src/storage/invertedindex/column_inverter.cpp

+22-11
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ ColumnInverter::ColumnInverter(PostingWriterProvider posting_writer_provider, Ve
5252

5353
void ColumnInverter::InitAnalyzer(const String &analyzer_name) {
5454
auto [analyzer, status] = AnalyzerPool::instance().GetAnalyzer(analyzer_name);
55-
if(!status.ok()) {
55+
if (!status.ok()) {
5656
Status status = Status::UnexpectedError(fmt::format("Invalid analyzer: {}", analyzer_name));
5757
RecoverableError(status);
5858
}
@@ -203,11 +203,13 @@ void ColumnInverter::Sort() {
203203
16);
204204
}
205205

206-
void ColumnInverter::GeneratePosting() {
206+
MemUsageChange ColumnInverter::GeneratePosting() {
207207
u32 last_term_num = std::numeric_limits<u32>::max();
208208
u32 last_doc_id = INVALID_DOCID;
209209
StringRef last_term, term;
210210
SharedPtr<PostingWriter> posting = nullptr;
211+
MemUsageChange ret{true, 0};
212+
Map<StringRef, PostingWriter *> modified_writers;
211213
// printf("GeneratePosting() begin begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_);
212214
for (auto &i : positions_) {
213215
if (last_term_num != i.term_num_) {
@@ -218,6 +220,9 @@ void ColumnInverter::GeneratePosting() {
218220
}
219221
term = GetTermFromNum(i.term_num_);
220222
posting = posting_writer_provider_(String(term.data()));
223+
if (modified_writers.find(term) == modified_writers.end()) {
224+
modified_writers[term] = posting.get();
225+
}
221226
// printf("\nswitched-term-%d-<%s>\n", i.term_num_, term.data());
222227
if (last_term_num != (u32)(-1)) {
223228
assert(last_term_num < i.term_num_);
@@ -242,6 +247,12 @@ void ColumnInverter::GeneratePosting() {
242247
// printf(" EndDocument3-%u\n", last_doc_id);
243248
}
244249
// printf("GeneratePosting() end begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_);
250+
for (auto kv : modified_writers) {
251+
PostingWriter *writer = kv.second;
252+
ret.Add(writer->GetSizeChange());
253+
}
254+
LOG_TRACE(fmt::format("MemUsageChange : {}, {}", ret.is_add_, ret.mem_));
255+
return ret;
245256
}
246257

247258
void ColumnInverter::SortForOfflineDump() {
@@ -258,7 +269,7 @@ void ColumnInverter::SortForOfflineDump() {
258269
// ----------------------------------------------------------------------------------------------------------------------------+
259270
// Data within each group
260271

261-
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer) {
272+
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter> &buf_writer) {
262273
// spill sort results for external merge sort
263274
// if (positions_.empty()) {
264275
// return;
@@ -267,19 +278,19 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
267278
// size of this Run in bytes
268279
u32 data_size = 0;
269280
u64 data_size_pos = spill_file_tell;
270-
buf_writer->Write((const char*)&data_size, sizeof(u32));
281+
buf_writer->Write((const char *)&data_size, sizeof(u32));
271282
spill_file_tell += sizeof(u32);
272283

273284
// number of tuples
274285
u32 num_of_tuples = positions_.size();
275286
tuple_count += num_of_tuples;
276-
buf_writer->Write((const char*)&num_of_tuples, sizeof(u32));
287+
buf_writer->Write((const char *)&num_of_tuples, sizeof(u32));
277288
spill_file_tell += sizeof(u32);
278289

279290
// start offset for next spill
280291
u64 next_start_offset = 0;
281292
u64 next_start_offset_pos = spill_file_tell;
282-
buf_writer->Write((const char*)&next_start_offset, sizeof(u64));
293+
buf_writer->Write((const char *)&next_start_offset, sizeof(u64));
283294
spill_file_tell += sizeof(u64);
284295

285296
u64 data_start_offset = spill_file_tell;
@@ -295,11 +306,11 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
295306
}
296307
record_length = term.size() + sizeof(docid_t) + sizeof(u32) + 1;
297308

298-
buf_writer->Write((const char*)&record_length, sizeof(u32));
309+
buf_writer->Write((const char *)&record_length, sizeof(u32));
299310
buf_writer->Write(term.data(), term.size());
300-
buf_writer->Write((const char*)&str_null, sizeof(char));
301-
buf_writer->Write((const char*)&(i.doc_id_), sizeof(docid_t));
302-
buf_writer->Write((const char*)&(i.term_pos_), sizeof(u32));
311+
buf_writer->Write((const char *)&str_null, sizeof(char));
312+
buf_writer->Write((const char *)&(i.doc_id_), sizeof(docid_t));
313+
buf_writer->Write((const char *)&(i.term_pos_), sizeof(u32));
303314
}
304315
buf_writer->Flush();
305316
// update data size
@@ -312,4 +323,4 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
312323
fseek(spill_file, next_start_offset, SEEK_SET);
313324
}
314325

315-
} // namespace infinity
326+
} // namespace infinity

src/storage/invertedindex/column_inverter.cppm

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import internal_types;
2828
import posting_writer;
2929
import vector_with_lock;
3030
import buf_writer;
31+
import mem_usage_change;
3132

3233
namespace infinity {
3334

@@ -52,7 +53,7 @@ public:
5253

5354
void Sort();
5455

55-
void GeneratePosting();
56+
MemUsageChange GeneratePosting();
5657

5758
u32 GetDocCount() { return doc_count_; }
5859

@@ -74,7 +75,7 @@ public:
7475
}
7576
};
7677

77-
void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer);
78+
void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter> &buf_writer);
7879

7980
private:
8081
using TermBuffer = Vector<char>;

src/storage/invertedindex/format/doc_list_encoder.cppm

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public:
5151

5252
PostingByteSlice *GetDocListBuffer() { return &doc_list_buffer_; }
5353

54+
inline SizeT GetSizeInBytes() const { return doc_list_buffer_.GetSizeInBytes() + doc_skiplist_writer_->GetSizeInBytes(); }
55+
5456
private:
5557
void AddDocument(docid_t doc_id, docpayload_t doc_payload, tf_t tf, u32 doc_len);
5658

@@ -78,4 +80,4 @@ private:
7880
friend class InMemDocListDecoderTest;
7981
};
8082

81-
} // namespace infinity
83+
} // namespace infinity

src/storage/invertedindex/format/position_list_encoder.cppm

+6-5
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ namespace infinity {
1818

1919
export class PositionListEncoder {
2020
public:
21-
PositionListEncoder(const PostingFormatOption &format_option,
22-
const PositionListFormat *pos_list_format = nullptr);
21+
PositionListEncoder(const PostingFormatOption &format_option, const PositionListFormat *pos_list_format = nullptr);
2322

2423
~PositionListEncoder();
2524

@@ -38,17 +37,19 @@ public:
3837

3938
const PositionListFormat *GetPositionListFormat() const { return pos_list_format_; }
4039

40+
inline SizeT GetSizeInBytes() const { return pos_list_buffer_.GetSizeInBytes() + pos_skiplist_writer_->GetSizeInBytes(); }
41+
4142
private:
4243
void CreatePosSkipListWriter();
4344
void AddPosSkipListItem(u32 total_pos_count, u32 compressed_pos_size, bool need_flush);
4445
void FlushPositionBuffer();
4546

4647
private:
4748
PostingByteSlice pos_list_buffer_;
48-
pos_t last_pos_in_cur_doc_; // 4byte
49-
u32 total_pos_count_; // 4byte
49+
pos_t last_pos_in_cur_doc_; // 4byte
50+
u32 total_pos_count_; // 4byte
5051
PostingFormatOption format_option_;
51-
bool is_own_format_; // 1byte
52+
bool is_own_format_; // 1byte
5253
UniquePtr<SkipListWriter> pos_skiplist_writer_;
5354
const PositionListFormat *pos_list_format_;
5455
};

src/storage/invertedindex/format/posting_buffer.cppm

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public:
3737

3838
u8 Size() const { return size_; }
3939

40+
inline SizeT GetSizeInBytes() const { return capacity_ * posting_fields_->GetTotalSize(); }
41+
4042
u8 GetRowCount() const { return posting_fields_->GetSize(); }
4143

4244
template <typename T>

src/storage/invertedindex/format/posting_byte_slice.cppm

+3-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public:
5757

5858
SizeT EstimateDumpSize() const { return posting_writer_.GetSize(); }
5959

60+
inline SizeT GetSizeInBytes() const { return buffer_.GetSizeInBytes() + posting_writer_.GetSize(); }
61+
6062
protected:
6163
SizeT DoFlush();
6264

@@ -71,4 +73,4 @@ inline void PostingByteSlice::PushBack(u8 row, T value) {
7173
buffer_.PushBack(row, value);
7274
}
7375

74-
} // namespace infinity
76+
} // namespace infinity

0 commit comments

Comments
 (0)