Repartition - CPU #526


Merged: 21 commits merged into cylondata:main on Nov 23, 2021

Conversation

kaiyingshan (Collaborator):

Repartition C++ implementation and tests

@nirandaperera changed the title from Repartition to Repartition - CPU on Nov 2, 2021
@nirandaperera (Collaborator) left a comment:

@kaiyingshan Great work! I requested some changes. Let me know if those make sense or if you need any clarification.

Comment on lines 197 to 200
arrow::Result<std::shared_ptr<arrow::Table>> concat_res =
arrow::ConcatenateTables(received_tables);
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status());
const auto &final_table = concat_res.ValueOrDie();
Collaborator:

let's use CYLON_ASSIGN_OR_RAISE here

Suggested change
arrow::Result<std::shared_ptr<arrow::Table>> concat_res =
arrow::ConcatenateTables(received_tables);
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status());
const auto &final_table = concat_res.ValueOrDie();
CYLON_ASSIGN_OR_RAISE(auto final_table, arrow::ConcatenateTables(received_tables))

Comment on lines 203 to 206
arrow::Result<std::shared_ptr<arrow::Table>> combine_res =
final_table->CombineChunks(cylon::ToArrowPool(ctx));
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status());
table_out = combine_res.ValueOrDie();
Collaborator:

Suggested change
arrow::Result<std::shared_ptr<arrow::Table>> combine_res =
final_table->CombineChunks(cylon::ToArrowPool(ctx));
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status());
table_out = combine_res.ValueOrDie();
CYLON_ASSIGN_OR_RAISE(table_out, final_table->CombineChunks(cylon::ToArrowPool(ctx)))

static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_ptr<CylonContext> &ctx,
                                                            const std::shared_ptr<arrow::Schema> &schema,
                                                            const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
                                                            std::shared_ptr<arrow::Table> &table_out) {
Collaborator:

shall we use pointers for output here?
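
For illustration, a minimal sketch of what a pointer-based output parameter could look like here (names mirror the snippet above; this is a sketch, not the PR's final code):

// Sketch: same helper, but the output table is passed as a raw pointer
// instead of a non-const reference.
static inline Status all_to_all_arrow_tables_preserve_order(
    const std::shared_ptr<CylonContext> &ctx,
    const std::shared_ptr<arrow::Schema> &schema,
    const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
    std::shared_ptr<arrow::Table> *table_out) {
  // ... body unchanged, except the result is written through the pointer:
  // *table_out = final_table;
  return Status::OK();
}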

Comment on lines 1187 to 1189
std::vector<int64_t> size = { num_row };
std::vector<int64_t> sizes;
mpi::AllGather(size, world_size, sizes);
Collaborator:

A couple of things here. You don't need to allocate a vector to gather; you can simply allocate just an int64_t size variable. My suggestion is:

Suggested change
std::vector<int64_t> size = { num_row };
std::vector<int64_t> sizes;
mpi::AllGather(size, world_size, sizes);
int64_t size = num_row;
std::vector<int64_t> sizes(world_size, 0); // allocate world_size number of slots
int status = mpi::AllGather(&size, world_size, sizes.data());
// this status needs to be checked!!

Collaborator (author):

from the code, it seems like mpi::AllGather can only take vectors as arguments, but MPI_Allgather can take pointers. Should I use it?
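
For reference, a minimal sketch of the raw MPI call being discussed, assuming <mpi.h> is available and the communicator is MPI_COMM_WORLD (cylon's mpi::AllGather wrapper may look different):

// Sketch: gather one int64_t from every rank using pointer buffers.
int64_t size = num_row;
std::vector<int64_t> sizes(world_size, 0);
int rc = MPI_Allgather(&size, 1, MPI_INT64_T,
                       sizes.data(), 1, MPI_INT64_T, MPI_COMM_WORLD);
if (rc != MPI_SUCCESS) {
  // handle the failure, e.g. return a non-OK cylon::Status
}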

Comment on lines +1191 to +1197
if(rows_per_partition.size() != world_size) {
return Status(
cylon::Code::ValueError,
"rows_per_partition size does not align with world size. Received " +
std::to_string(rows_per_partition.size()) + ", Expected " +
std::to_string(world_size));
}
Collaborator:

let's move this before the allgather operation. There's no need to do a comm operation if this check is failing.

std::to_string(acc));
}

std::vector<std::pair<int, int>> send_to = find_mapping(start_idx, num_row, rows_per_partition, dest_sizes_acc);
Collaborator:

I'd rather use a vector<int64> instead of vector<pair<int, int64>> here. We could make the index of the vector correspond to the rank, couldn't we? 🤔
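
For illustration, a rough sketch of the rank-indexed alternative (the variable names here are hypothetical):

// Sketch: index by destination rank instead of storing (rank, count) pairs.
std::vector<int64_t> send_counts(world_size, 0);
// wherever the pair version did send_to.push_back({rank, count}), do:
//   send_counts[rank] += count;
// later, iterate ranks 0..world_size-1 and skip the zero entries.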

Comment on lines 1230 to 1233
for(auto p: send_to) {
std::fill(itr + idx, itr + idx + p.second, (uint32_t) receive_build_rank_order[p.first]);
idx += p.second;
}
Collaborator:

could you please clarify this loop? I didn't understand the receive_build_rank_order[p.first] part

Collaborator (author):

p.first is the destination partition index and p.second is the number of elements.
For one partition, it may need to send, for example, a1 elements to partition receive_build_rank_order[0] and then a2 elements to partition receive_build_rank_order[1]; in that case send_to will be {{0, a1}, {1, a2}}.

std::vector<std::shared_ptr<arrow::Table>> partitioned_tables;
RETURN_CYLON_STATUS_IF_FAILED(Split(table, no_of_partitions, outPartitions, partitioned_tables));

std::shared_ptr<arrow::Schema> schema = table->get_table()->schema();
Collaborator:

Suggested change
std::shared_ptr<arrow::Schema> schema = table->get_table()->schema();
const auto& schema = table->get_table()->schema();

Collaborator:

bump

Comment on lines 1272 to 1275
int total = 0;
for(int n: sizes) {
total += n;
}
Collaborator:

use std::accumulate
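
For illustration, a minimal sketch of the suggested change (assumes <numeric> is included):

#include <numeric>  // std::accumulate

// Sum the gathered per-rank row counts without a hand-written loop.
int64_t total = std::accumulate(sizes.begin(), sizes.end(), static_cast<int64_t>(0));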

Comment on lines +22 to +31
static void verify_test(std::vector<std::vector<std::string>>& expected, std::shared_ptr<Table>& output) {
std::stringstream ss;
output->PrintToOStream(ss);
std::string s;
int i = 0;
while(ss>>s) {
REQUIRE(s == expected[RANK][i++]);
}
REQUIRE(i == expected[RANK].size());
}
Collaborator:

you can use the ARROW_EQUALS macro here. To create the expected arrow table, use TableFromJSON from the arrow_test_utils header.
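
For illustration, a rough sketch of that approach; the exact TableFromJSON and ARROW_EQUALS signatures live in the arrow_test_utils header and may differ, and the schema variable, column names, and JSON rows below are placeholders:

// Sketch: build the expected table from JSON and compare it with the output.
auto expected = TableFromJSON(schema, {R"([{"c0": 1, "c1": 2},
                                           {"c0": 3, "c1": 4}])"});
ARROW_EQUALS(expected, output->get_table());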

Collaborator:

bump

@ahmet-uyar (Collaborator) commented on Nov 3, 2021:

I have implemented this: https://github.com/ahmet-uyar/cylon/blob/repartition/cpp/src/cylon/repartition.hpp
Basically, there are two methods I think you have also implemented:

  • DivideRowsEvenly (this is less significant, but it may remove replicated code)
  • RowIndicesToAll (this calculates the list of indices to send partitions to)

@ahmet-uyar (Collaborator):

While testing, I recommend also testing with empty tables and empty partitions, for example:

        std::vector<int64_t> rows_per_partition = {0, 0, 0, 0};
        std::vector<int64_t> rows_per_partition = {12, 0, 0, 0};
        std::vector<int64_t> rows_per_partition = {6, 0, 6, 0};

I also recommend testing non-numeric data types such as strings and dates. There are data files for that:

  • data/mpiops/sales_nulls_nunascii_x.csv

@nirandaperera (Collaborator):

(quoting @ahmet-uyar's comment above about DivideRowsEvenly and RowIndicesToAll)

Yes, I also saw that there was some duplication between the 2 PRs. The best approach would be to merge one PR first and then reuse the utils from it in the other.

const std::vector<int64_t>& rows_per_partition,
std::shared_ptr<cylon::Table> *output);

Status Repartition(const std::shared_ptr<cylon::Table>& table,
Collaborator:

we may remove this function. The previous function can take rows_per_partition with a default value of an empty vector; if the vector is empty, it can perform even repartitioning.
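
For illustration, a rough sketch of the merged declaration (a sketch only; note that rows_per_partition has to move behind the output pointer so it can take a default value):

// Sketch: one Repartition API; an empty rows_per_partition means
// "repartition evenly across all workers".
Status Repartition(const std::shared_ptr<cylon::Table>& table,
                   std::shared_ptr<cylon::Table>* output,
                   const std::vector<int64_t>& rows_per_partition = {});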

@nirandaperera (Collaborator):

@kaiyingshan I merged #528 now. Ping @ahmet-uyar if you need any help with those utils.

@nirandaperera (Collaborator) left a comment:

@kaiyingshan I made some minor comments. I also saw that there were some previous comments that need to be addressed.
Overall, it looks good to me. Good job @kaiyingshan :-)
Let's address these comments and merge this on green CI!

Comment on lines 1133 to 1135
if(num_row == 0) {
return Status::OK();
}
Collaborator:

I think since we are returning Status::OK, we need to set b_out. We can simply assign a to it.
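
For illustration, a minimal sketch of the suggested early return (a sketch only):

if (num_row == 0) {
  *b_out = a;  // nothing to repartition; simply reuse table a as the output
  return Status::OK();
}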

Comment on lines 1139 to 1141
RETURN_CYLON_STATUS_IF_FAILED(Repartition(b, rows_per_partition, b_out));

return Status::OK();
Collaborator:

nit

Suggested change
RETURN_CYLON_STATUS_IF_FAILED(Repartition(b, rows_per_partition, b_out));
return Status::OK();
return Repartition(b, rows_per_partition, b_out);

@@ -1056,26 +1126,124 @@ Status Equals(const std::shared_ptr<cylon::Table>& a, const std::shared_ptr<cylo
return Status::OK();
}

static Status RepartitionToMatchOtherTable(const std::shared_ptr<cylon::Table> &a, const std::shared_ptr<cylon::Table> &b, std::shared_ptr<cylon::Table> * b_out) {
int world_size = a->GetContext()->GetWorldSize();
int num_row = a->Rows();
Collaborator:

Rows are usually int64

Comment on lines 1180 to 1182
if(num_row == 0) {
return Status::OK();
}
Collaborator:

Suggested change
if(num_row == 0) {
return Status::OK();
}
if(num_row == 0) {
*output = table;
return Status::OK();
}

std::vector<std::shared_ptr<arrow::Table>> partitioned_tables;
RETURN_CYLON_STATUS_IF_FAILED(Split(table, no_of_partitions, outPartitions, partitioned_tables));

std::shared_ptr<arrow::Schema> schema = table->get_table()->schema();
Collaborator:

bump

Comment on lines 1247 to 1249
*output = std::make_shared<cylon::Table>(table->GetContext(), table_out);

return Status::OK();
Collaborator:

bump

int num_row = table->Rows();
std::vector<int64_t> size = { num_row };
std::vector<int64_t> sizes;
mpi::AllGather(size, world_size, sizes);
Collaborator:

Status of this call needs to be checked

Comment on lines +22 to +31
static void verify_test(std::vector<std::vector<std::string>>& expected, std::shared_ptr<Table>& output) {
std::stringstream ss;
output->PrintToOStream(ss);
std::string s;
int i = 0;
while(ss>>s) {
REQUIRE(s == expected[RANK][i++]);
}
REQUIRE(i == expected[RANK].size());
}
Collaborator:

bump

Comment on lines 2 to 17
from utils import create_df,assert_eq
from pycylon.net import MPIConfig
import random

"""
Run test:
>> pytest -q python/pycylon/test/test_repartition.py
"""

def test_repartition():
    env = CylonEnv(config=MPIConfig())
    df1, _ = create_df([random.sample(range(10, 300), 50),
                        random.sample(range(10, 300), 50),
                        random.sample(range(10, 300), 50)])
    df2 = df1.repartition([50], None, env=env)
    assert_eq(df1, df2)
Collaborator:

to run this test, you need to add it under test/test_all.py. Otherwise, it won't run.

And I think this could be extended for multiple workers.

Suggested change
from utils import create_df,assert_eq
from pycylon.net import MPIConfig
import random
"""
Run test:
>> pytest -q python/pycylon/test/test_repartition.py
"""
def test_repartition():
    env = CylonEnv(config=MPIConfig())
    df1, _ = create_df([random.sample(range(10, 300), 50),
                        random.sample(range(10, 300), 50),
                        random.sample(range(10, 300), 50)])
    df2 = df1.repartition([50], None, env=env)
    assert_eq(df1, df2)
from utils import create_df,assert_eq
from pycylon.net import MPIConfig
import random
"""
Run test:
>> pytest -q python/pycylon/test/test_repartition.py
"""
def test_repartition():
    env = CylonEnv(config=MPIConfig())
    world_sz = env.get_world_size()
    df1, _ = create_df([random.sample(range(10, 300), 50),
                        random.sample(range(10, 300), 50),
                        random.sample(range(10, 300), 50)])
    df2 = df1.repartition([50 for _ in range(world_sz)], None, env=env)  # distributed repartition
    assert_eq(df1, df2)  # still the local partitions would be equal

@nirandaperera merged commit 112ea97 into cylondata:main on Nov 23, 2021
@nirandaperera (Collaborator):

@kaiyingshan Thank you very much for doing this! Great work.. 👍

nirandaperera added a commit that referenced this pull request Dec 16, 2021
* repartition with custom rank order

* use all_to_all that preserves rank order & added tests

* input validation & more test

* add C++ apis and corresponding tests

* minor fix

* python apis

* temporarily delete python api

* python api

* improve coding style, add comments, and refactor find mapping

* remove unused code

* use util function

* refined distributed eq with repartition

* use int64

* create a MacOS yml file (#530)

* equal tests

* fixes

* Update test_repartition.py

minor change

* fixing test failures

* adding an additional test

Co-authored-by: Ziyao22 <[email protected]>
Co-authored-by: niranda perera <[email protected]>