Distributed IO #533


Merged: 21 commits into cylondata:main on Dec 14, 2021
Conversation

ahmet-uyar (Collaborator)

Implemented distributed read/write for CSV, JSON, and Parquet files.
Added utility code to the Cylon code base so that Cylon IO can be implemented easily.

Details can be found in the following design document:
https://docs.google.com/document/d/1y7TAU_15qNkQ9DBkw43El1CimhkAMqemW5dcGAwbZmk/edit?usp=sharing
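
The buffer gather/allgather utilities listed in the commits below start by exchanging per-worker buffer sizes. A minimal sketch of that step, assuming plain MPI (the variable names are illustrative, not the PR's actual API; requires <mpi.h>, <vector>, <cstdint>):

// Each worker publishes the size of its serialized buffer so that every
// worker can compute receive offsets before the buffers themselves move.
int32_t my_size = static_cast<int32_t>(buf->size());  // buf: this worker's arrow::Buffer
std::vector<int32_t> all_buffer_sizes(world_size);
MPI_Allgather(&my_size, 1, MPI_INT32_T,
              all_buffer_sizes.data(), 1, MPI_INT32_T, MPI_COMM_WORLD);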

Comment on lines 364 to 369
buffers.resize(ctx->GetWorldSize());
int32_t buf_start = 0;
for (int i = 0; i < ctx->GetWorldSize(); ++i) {
  buffers[i] = std::make_shared<arrow::Buffer>(received_buf.data() + buf_start, all_buffer_sizes[i]);
  buf_start += all_buffer_sizes[i];
}
Collaborator

Suggested change, replacing:

buffers.resize(ctx->GetWorldSize());
int32_t buf_start = 0;
for (int i = 0; i < ctx->GetWorldSize(); ++i) {
  buffers[i] = std::make_shared<arrow::Buffer>(received_buf.data() + buf_start, all_buffer_sizes[i]);
  buf_start += all_buffer_sizes[i];
}

with:

buffers.reserve(ctx->GetWorldSize());
int32_t buf_start = 0;
for (int i = 0; i < ctx->GetWorldSize(); ++i) {
  buffers.push_back(std::make_shared<arrow::Buffer>(received_buf.data() + buf_start, all_buffer_sizes[i]));
  buf_start += all_buffer_sizes[i];
}
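
A likely rationale for this suggestion (inferred; the thread does not spell it out): resize default-constructs world-size empty shared_ptrs that the loop immediately overwrites, while reserve followed by push_back allocates the capacity once and constructs each element in place. Either way, note that these arrow::Buffer objects wrap received_buf without copying or owning it, so received_buf must outlive the gathered buffers.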

@nirandaperera (Collaborator) left a comment:


added some minor comments. I am happy with the approach though 🙂
Good job @ahmet-uyar

return cylon::Status(cylon::Code::ExecutionError, "MPI_Gather failed when receiving buffer sizes!");
}

std::shared_ptr<arrow::Buffer> all_buf = std::move(arrow::AllocateBuffer(0).MoveValueUnsafe());
Collaborator

Suggested change, replacing:

std::shared_ptr<arrow::Buffer> all_buf = std::move(arrow::AllocateBuffer(0).MoveValueUnsafe());

with:

std::shared_ptr<arrow::Buffer> all_buf;

We can leave the shared_ptr as it is (i.e., nullptr).
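
Where an allocation is actually needed, the later commit "error checking on arrow::Buffer allocation" points to a checked pattern along these lines. A sketch, assuming Arrow's Result API and the cylon::Status usage shown above (total_size is an illustrative name):

// Checked allocation inside a function returning cylon::Status.
arrow::Result<std::unique_ptr<arrow::Buffer>> maybe_buf = arrow::AllocateBuffer(total_size);
if (!maybe_buf.ok()) {
  return cylon::Status(cylon::Code::ExecutionError, maybe_buf.status().message());
}
std::shared_ptr<arrow::Buffer> all_buf = std::move(maybe_buf).MoveValueUnsafe();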

@nirandaperera (Collaborator)

@ahmet-uyar what about that bcast_schema method? 🙂

@ahmet-uyar (Collaborator, Author)

@nirandaperera I have added an arrow::Buffer broadcast function. One can easily serialize a schema and broadcast it. I think we do not need a separate function for that.
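
For reference, a sketch of that serialize-and-broadcast idea. BcastArrowBuffer is a hypothetical stand-in for the buffer broadcast function added in this PR; the serialization calls are standard Arrow IPC APIs (requires <arrow/api.h>, <arrow/io/memory.h>, <arrow/ipc/api.h>):

arrow::Result<std::shared_ptr<arrow::Schema>> BcastSchema(
    const std::shared_ptr<arrow::Schema>& schema, int root, int my_rank) {
  std::shared_ptr<arrow::Buffer> buf;
  if (my_rank == root) {
    // Serialize the schema into an IPC buffer on the root worker.
    ARROW_ASSIGN_OR_RAISE(buf, arrow::ipc::SerializeSchema(*schema));
  }
  // Hypothetical: the arrow::Buffer broadcast utility added in this PR.
  BcastArrowBuffer(buf, root);
  // Every worker deserializes the same schema from the received buffer.
  arrow::io::BufferReader reader(buf);
  arrow::ipc::DictionaryMemo memo;
  return arrow::ipc::ReadSchema(&reader, &memo);
}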

@nirandaperera nirandaperera merged commit 62a3f08 into cylondata:main Dec 14, 2021
nirandaperera pushed a commit that referenced this pull request Dec 16, 2021
* initial implementation of read_csv

* added write_csv

* added json read and write methods

* implemented allgather for arrow.Buffer

* added schema checking after reading and initializing empty dataframes

* added test cases for read/write csv/json

* added json data files for read tests

* added read_parquet function

* added write_parquet and test files for parquet io

* added parquet files for io testing

* added allgathering metadata, improved row_group distribution among workers

* added MPI gathering arrow buffer function

* implemented _metadata writing for parquet files

* improved write function signatures

* added reading parquet directories with _metadata files, improved documentation and testing

* moved io util functions to pycylon/util directory

* avoided buffer copying when gathering and allgathering arrow Buffers

* added arrow buffer broadcasting

* error checking on arrow::Buffer allocation and using std::partial_sum

* improved looping in gather
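
The std::partial_sum mentioned in the commit above typically computes gather displacements from the exchanged buffer sizes. A sketch of that pattern (assumed, not the PR's exact code; requires <numeric> and <vector>):

// Exclusive prefix sum: the displacement of worker i is the total size of
// the buffers from workers 0..i-1, usable as the displs argument of MPI_Gatherv.
std::vector<int32_t> displacements(all_buffer_sizes.size(), 0);
std::partial_sum(all_buffer_sizes.begin(), all_buffer_sizes.end() - 1,
                 displacements.begin() + 1);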