Skip to content

Colin fix read overflow #496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ if (${COV_ENABLED})
endif()


if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)
endif ()
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)

message("CMAKE BUILD TYPE " ${CMAKE_BUILD_TYPE})
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/common/allocator/byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,14 @@ class ByteStream {
void clear_wrapped_buf() { wrapped_page_.buf_ = nullptr; }

/* ================ Part 1: basic ================ */
FORCE_INLINE uint32_t remaining_size() const {
FORCE_INLINE int64_t remaining_size() const {
ASSERT(total_size_.load() >= read_pos_);
return total_size_.load() - read_pos_;
}
FORCE_INLINE bool has_remaining() const { return remaining_size() > 0; }

FORCE_INLINE void mark_read_pos() { marked_read_pos_ = read_pos_; }
FORCE_INLINE uint32_t get_mark_len() const {
FORCE_INLINE int64_t get_mark_len() const {
ASSERT(marked_read_pos_ <= read_pos_);
return read_pos_ - marked_read_pos_;
}
Expand Down Expand Up @@ -345,8 +345,8 @@ class ByteStream {
this->total_size_.store(other.total_size_.load());
}

FORCE_INLINE uint32_t total_size() const { return total_size_.load(); }
FORCE_INLINE uint32_t read_pos() const { return read_pos_; };
FORCE_INLINE int64_t total_size() const { return total_size_.load(); }
FORCE_INLINE int64_t read_pos() const { return read_pos_; };
FORCE_INLINE void wrapped_buf_advance_read_pos(uint32_t size) {
if (size + read_pos_ > total_size_.load()) {
read_pos_ = total_size_.load();
Expand Down Expand Up @@ -526,7 +526,7 @@ class ByteStream {

// get tail position <tail_, total_size_> atomically
Page *host_end = nullptr;
uint32_t host_total_size = 0;
int64_t host_total_size = 0;
while (true) {
host_end = host_.tail_.load();
host_total_size = host_.total_size_.load();
Expand Down Expand Up @@ -642,10 +642,10 @@ class ByteStream {
OptionalAtomic<Page *> head_;
OptionalAtomic<Page *> tail_;
Page *read_page_; // only one thread is allow to reader this ByteStream
OptionalAtomic<uint32_t> total_size_; // total size in byte
uint32_t read_pos_; // current reader position
uint32_t marked_read_pos_; // current reader position
uint32_t page_size_;
OptionalAtomic<int64_t> total_size_; // total size in byte
int64_t read_pos_; // current reader position
int64_t marked_read_pos_; // current reader position
int64_t page_size_;
AllocModID mid_;
Page wrapped_page_;
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/file/read_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ int ReadFile::open(const std::string &file_path) {
return ret;
}

int ReadFile::get_file_size(int32_t &file_size) {
int64_t ReadFile::get_file_size(int64_t &file_size) {
struct stat s;
if (fstat(fd_, &s) < 0) {
LOGE("fstat error, file_path=" << file_path_.c_str() << "fd=" << fd_
Expand Down Expand Up @@ -109,7 +109,7 @@ int ReadFile::check_file_magic() {
return ret;
}

int ReadFile::read(int32_t offset, char *buf, int32_t buf_size,
int ReadFile::read(int64_t offset, char *buf, int32_t buf_size,
int32_t &read_len) {
int ret = E_OK;
read_len = 0;
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/file/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ class ReadFile {

int open(const std::string &file_path);
FORCE_INLINE bool is_opened() const { return fd_ > 0; }
FORCE_INLINE int32_t file_size() const { return file_size_; }
FORCE_INLINE int64_t file_size() const { return file_size_; }
FORCE_INLINE const std::string &file_path() const { return file_path_; }

/*
* try to reader @buf_size bytes from @offset of this file
* into @buf. @read_len return the actual len reader.
*/
int read(int32_t offset, char *buf, int32_t buf_size,
int read(int64_t offset, char *buf, int32_t buf_size,
int32_t &ret_read_len);
void close();

private:
int get_file_size(int32_t &file_size);
int64_t get_file_size(int64_t &file_size);
int check_file_magic();

private:
Expand All @@ -59,7 +59,7 @@ class ReadFile {
private:
std::string file_path_;
int fd_;
int32_t file_size_;
int64_t file_size_;
};

} // end namespace storage
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/file/tsfile_io_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ int TsFileIOReader::load_tsfile_meta() {

int ret = E_OK;
uint32_t tsfile_meta_size = 0;
int32_t read_offset = 0;
int64_t read_offset = 0;
int32_t ret_read_len = 0;

// Step 1: reader the tsfile_meta_size
Expand Down Expand Up @@ -377,8 +377,8 @@ int TsFileIOReader::load_all_measurement_index_entry(
return ret;
}

int TsFileIOReader::read_device_meta_index(int32_t start_offset,
int32_t end_offset,
int TsFileIOReader::read_device_meta_index(int64_t start_offset,
int64_t end_offset,
common::PageArena &pa,
MetaIndexNode *&device_meta_index,
bool leaf) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/file/tsfile_io_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class TsFileIOReader {

int get_chunk_metadata_list(IDeviceID device_id, std::string measurement,
std::vector<ChunkMeta *> &chunk_meta_list);
int read_device_meta_index(int32_t start_offset, int32_t end_offset,
int read_device_meta_index(int64_t start_offset, int64_t end_offset,
common::PageArena &pa,
MetaIndexNode *&device_meta_index,
bool leaf);
Expand All @@ -86,7 +86,7 @@ class TsFileIOReader {
common::PageArena &pa);

private:
FORCE_INLINE int32_t file_size() const { return read_file_->file_size(); }
FORCE_INLINE int64_t file_size() const { return read_file_->file_size(); }

int load_tsfile_meta();

Expand Down
53 changes: 31 additions & 22 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,18 @@ void AlignedChunkReader::reset() {
cur_time_page_header_.reset();
cur_value_page_header_.reset();

char *file_data_buf = time_in_stream_.get_wrapped_buf();
if (file_data_buf != nullptr) {
mem_free(file_data_buf);
if (time_in_stream_.total_size() != 0) {
char *file_data_buf = time_in_stream_.get_wrapped_buf();
if (file_data_buf != nullptr) {
mem_free(file_data_buf);
}
}
time_in_stream_.reset();
file_data_buf = value_in_stream_.get_wrapped_buf();
if (file_data_buf != nullptr) {
mem_free(file_data_buf);
if (value_in_stream_.total_size() != 0) {
char *file_data_buf = value_in_stream_.get_wrapped_buf();
if (file_data_buf != nullptr) {
mem_free(file_data_buf);
}
}
value_in_stream_.reset();
file_data_time_buf_size_ = 0;
Expand Down Expand Up @@ -87,17 +91,23 @@ void AlignedChunkReader::destroy() {
CompressorFactory::free(value_compressor_);
value_compressor_ = nullptr;
}
char *buf = time_in_stream_.get_wrapped_buf();
if (buf != nullptr) {
mem_free(buf);

if (time_in_stream_.total_size() != 0) {
char *file_data_buf = time_in_stream_.get_wrapped_buf();
if (file_data_buf != nullptr) {
mem_free(file_data_buf);
}
time_in_stream_.clear_wrapped_buf();
}
cur_time_page_header_.reset();
buf = value_in_stream_.get_wrapped_buf();
if (buf != nullptr) {
mem_free(buf);
time_in_stream_.reset();
if (value_in_stream_.total_size() != 0) {
char *file_data_buf = value_in_stream_.get_wrapped_buf();
if (file_data_buf != nullptr) {
mem_free(file_data_buf);
}
value_in_stream_.clear_wrapped_buf();
}
cur_time_page_header_.reset();
cur_value_page_header_.reset();
chunk_header_.~ChunkHeader();
}
Expand Down Expand Up @@ -300,10 +310,11 @@ int AlignedChunkReader::read_from_file_and_rewrap(
int ret = E_OK;
const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size
char *file_data_buf = in_stream_.get_wrapped_buf();
int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int64_t offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int read_size =
(want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
if (file_data_buf_size < read_size || (may_shrink && read_size < file_data_buf_size / 10)) {
if (file_data_buf_size < read_size ||
(may_shrink && read_size < file_data_buf_size / 10)) {
file_data_buf = (char *)mem_realloc(file_data_buf, read_size);
if (IS_NULL(file_data_buf)) {
return E_OOM;
Expand Down Expand Up @@ -367,7 +378,6 @@ int AlignedChunkReader::decode_cur_time_page_data() {
uint32_t time_compressed_buf_size = 0;
uint32_t time_uncompressed_buf_size = 0;


// Step 2: do uncompress
if (IS_SUCC(ret)) {
time_compressed_buf =
Expand Down Expand Up @@ -520,9 +530,9 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
uint32_t mask = 1 << 7; \
int64_t time = 0; \
CppType value; \
while ((time_decoder_->has_remaining() || time_in.has_remaining()) \
&& (value_decoder_->has_remaining() || \
value_in.has_remaining())){ \
while ( \
(time_decoder_->has_remaining() || time_in.has_remaining()) && \
(value_decoder_->has_remaining() || value_in.has_remaining())) { \
cur_value_index++; \
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \
0xFF) & \
Expand All @@ -531,16 +541,15 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
if (ret != E_OK) { \
break; \
} \
ret = value_decoder_->read_##ReadType(value, \
value_in); \
ret = value_decoder_->read_##ReadType(value, value_in); \
if (ret != E_OK) { \
break; \
} \
continue; \
} \
if (UNLIKELY(!row_appender.add_row())) { \
ret = E_OVERFLOW; \
cur_value_index--; \
cur_value_index--; \
break; \
} else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \
} else if (RET_FAIL(value_decoder_->read_##ReadType(value, \
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/reader/aligned_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ class AlignedChunkReader : public IChunkReader {
int get_next_page(common::TsBlock *tsblock, Filter *oneshoot_filter,
common::PageArena &pa) override;

bool should_skip(Filter *filter) override {
if (filter != nullptr && time_chunk_meta_ != nullptr &&
time_chunk_meta_->statistic_ != nullptr &&
!filter->satisfy(time_chunk_meta_->statistic_)) {
return true;
}
return false;
}

private:
FORCE_INLINE bool chunk_has_only_one_page(
const ChunkHeader &chunk_header) const {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/reader/device_meta_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ int DeviceMetaIterator::load_leaf_device(MetaIndexNode* meta_index_node) {
if (id_filter_ != nullptr /*TODO: !id_filter_->satisfy(device_id)*/) {
continue;
}
int32_t start_offset = child->get_offset();
int32_t end_offset = i + 1 < leaf_children.size()
int64_t start_offset = child->get_offset();
int64_t end_offset = i + 1 < leaf_children.size()
? leaf_children[i + 1]->get_offset()
: meta_index_node->end_offset_;
MetaIndexNode* child_node = nullptr;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/reader/ichunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class IChunkReader {
}

virtual ChunkHeader &get_chunk_header() { return chunk_header_; }
virtual bool should_skip(Filter* filter) { return false; }

protected:
ChunkHeader chunk_header_;
Expand Down
68 changes: 39 additions & 29 deletions cpp/src/reader/tsfile_series_scan_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,54 @@ int TsFileSeriesScanIterator::get_next(TsBlock *&ret_tsblock, bool alloc,
int ret = E_OK;
Filter *filter =
(oneshoot_filter != nullptr) ? oneshoot_filter : time_filter_;
if (!chunk_reader_->has_more_data()) {
while (true) {
if (!has_next_chunk()) {
return E_NO_MORE_DATA;
} else {
if (alloc) {
ret_tsblock = alloc_tsblock();
}

if (chunk_reader_->should_skip(filter)) {
chunk_reader_->reset();
}

while (true) {
if (!chunk_reader_->has_more_data()) {
while (true) {
if (!has_next_chunk()) {
return E_NO_MORE_DATA;
}
ChunkMeta *cm = nullptr;
ChunkMeta *time_cm = nullptr;
ChunkMeta *value_cm = nullptr;
if (!is_aligned_) {
cm = get_current_chunk_meta();
} else {
time_cm = time_chunk_meta_cursor_.get();
value_cm = value_chunk_meta_cursor_.get();
cm = time_cm;
}
advance_to_next_chunk();
if (filter != nullptr && cm->statistic_ != nullptr && !filter->satisfy(cm->statistic_)) {
continue;
}
chunk_reader_->reset();
if (!is_aligned_) {
ChunkMeta *cm = get_current_chunk_meta();
advance_to_next_chunk();
if (filter != nullptr && cm->statistic_ != nullptr &&
!filter->satisfy(cm->statistic_)) {
continue;
}
chunk_reader_->reset();
if (RET_FAIL(chunk_reader_->load_by_meta(cm))) {
return ret;
}
break;
} else {
ChunkMeta *value_cm = value_chunk_meta_cursor_.get();
ChunkMeta *time_cm = time_chunk_meta_cursor_.get();
advance_to_next_chunk();
if (filter != nullptr && value_cm->statistic_ != nullptr &&
!filter->satisfy(value_cm->statistic_)) {
continue;
if (RET_FAIL(chunk_reader_->load_by_aligned_meta(time_cm, value_cm))) {
return ret;
}
chunk_reader_->reset();
if (RET_FAIL(chunk_reader_->load_by_aligned_meta(
time_cm, value_cm))) {
}
break;
}
break;
}
}
}
if (IS_SUCC(ret)) {
if (alloc) {
ret_tsblock = alloc_tsblock();
}

ret = chunk_reader_->get_next_page(ret_tsblock, filter, *data_pa_);
if (ret == E_NO_MORE_DATA) {
continue;
} else {
return ret;
}
}
return ret;
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/writer/time_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ int TimeChunkWriter::end_encode_chunk() {
int64_t TimeChunkWriter::estimate_max_series_mem_size() {
return chunk_data_.total_size() +
time_page_writer_.estimate_max_mem_size() +
+first_page_data_.compressed_size_ +
(first_page_statistic_ != nullptr ? get_typed_statistic_sizeof(first_page_statistic_->get_type()) : 0) +
PageHeader::estimat_max_page_header_size_without_statistics() +
get_typed_statistic_sizeof(
time_page_writer_.get_statistic()->get_type());
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/writer/value_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ int ValueChunkWriter::end_encode_chunk() {
}

int64_t ValueChunkWriter::estimate_max_series_mem_size() {
return chunk_data_.total_size() +
return chunk_data_.total_size() + first_page_data_.compressed_size_ +
(first_page_statistic_ != nullptr ? get_typed_statistic_sizeof(first_page_statistic_->get_type()) : 0) +
value_page_writer_.estimate_max_mem_size() +
PageHeader::estimat_max_page_header_size_without_statistics() +
get_typed_statistic_sizeof(
Expand Down
Loading