-
Notifications
You must be signed in to change notification settings - Fork 47
Repartition - CPU #526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Repartition - CPU #526
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kaiyingshan Great work! I requested some changes. Let me know if those make sense/ or you need any clarification.
cpp/src/cylon/table.cpp
Outdated
arrow::Result<std::shared_ptr<arrow::Table>> concat_res = | ||
arrow::ConcatenateTables(received_tables); | ||
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status()); | ||
const auto &final_table = concat_res.ValueOrDie(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's use CYLON_ASSIGN_OR_RAISE
here
arrow::Result<std::shared_ptr<arrow::Table>> concat_res = | |
arrow::ConcatenateTables(received_tables); | |
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status()); | |
const auto &final_table = concat_res.ValueOrDie(); | |
CYLON_ASSIGN_OR_RAISE(auto final_table, arrow::ConcatenateTables(received_tables)) |
cpp/src/cylon/table.cpp
Outdated
arrow::Result<std::shared_ptr<arrow::Table>> combine_res = | ||
final_table->CombineChunks(cylon::ToArrowPool(ctx)); | ||
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status()); | ||
table_out = combine_res.ValueOrDie(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
arrow::Result<std::shared_ptr<arrow::Table>> combine_res = | |
final_table->CombineChunks(cylon::ToArrowPool(ctx)); | |
RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status()); | |
table_out = combine_res.ValueOrDie(); | |
CYLON_ASSIGN_OR_RAISE(table_out, final_table->CombineChunks(cylon::ToArrowPool(ctx))) |
static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_ptr<CylonContext> &ctx, | ||
const std::shared_ptr<arrow::Schema> &schema, | ||
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables, | ||
std::shared_ptr<arrow::Table> &table_out) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we use pointers for output here?
cpp/src/cylon/table.cpp
Outdated
std::vector<int64_t> size = { num_row }; | ||
std::vector<int64_t> sizes; | ||
mpi::AllGather(size, world_size, sizes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couple of things here. You don't need to allocate a vector
to gather. You can simply do that by allocating just a int64_t size
variable. My suggestion is,
std::vector<int64_t> size = { num_row }; | |
std::vector<int64_t> sizes; | |
mpi::AllGather(size, world_size, sizes); | |
int64_t size = num_row; | |
std::vector<int64_t> sizes(world_size, 0); // allocate world_size number of slots | |
int status = mpi::AllGather(&size, world_size, sizes.data()); | |
// this status needs to be checked!! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from the code, it seems like mpi::AllGather can only take vectors as arguments, but MPI_Allgather can take pointers. Should I use it?
if(rows_per_partition.size() != world_size) { | ||
return Status( | ||
cylon::Code::ValueError, | ||
"rows_per_partition size does not align with world size. Received " + | ||
std::to_string(rows_per_partition.size()) + ", Expected " + | ||
std::to_string(world_size)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's move this before the allgather operation. There's no need to do a comm operation if this check is failing.
cpp/src/cylon/table.cpp
Outdated
std::to_string(acc)); | ||
} | ||
|
||
std::vector<std::pair<int, int>> send_to = find_mapping(start_idx, num_row, rows_per_partition, dest_sizes_acc); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather use a vector<int64>
instead of vector<pair<int, int64>>
here. We can make the index of the vector correspond to the rank, isn't it? 🤔
cpp/src/cylon/table.cpp
Outdated
for(auto p: send_to) { | ||
std::fill(itr + idx, itr + idx + p.second, (uint32_t) receive_build_rank_order[p.first]); | ||
idx += p.second; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please clarify this loop? I didn't understand the receive_build_rank_order[p.first]
part
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
p.first is the partition, p.second is the number of elements.
for one partition, it may need to send, for example, a1
elements to partition receive_build_rank_order[0]
, and then a2
elements to partition receive_build_rank_order[1]
, then send_to
will be {{a1, 0}, {a2, 1}}
.
cpp/src/cylon/table.cpp
Outdated
std::vector<std::shared_ptr<arrow::Table>> partitioned_tables; | ||
RETURN_CYLON_STATUS_IF_FAILED(Split(table, no_of_partitions, outPartitions, partitioned_tables)); | ||
|
||
std::shared_ptr<arrow::Schema> schema = table->get_table()->schema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::shared_ptr<arrow::Schema> schema = table->get_table()->schema(); | |
const auto& schema = table->get_table()->schema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
cpp/src/cylon/table.cpp
Outdated
int total = 0; | ||
for(int n: sizes) { | ||
total += n; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use std::accumulate
static void verify_test(std::vector<std::vector<std::string>>& expected, std::shared_ptr<Table>& output) { | ||
std::stringstream ss; | ||
output->PrintToOStream(ss); | ||
std::string s; | ||
int i = 0; | ||
while(ss>>s) { | ||
REQUIRE(s == expected[RANK][i++]); | ||
} | ||
REQUIRE(i == expected[RANK].size()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use ARROW_EQUALS
macro here. To create expected arrow table, use TableFromJSON
in arrow_test_utils header
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
I have implemented this https://github.com/ahmet-uyar/cylon/blob/repartition/cpp/src/cylon/repartition.hpp
|
While testing, I recommend to test with empty tables and partitions also.
I also recommend to test non-numeric data types such as strings and dates.
|
Yes, I also saw that there were some duplication in the 2 PRs. Best would be to merge 1 PR first and then reuse utils from it in the other. |
const std::vector<int64_t>& rows_per_partition, | ||
std::shared_ptr<cylon::Table> *output); | ||
|
||
Status Repartition(const std::shared_ptr<cylon::Table>& table, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we may remove this function. Previous function can get rows_per_partition with default value of empty vector. If the vector is empty, it can perform even repartitioning.
@kaiyingshan I merged #528 now. Ping @ahmet-uyar if you need any help with those utils. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kaiyingshan I made some minor comments. I also saw that there were some previous comments that need to be addressed.
Overall, it looks good to me. Good job @kaiyingshan :-)
Let's address these comments and merge this on green CI!
cpp/src/cylon/table.cpp
Outdated
if(num_row == 0) { | ||
return Status::OK(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think since we are returning Status::OK
, we need to set b_out
. We can simply assign a
to it.
cpp/src/cylon/table.cpp
Outdated
RETURN_CYLON_STATUS_IF_FAILED(Repartition(b, rows_per_partition, b_out)); | ||
|
||
return Status::OK(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
RETURN_CYLON_STATUS_IF_FAILED(Repartition(b, rows_per_partition, b_out)); | |
return Status::OK(); | |
return Repartition(b, rows_per_partition, b_out); |
cpp/src/cylon/table.cpp
Outdated
@@ -1056,26 +1126,124 @@ Status Equals(const std::shared_ptr<cylon::Table>& a, const std::shared_ptr<cylo | |||
return Status::OK(); | |||
} | |||
|
|||
static Status RepartitionToMatchOtherTable(const std::shared_ptr<cylon::Table> &a, const std::shared_ptr<cylon::Table> &b, std::shared_ptr<cylon::Table> * b_out) { | |||
int world_size = a->GetContext()->GetWorldSize(); | |||
int num_row = a->Rows(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rows are usually int64
cpp/src/cylon/table.cpp
Outdated
if(num_row == 0) { | ||
return Status::OK(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if(num_row == 0) { | |
return Status::OK(); | |
} | |
if(num_row == 0) { | |
*output = table; | |
return Status::OK(); | |
} |
cpp/src/cylon/table.cpp
Outdated
std::vector<std::shared_ptr<arrow::Table>> partitioned_tables; | ||
RETURN_CYLON_STATUS_IF_FAILED(Split(table, no_of_partitions, outPartitions, partitioned_tables)); | ||
|
||
std::shared_ptr<arrow::Schema> schema = table->get_table()->schema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
cpp/src/cylon/table.cpp
Outdated
*output = std::make_shared<cylon::Table>(table->GetContext(), table_out); | ||
|
||
return Status::OK(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
cpp/src/cylon/table.cpp
Outdated
int num_row = table->Rows(); | ||
std::vector<int64_t> size = { num_row }; | ||
std::vector<int64_t> sizes; | ||
mpi::AllGather(size, world_size, sizes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Status of this call needs to be checked
static void verify_test(std::vector<std::vector<std::string>>& expected, std::shared_ptr<Table>& output) { | ||
std::stringstream ss; | ||
output->PrintToOStream(ss); | ||
std::string s; | ||
int i = 0; | ||
while(ss>>s) { | ||
REQUIRE(s == expected[RANK][i++]); | ||
} | ||
REQUIRE(i == expected[RANK].size()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
from utils import create_df,assert_eq | ||
from pycylon.net import MPIConfig | ||
import random | ||
|
||
""" | ||
Run test: | ||
>> pytest -q python/pycylon/test/test_repartition.py | ||
""" | ||
|
||
def test_repartition(): | ||
env=CylonEnv(config=MPIConfig()) | ||
df1, _ = create_df([random.sample(range(10, 300), 50), | ||
random.sample(range(10, 300), 50), | ||
random.sample(range(10, 300), 50)]) | ||
df2 = df1.repartition([50], None, env=env) | ||
assert_eq(df1, df2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to run this test, you need add this under test/test_all.py
. Otherwise, it wouldn't run.
And I think this could be extended for multiple workers.
from utils import create_df,assert_eq | |
from pycylon.net import MPIConfig | |
import random | |
""" | |
Run test: | |
>> pytest -q python/pycylon/test/test_repartition.py | |
""" | |
def test_repartition(): | |
env=CylonEnv(config=MPIConfig()) | |
df1, _ = create_df([random.sample(range(10, 300), 50), | |
random.sample(range(10, 300), 50), | |
random.sample(range(10, 300), 50)]) | |
df2 = df1.repartition([50], None, env=env) | |
assert_eq(df1, df2) | |
from utils import create_df,assert_eq | |
from pycylon.net import MPIConfig | |
import random | |
""" | |
Run test: | |
>> pytest -q python/pycylon/test/test_repartition.py | |
""" | |
def test_repartition(): | |
env=CylonEnv(config=MPIConfig()) | |
world_sz = env.get_world_size() | |
df1, _ = create_df([random.sample(range(10, 300), 50), | |
random.sample(range(10, 300), 50), | |
random.sample(range(10, 300), 50)]) | |
df2 = df1.repartition([50 for _ in range(world_sz)], None, env=env) # distributed repartition | |
assert_eq(df1, df2) # still the local partitions would be equal |
minor change
…to kaiyingshan-repartition
@kaiyingshan Thank you very much for doing this! Great work.. 👍 |
* repartion with custom rank order * use all_to_all that preserves rank order & added tests * input validation & more test * add C++ apis and corresponding tests * minor fix * python apis * temporarily delete python api * python api * improve coding style, add comments, and refactor find mapping * remove unused code * use util function * refined distributed eq with repartition * use int64 * create a MacOS yml file (#530) * equal tests * fixes * Update test_repartition.py minor change * fixing test failures * adding an additional test Co-authored-by: Ziyao22 <[email protected]> Co-authored-by: niranda perera <[email protected]>
repartition c++ implementation and tests