Distributed IO #533
…mentation and testing
cpp/src/cylon/table_api.cpp
buffers.resize(ctx->GetWorldSize());
int32_t buf_start = 0;
for (int i = 0; i < ctx->GetWorldSize(); ++i) {
  buffers[i] = std::make_shared<arrow::Buffer>(received_buf.data() + buf_start, all_buffer_sizes[i]);
  buf_start += all_buffer_sizes[i];
}
Suggested change:
- buffers.resize(ctx->GetWorldSize());
- int32_t buf_start = 0;
- for (int i = 0; i < ctx->GetWorldSize(); ++i) {
-   buffers[i] = std::make_shared<arrow::Buffer>(received_buf.data() + buf_start, all_buffer_sizes[i]);
-   buf_start += all_buffer_sizes[i];
- }
+ buffers.reserve(ctx->GetWorldSize());
+ int32_t buf_start = 0;
+ for (int i = 0; i < ctx->GetWorldSize(); ++i) {
+   buffers.push_back(std::make_shared<arrow::Buffer>(received_buf.data() + buf_start, all_buffer_sizes[i]));
+   buf_start += all_buffer_sizes[i];
+ }
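The slicing pattern in this suggestion can be tried without Arrow or MPI. The following is a minimal sketch, not the code from the PR: `BufferView` and `SliceReceived` are hypothetical names, `BufferView` only stands in for a non-owning `arrow::Buffer`, and the running offset is widened to `int64_t` as an assumption about what large gathers might need.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical non-owning view; stands in for an arrow::Buffer that wraps
// (data, size) without copying the bytes.
struct BufferView {
  const uint8_t *data;
  int64_t size;
};

// Split one contiguous receive buffer into per-rank views without copying.
// sizes[i] is the number of bytes contributed by rank i.
std::vector<BufferView> SliceReceived(const std::vector<uint8_t> &received,
                                      const std::vector<int32_t> &sizes) {
  std::vector<BufferView> views;
  views.reserve(sizes.size());  // one allocation, no default-constructed slots
  int64_t start = 0;            // assumption: 64-bit offset, unlike the int32_t above
  for (int32_t size : sizes) {
    assert(size >= 0);
    assert(start + size <= static_cast<int64_t>(received.size()));
    views.push_back(BufferView{received.data() + start, size});
    start += size;
  }
  return views;
}
```

With `reserve` the vector stays empty until each `push_back`, so the loop never assigns into slots that were default-constructed first. The views remain valid only while `received` is alive.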
added some minor comments. I am happy with the approach though 🙂
Good job @ahmet-uyar
cpp/src/cylon/net/ops/gather.cpp
  return cylon::Status(cylon::Code::ExecutionError, "MPI_Gather failed when receiving buffer sizes!");
}

std::shared_ptr<arrow::Buffer> all_buf = std::move(arrow::AllocateBuffer(0).MoveValueUnsafe());
Suggested change:
- std::shared_ptr<arrow::Buffer> all_buf = std::move(arrow::AllocateBuffer(0).MoveValueUnsafe());
+ std::shared_ptr<arrow::Buffer> all_buf;
We can leave the shared_ptr as it is (i.e. nullptr).
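A small stand-alone sketch of that point: a default-constructed `std::shared_ptr` already holds `nullptr`, so no zero-byte allocation is needed as a placeholder. `Bytes` and `MakeReceiveBuffer` are hypothetical names used only for illustration, and the root-only allocation is an assumption about how a gather would typically use the pointer, not a description of the Cylon code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for arrow::Buffer.
using Bytes = std::vector<uint8_t>;

// Assumption: only the gather root needs a receive buffer; every other rank
// keeps the pointer empty.
std::shared_ptr<Bytes> MakeReceiveBuffer(bool is_root, std::size_t total_size) {
  std::shared_ptr<Bytes> all_buf;  // default-constructed: holds nullptr, allocates nothing
  assert(all_buf == nullptr);
  if (is_root) {
    all_buf = std::make_shared<Bytes>(total_size);
  }
  return all_buf;
}
```

Callers can test the result with `if (all_buf)` before touching it, which also makes the non-root case explicit.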
@ahmet-uyar what about that?
@nirandaperera I have added an arrow::Buffer broadcast function. One can easily serialize a schema and broadcast it. I think we do not need a separate function for that.
* initial implementation of read_csv
* added write_csv
* added json read and write methods
* implemented allgather for arrow.Buffer
* added schema checking after reading and initializing empty dataframes
* added test cases for read/write csv/json
* added json data files for read tests
* added read_parquet function
* added write_parquet and test files for parquet io
* added parquet files for io testing
* added allgathering metadata, improved row_group distribution among workers
* added MPI gathering arrow buffer function
* implemented _metadata writing for parquet files
* improved write function signatures
* added reading parquet directories with _metadata files, improved documentation and testing
* moved io util functions to pycylon/util directory
* avoided buffer copying when gathering and allgathering arrow Buffers
* added arrow buffer broadcasting
* error checking on arrow::Buffer allocation and using std::partial_sum
* improved looping in gather
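One of the commits above mentions std::partial_sum. A plausible use in a gather is turning per-rank byte counts into start offsets; the sketch below shows that idea under stated assumptions. `Displacements` is a hypothetical helper, not a function from the PR, and `int32_t` is used because MPI_Gatherv takes `int` displacements.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Exclusive prefix sum: displs[i] is the byte offset where rank i's data
// starts inside the gathered buffer.
std::vector<int32_t> Displacements(const std::vector<int32_t> &sizes) {
  std::vector<int32_t> displs(sizes.size(), 0);
  if (sizes.size() > 1) {
    // Write the running totals one slot to the right, so displs[0] stays 0.
    std::partial_sum(sizes.begin(), sizes.end() - 1, displs.begin() + 1);
  }
  assert(displs.empty() || displs.front() == 0);
  return displs;
}
```

For example, sizes of 3, 0, 4 and 2 bytes give offsets 0, 3, 3 and 7. This replaces a hand-written running counter with a single standard-library call.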
Implemented distributed read/write for CSV, JSON, and Parquet files.
Added utility code to the Cylon code base so that Cylon IO can be implemented easily.
Details can be found in the following design document:
https://docs.google.com/document/d/1y7TAU_15qNkQ9DBkw43El1CimhkAMqemW5dcGAwbZmk/edit?usp=sharing