Skip to content

Fix empty-tag error and out-of-order timestamps. #489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/src/common/statistic.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ class Statistic {
ASSERT(false);
return 0;
}

// Read-only accessor: returns the current value of count_.
int get_count() const {
    return count_;
}

// Read-only accessor: returns the current value of end_time_.
int64_t get_end_time() const {
    return end_time_;
}

virtual int deserialize_from(common::ByteStream &in) {
int ret = common::E_OK;
if (RET_FAIL(common::SerializationUtil::read_var_uint(
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/reader/block/single_device_tsblock_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,15 @@ int SingleDeviceTsBlockReader::fill_ids() {
for (const auto& entry : id_column_contexts_) {
const auto& id_column_context = entry.second;
for (int32_t pos : id_column_context.pos_in_result_) {
common::String device_id(
device_query_task_->get_device_id()->get_segments().at(
id_column_context.pos_in_device_id_));
common::String device_id;
if (device_query_task_->get_device_id()->segment_num() <=
id_column_context.pos_in_device_id_) {
device_id = common::String("");
} else {
device_id = common::String(
device_query_task_->get_device_id()->get_segments().at(
id_column_context.pos_in_device_id_));
}
if (RET_FAIL(col_appenders_[pos + 1]->fill(
(char*)&device_id, sizeof(device_id),
current_block_->get_row_count()))) {
Comment on lines 201 to 213
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should distinguish null from empty string.
("table1", null, "tag1") is different from ("table1", "", "tag1")

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/writer/time_page_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class TimePageWriter {

FORCE_INLINE int write(int64_t timestamp) {
int ret = common::E_OK;
if (statistic_->count_ != 0 && is_inited_ &&
timestamp <= statistic_->end_time_) {
return common::E_OUT_OF_ORDER;
}
if (RET_FAIL(time_encoder_->encode(timestamp, time_out_stream_))) {
} else {
statistic_->update(timestamp);
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,9 @@ int TsFileWriter::write_table(Tablet &tablet) {
return ret;
}
for (int i = start_idx; i < end_idx; i++) {
time_chunk_writer->write(tablet.timestamps_[i]);
if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) {
return ret;
}
}
uint32_t field_col_count = 0;
for (uint32_t i = 0; i < tablet.get_column_count(); ++i) {
Expand Down
116 changes: 109 additions & 7 deletions cpp/test/writer/table_view/tsfile_writer_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,56 @@ TEST_F(TsFileWriterTableTest, WriteTableTest) {
delete table_schema;
}

// Builds a single-device tablet whose timestamps go backwards part-way
// through and checks that write_table() rejects it with E_OUT_OF_ORDER,
// while flush() and close() still report E_OK afterwards.
TEST_F(TsFileWriterTableTest, WriteDisorderTest) {
    auto table_schema = gen_table_schema(0);
    auto tsfile_table_writer_ =
        std::make_shared<TsFileTableWriter>(&write_file_, table_schema);

    int device_num = 1;
    int num_timestamp_per_device = 10;
    int offset = 0;
    storage::Tablet tablet(table_schema->get_measurement_names(),
                           table_schema->get_data_types(),
                           device_num * num_timestamp_per_device);

    // Backing storage for the STRING cells. A std::string owns the bytes, so
    // no manual new[]/delete[] pairing is needed; it stays alive until the
    // end of the test.
    std::string literal("device_id");
    String literal_str(&literal[0], literal.size());

    // The schema list does not depend on the row, so fetch it once.
    const auto column_schemas = table_schema->get_measurement_schemas();

    for (int i = 0; i < device_num; i++) {
        for (int l = 0; l < num_timestamp_per_device; l++) {
            int row_index = i * num_timestamp_per_device + l;
            // Disordered timestamps: rows 0..5 get 0..5, rows 6..9 get
            // -4..-1, which is smaller than what was already added.
            tablet.add_timestamp(row_index,
                                 l > num_timestamp_per_device / 2
                                     ? l - num_timestamp_per_device
                                     : offset + l);
            for (const auto& column_schema : column_schemas) {
                switch (column_schema->data_type_) {
                    case TSDataType::INT64:
                        tablet.add_value(row_index,
                                         column_schema->measurement_name_,
                                         static_cast<int64_t>(i));
                        break;
                    case TSDataType::STRING:
                        tablet.add_value(row_index,
                                         column_schema->measurement_name_,
                                         literal_str);
                        break;
                    default:
                        break;
                }
            }
        }
    }

    ASSERT_EQ(tsfile_table_writer_->write_table(tablet),
              common::E_OUT_OF_ORDER);
    ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK);
    ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK);
    delete table_schema;
}

TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
Expand Down Expand Up @@ -202,6 +252,62 @@ TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) {
delete table_schema;
}

// Writes 100 rows into a table with two TAG columns and one FIELD column,
// where the second tag ("device2") is always the empty string, then queries
// the time range 0..50 and checks the "device1" and "value" columns of every
// returned row. The value read back for "device2" is not asserted here.
TEST_F(TsFileWriterTableTest, EmptyTagWrite) {
    std::vector<MeasurementSchema*> measurement_schemas;
    std::vector<ColumnCategory> column_categories;
    measurement_schemas.resize(3);
    measurement_schemas[0] = new MeasurementSchema("device1", STRING);
    measurement_schemas[1] = new MeasurementSchema("device2", STRING);
    measurement_schemas[2] = new MeasurementSchema("value", DOUBLE);
    column_categories.emplace_back(ColumnCategory::TAG);
    column_categories.emplace_back(ColumnCategory::TAG);
    column_categories.emplace_back(ColumnCategory::FIELD);
    TableSchema* table_schema =
        new TableSchema("test_table", measurement_schemas, column_categories);
    auto tsfile_table_writer =
        std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
    Tablet tablet(table_schema->get_measurement_names(),
                  table_schema->get_data_types());
    tablet.set_table_name("test_table");
    for (int i = 0; i < 100; i++) {
        tablet.add_timestamp(i, static_cast<int64_t>(i));
        tablet.add_value(i, "device1",
                         std::string("device" + std::to_string(i)).c_str());
        tablet.add_value(i, "device2", "");
        tablet.add_value(i, "value", i * 1.1);
    }
    // Fail right at the write stage instead of surfacing later as a
    // confusing read-side mismatch.
    ASSERT_EQ(common::E_OK, tsfile_table_writer->write_table(tablet));
    ASSERT_EQ(common::E_OK, tsfile_table_writer->flush());
    ASSERT_EQ(common::E_OK, tsfile_table_writer->close());

    TsFileReader reader;
    ASSERT_EQ(common::E_OK, reader.open(write_file_.get_file_path()));
    ResultSet* ret = nullptr;
    int ret_value =
        reader.query("test_table", {"device1", "device2", "value"}, 0, 50, ret);
    ASSERT_EQ(common::E_OK, ret_value);

    auto* table_result_set = static_cast<TableResultSet*>(ret);
    bool has_next = false;
    int cur_line = 0;
    while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
        cur_line++;
        int64_t timestamp = table_result_set->get_value<int64_t>("time");
        ASSERT_EQ(table_result_set->get_value<common::String*>("device1")
                      ->to_std_string(),
                  "device" + std::to_string(timestamp));
        // Floating-point column: compare with gtest's ULP-based check
        // rather than exact equality.
        ASSERT_DOUBLE_EQ(table_result_set->get_value<double>("value"),
                         timestamp * 1.1);
    }
    // Timestamps 0 through 50 are expected back, i.e. 51 rows.
    ASSERT_EQ(cur_line, 51);
    table_result_set->close();
    reader.destroy_query_data_set(table_result_set);

    reader.close();
    delete table_schema;
}

TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
Expand All @@ -215,9 +321,6 @@ TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
storage::Tablet tablet(table_schema->get_measurement_names(), datatypes,
device_num * num_timestamp_per_device);

char* literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
for (int i = 0; i < device_num; i++) {
for (int l = 0; l < num_timestamp_per_device; l++) {
int row_index = i * num_timestamp_per_device + l;
Expand All @@ -236,17 +339,16 @@ TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
static_cast<int32_t>(i));
break;
case TSDataType::STRING:
tablet.add_value(row_index,
column_schemas[idx]->measurement_name_,
literal_str);
tablet.add_value(
row_index, column_schemas[idx]->measurement_name_,
std::string("device" + to_string(i)).c_str());
break;
default:
break;
}
}
}
}
delete[] literal;
delete table_schema;

ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_table_writer_->write_table(tablet));
Expand Down
5 changes: 0 additions & 5 deletions cpp/test/writer/time_chunk_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ TEST_F(TimeChunkWriterTest, InitWithParameters) {
writer.destroy();
}

// Passes the literals true and then false to time_chunk_writer.write() and
// expects E_OK from each call.
// NOTE(review): presumably the bools convert implicitly to the writer's
// timestamp parameter type — confirm against TimeChunkWriter::write.
TEST_F(TimeChunkWriterTest, WriteBoolean) {
    const auto ret_for_true = time_chunk_writer.write(true);
    EXPECT_EQ(ret_for_true, E_OK);

    const auto ret_for_false = time_chunk_writer.write(false);
    EXPECT_EQ(ret_for_false, E_OK);
}

TEST_F(TimeChunkWriterTest, WriteLargeDataSet) {
for (int i = 0; i < 10000; ++i) {
time_chunk_writer.write(i);
Expand Down