Skip to content

Same name (case insensitive) in tablet / table. #490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ class TableSchema {
}
}

size_t get_column_pos_index_num() const {
return column_pos_index_.size();
}

void update(ChunkGroupMeta *chunk_group_meta) {
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
Expand Down
13 changes: 12 additions & 1 deletion cpp/src/common/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ int Tablet::init() {
ins_res = schema_map_.insert(
std::make_pair(schema_vec_->at(c).measurement_name_, c));
if (!ins_res.second) {
ASSERT(false);
// maybe dup measurement_name
return E_INVALID_ARG;
}
Expand Down Expand Up @@ -131,6 +130,9 @@ void Tablet::destroy() {
}

int Tablet::add_timestamp(uint32_t row_index, int64_t timestamp) {
if (err_code_ != E_OK) {
return err_code_;
}
ASSERT(timestamps_ != NULL);
if (UNLIKELY(row_index >= static_cast<uint32_t>(max_row_num_))) {
ASSERT(false);
Expand Down Expand Up @@ -223,6 +225,9 @@ void Tablet::process_val(uint32_t row_index, uint32_t schema_index, T val) {

template <typename T>
int Tablet::add_value(uint32_t row_index, uint32_t schema_index, T val) {
if (err_code_ != E_OK) {
return err_code_;
}
int ret = common::E_OK;
if (UNLIKELY(schema_index >= schema_vec_->size())) {
ASSERT(false);
Expand Down Expand Up @@ -250,6 +255,9 @@ int Tablet::add_value(uint32_t row_index, uint32_t schema_index, T val) {
template <>
int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
common::String val) {
if (err_code_ != E_OK) {
return err_code_;
}
int ret = common::E_OK;
if (UNLIKELY(schema_index >= schema_vec_->size())) {
ASSERT(false);
Expand All @@ -269,6 +277,9 @@ template <typename T>
int Tablet::add_value(uint32_t row_index, const std::string &measurement_name,
T val) {
int ret = common::E_OK;
if (err_code_ != E_OK) {
return err_code_;
}
SchemaMapIterator find_iter = schema_map_.find(measurement_name);
if (LIKELY(find_iter == schema_map_.end())) {
ASSERT(false);
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/common/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Tablet {

public:
static const uint32_t DEFAULT_MAX_ROWS = 1024;
int err_code_ = common::E_OK;

public:
Tablet(const std::string &device_id,
Expand All @@ -75,7 +76,7 @@ class Tablet {
ASSERT(false);
max_row_num_ = DEFAULT_MAX_ROWS;
}
init();
err_code_ = init();
}

Tablet(const std::string &device_id,
Expand Down Expand Up @@ -106,7 +107,7 @@ class Tablet {
return MeasurementSchema(name, type);
});
schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>(measurement_vec);
init();
err_code_ = init();
}

Tablet(const std::string &insert_target_name,
Expand All @@ -127,7 +128,7 @@ class Tablet {
common::get_default_compressor()));
}
set_column_categories(column_categories);
init();
err_code_ = init();
}

/**
Expand All @@ -150,10 +151,10 @@ class Tablet {
schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>();
for (size_t i = 0; i < column_names.size(); i++) {
schema_vec_->emplace_back(
MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]),
common::get_default_compressor()));
column_names[i], data_types[i], common::get_value_encoder(data_types[i]),
common::get_default_compressor());
}
init();
err_code_ = init();
}

~Tablet() { destroy(); }
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/writer/tsfile_table_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ int storage::TsFileTableWriter::register_table(const std::shared_ptr<TableSchema
}

int storage::TsFileTableWriter::write_table(storage::Tablet& tablet) const {
// DIRTY CODE...
if (common::E_OK != error_number) {
return error_number;
}
if (tablet.get_table_name().empty()) {
tablet.set_table_name(exclusive_table_name_);
} else if (!exclusive_table_name_.empty() && tablet.get_table_name() != exclusive_table_name_) {
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/writer/tsfile_table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class TsFileTableWriter {
// Perform a deep copy. The source TableSchema object may be
// stack/heap-allocated.
auto table_schema_ptr = std::make_shared<TableSchema>(*table_schema);
tsfile_writer_->register_table(table_schema_ptr);
error_number = tsfile_writer_->register_table(table_schema_ptr);
exclusive_table_name_ = table_schema->get_table_name();
common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
}
Expand Down Expand Up @@ -106,6 +106,10 @@ class TsFileTableWriter {
// if this TsFile only contains one table, this will be its name, otherwise,
// it will be an empty string
std::string exclusive_table_name_;

// Some errors may not be conveyed during the construction phase, so it's
// necessary to maintain an internal error code.
int error_number = common::E_OK;
};

} // namespace storage
Expand Down
25 changes: 23 additions & 2 deletions cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,27 @@ void TsFileWriter::set_generate_table_schema(bool generate_table_schema) {
int TsFileWriter::register_table(
const std::shared_ptr<TableSchema> &table_schema) {
if (!table_schema) return E_INVALID_ARG;

// Empty table name or column name is not allowed.
if (table_schema->get_table_name().empty()) {
return E_INVALID_ARG;
}
for (const auto &name : table_schema->get_measurement_names()) {
if (name.empty()) {
return E_INVALID_ARG;
}
}

// Because it is not possible to return an error code for duplicate name
// checks during the construction phase of TabletSchema, the duplicate name
// check has been moved to the table registration stage.

// TODO: Add Debug INFO if ErrorCode is not enough to describe problems.
if (table_schema->get_column_pos_index_num() !=
table_schema->get_measurement_names().size()) {
return E_INVALID_ARG;
}

if (io_writer_->get_schema()->table_schema_map_.find(
table_schema->get_table_name()) !=
io_writer_->get_schema()->table_schema_map_.end()) {
Expand Down Expand Up @@ -671,7 +692,7 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) {
continue;
}
if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0,
tablet.get_cur_row_size()))) {
tablet.get_cur_row_size()))) {
return ret;
}
}
Expand Down Expand Up @@ -764,7 +785,7 @@ int TsFileWriter::write_table(Tablet &tablet) {
continue;
}
if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx,
device_id_end_index_pair.second))) {
device_id_end_index_pair.second))) {
return ret;
}
}
Expand Down
36 changes: 36 additions & 0 deletions cpp/test/writer/table_view/tsfile_writer_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,40 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {

reader.close();
delete table_schema;
}

TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(3);
measurement_schemas[0] = new MeasurementSchema("device", STRING);
column_categories.emplace_back(ColumnCategory::TAG);
measurement_schemas[1] = new MeasurementSchema("Device", STRING);
column_categories.emplace_back(ColumnCategory::TAG);
measurement_schemas[2] = new MeasurementSchema("value", DOUBLE);
column_categories.emplace_back(ColumnCategory::FIELD);
TableSchema* table_schema = new TableSchema("test_table", measurement_schemas, column_categories);
auto tsfile_table_writer =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
Tablet tablet = Tablet(table_schema->get_measurement_names(), table_schema->get_data_types());
tablet.set_table_name("test_table");
ASSERT_EQ(E_INVALID_ARG, tablet.add_timestamp(0, 10));
ASSERT_EQ(E_INVALID_ARG, tablet.add_value(1, 1, 10));
ASSERT_EQ(E_INVALID_ARG, tablet.add_value(1, "test", 10));
std::vector<MeasurementSchema> measurement_schemas2;
for (int i = 0; i < 2; i++) {
measurement_schemas2.push_back(*measurement_schemas[i]);
}
Tablet tablet1 = Tablet("test_table", std::make_shared<std::vector<MeasurementSchema>>(measurement_schemas2));
tablet1.set_table_name("test_table");
ASSERT_EQ(E_INVALID_ARG, tablet1.add_timestamp(0, 10));
ASSERT_EQ(E_INVALID_ARG, tablet1.add_value(1, 1, 10));
ASSERT_EQ(E_INVALID_ARG, tablet1.add_value(1, "test", 10));

ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->write_table(tablet));
ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->register_table(std::make_shared<TableSchema>(*table_schema)));
for (int i = 0; i < 3; i++) {
delete measurement_schemas[i];
}

}
Loading