Skip to content

Commit 37fb699

Browse files
authored
Merge branch 'main' into ir_case_when
2 parents ab03413 + ad4aea1 commit 37fb699

File tree

12 files changed

+245
-82
lines changed

12 files changed

+245
-82
lines changed

analytical_engine/core/fragment/arrow_projected_fragment.h

+111-32
Original file line numberDiff line numberDiff line change
@@ -1117,13 +1117,13 @@ class ArrowProjectedFragment
11171117
grape::PrepareConf conf) {
11181118
if (conf.message_strategy ==
11191119
grape::MessageStrategy::kAlongEdgeToOuterVertex) {
1120-
initDestFidList(true, true, iodst_, iodoffset_);
1120+
initDestFidList(comm_spec, true, true, iodst_, iodoffset_);
11211121
} else if (conf.message_strategy ==
11221122
grape::MessageStrategy::kAlongIncomingEdgeToOuterVertex) {
1123-
initDestFidList(true, false, idst_, idoffset_);
1123+
initDestFidList(comm_spec, true, false, idst_, idoffset_);
11241124
} else if (conf.message_strategy ==
11251125
grape::MessageStrategy::kAlongOutgoingEdgeToOuterVertex) {
1126-
initDestFidList(false, true, odst_, odoffset_);
1126+
initDestFidList(comm_spec, false, true, odst_, odoffset_);
11271127
}
11281128

11291129
initOuterVertexRanges();
@@ -1140,16 +1140,19 @@ class ArrowProjectedFragment
11401140
ie_spliters_ptr_.clear();
11411141
oe_spliters_ptr_.clear();
11421142
if (directed_) {
1143-
initEdgeSpliters(ie_, ie_offsets_begin_, ie_offsets_end_, ie_spliters_);
1144-
initEdgeSpliters(oe_, oe_offsets_begin_, oe_offsets_end_, oe_spliters_);
1143+
initEdgeSpliters(comm_spec, ie_, ie_offsets_begin_, ie_offsets_end_,
1144+
ie_spliters_);
1145+
initEdgeSpliters(comm_spec, oe_, oe_offsets_begin_, oe_offsets_end_,
1146+
oe_spliters_);
11451147
for (auto& vec : ie_spliters_) {
11461148
ie_spliters_ptr_.push_back(vec.data());
11471149
}
11481150
for (auto& vec : oe_spliters_) {
11491151
oe_spliters_ptr_.push_back(vec.data());
11501152
}
11511153
} else {
1152-
initEdgeSpliters(oe_, oe_offsets_begin_, oe_offsets_end_, oe_spliters_);
1154+
initEdgeSpliters(comm_spec, oe_, oe_offsets_begin_, oe_offsets_end_,
1155+
oe_spliters_);
11531156
for (auto& vec : oe_spliters_) {
11541157
ie_spliters_ptr_.push_back(vec.data());
11551158
oe_spliters_ptr_.push_back(vec.data());
@@ -1690,7 +1693,7 @@ class ArrowProjectedFragment
16901693
ends[i] = range.second;
16911694
}
16921695
},
1693-
std::thread::hardware_concurrency());
1696+
std::thread::hardware_concurrency(), 1024);
16941697
return {};
16951698
}
16961699

@@ -1721,16 +1724,81 @@ class ArrowProjectedFragment
17211724
bends[i] = range.second.second;
17221725
}
17231726
},
1724-
std::thread::hardware_concurrency());
1727+
std::thread::hardware_concurrency(), 1024);
17251728
return {};
17261729
}
17271730

1728-
void initDestFidList(bool in_edge, bool out_edge,
1729-
std::vector<fid_t>& fid_list,
1731+
void initDestFidList(const grape::CommSpec& comm_spec, const bool in_edge,
1732+
const bool out_edge, std::vector<fid_t>& fid_list,
17301733
std::vector<fid_t*>& fid_list_offset) {
17311734
if (!fid_list_offset.empty()) {
17321735
return;
17331736
}
1737+
fid_list_offset.resize(ivnum_ + 1, NULL);
1738+
1739+
int concurrency =
1740+
(std::thread::hardware_concurrency() + comm_spec.local_num() - 1) /
1741+
comm_spec.local_num();
1742+
1743+
// don't use std::vector<bool> due to its specialization
1744+
std::vector<uint8_t> fid_list_bitmap(ivnum_ * fnum_, 0);
1745+
std::atomic_size_t fid_list_size(0);
1746+
1747+
vineyard::parallel_for(
1748+
static_cast<vid_t>(0), static_cast<vid_t>(ivnum_),
1749+
[this, in_edge, out_edge, &fid_list_bitmap,
1750+
&fid_list_size](const vid_t& offset) {
1751+
vertex_t v = *(inner_vertices_.begin() + offset);
1752+
if (in_edge) {
1753+
auto es = GetIncomingAdjList(v);
1754+
fid_t last_fid = -1;
1755+
for (auto& e : es) {
1756+
fid_t f = GetFragId(e.neighbor());
1757+
if (f != last_fid && f != fid_ &&
1758+
!fid_list_bitmap[offset * fnum_ + f]) {
1759+
last_fid = f;
1760+
fid_list_bitmap[offset * fnum_ + f] = 1;
1761+
fid_list_size.fetch_add(1);
1762+
}
1763+
}
1764+
}
1765+
if (out_edge) {
1766+
auto es = GetOutgoingAdjList(v);
1767+
fid_t last_fid = -1;
1768+
for (auto& e : es) {
1769+
fid_t f = GetFragId(e.neighbor());
1770+
if (f != last_fid && f != fid_ &&
1771+
!fid_list_bitmap[offset * fnum_ + f]) {
1772+
last_fid = f;
1773+
fid_list_bitmap[offset * fnum_ + f] = 1;
1774+
fid_list_size.fetch_add(1);
1775+
}
1776+
}
1777+
}
1778+
},
1779+
concurrency, 1024);
1780+
1781+
fid_list.reserve(fid_list_size.load());
1782+
fid_list_offset[0] = fid_list.data();
1783+
1784+
for (vid_t i = 0; i < ivnum_; ++i) {
1785+
size_t nonzero = 0;
1786+
for (fid_t fid = 0; fid < fnum_; ++fid) {
1787+
if (fid_list_bitmap[i * fnum_ + fid]) {
1788+
nonzero += 1;
1789+
fid_list.push_back(fid);
1790+
}
1791+
}
1792+
fid_list_offset[i + 1] = fid_list_offset[i] + nonzero;
1793+
}
1794+
}
1795+
1796+
void initDestFidListSeq(const bool in_edge, const bool out_edge,
1797+
std::vector<fid_t>& fid_list,
1798+
std::vector<fid_t*>& fid_list_offset) {
1799+
if (!fid_list_offset.empty()) {
1800+
return;
1801+
}
17341802

17351803
fid_list_offset.resize(ivnum_ + 1, NULL);
17361804

@@ -1773,6 +1841,7 @@ class ArrowProjectedFragment
17731841
}
17741842

17751843
void initEdgeSpliters(
1844+
const grape::CommSpec& comm_spec,
17761845
const std::shared_ptr<arrow::FixedSizeBinaryArray>& edge_list,
17771846
const std::shared_ptr<arrow::Int64Array>& offsets_begin,
17781847
const std::shared_ptr<arrow::Int64Array>& offsets_end,
@@ -1784,28 +1853,38 @@ class ArrowProjectedFragment
17841853
for (auto& vec : spliters) {
17851854
vec.resize(ivnum_);
17861855
}
1787-
std::vector<int> frag_count;
1788-
for (vid_t i = 0; i < ivnum_; ++i) {
1789-
frag_count.clear();
1790-
frag_count.resize(fnum_, 0);
1791-
int64_t begin = offsets_begin->Value(i);
1792-
int64_t end = offsets_end->Value(i);
1793-
for (int64_t j = begin; j != end; ++j) {
1794-
const nbr_unit_t* nbr_ptr =
1795-
reinterpret_cast<const nbr_unit_t*>(edge_list->GetValue(j));
1796-
vertex_t u(nbr_ptr->vid);
1797-
fid_t u_fid = GetFragId(u);
1798-
++frag_count[u_fid];
1799-
}
1800-
begin += frag_count[fid_];
1801-
frag_count[fid_] = 0;
1802-
spliters[0][i] = begin;
1803-
for (fid_t j = 0; j < fnum_; ++j) {
1804-
begin += frag_count[j];
1805-
spliters[j + 1][i] = begin;
1806-
}
1807-
CHECK_EQ(begin, end);
1808-
}
1856+
1857+
int concurrency =
1858+
(std::thread::hardware_concurrency() + comm_spec.local_num() - 1) /
1859+
comm_spec.local_num();
1860+
1861+
vineyard::parallel_for(
1862+
static_cast<vid_t>(0), ivnum_,
1863+
[this, &offsets_begin, &offsets_end, &edge_list,
1864+
&spliters](const vid_t i) {
1865+
std::vector<int> frag_count(fnum_, 0);
1866+
int64_t begin = offsets_begin->Value(i);
1867+
int64_t end = offsets_end->Value(i);
1868+
for (int64_t j = begin; j != end; ++j) {
1869+
const nbr_unit_t* nbr_ptr =
1870+
reinterpret_cast<const nbr_unit_t*>(edge_list->GetValue(j));
1871+
vertex_t u(nbr_ptr->vid);
1872+
fid_t u_fid = GetFragId(u);
1873+
++frag_count[u_fid];
1874+
}
1875+
begin += frag_count[fid_];
1876+
frag_count[fid_] = 0;
1877+
spliters[0][i] = begin;
1878+
for (fid_t j = 0; j < fnum_; ++j) {
1879+
begin += frag_count[j];
1880+
spliters[j + 1][i] = begin;
1881+
}
1882+
if (begin != end) {
1883+
LOG(ERROR) << "Unexpected edge spliters for ith vertex " << i
1884+
<< ", begin: " << begin << " vs. end: " << end;
1885+
}
1886+
},
1887+
concurrency, 1024);
18091888
}
18101889

18111890
void initOuterVertexRanges() {

analytical_engine/core/fragment/dynamic_fragment.h

+28-21
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ class DynamicFragment
418418
if (conf.need_split_edges_by_fragment) {
419419
LOG(ERROR) << "MutableEdgecutFragment cannot split edges by fragment";
420420
} else if (conf.need_split_edges) {
421-
splitEdges();
421+
splitEdges(comm_spec);
422422
}
423423
}
424424

@@ -905,30 +905,37 @@ class DynamicFragment
905905
return id_parser_.max_local_id() - index - 1;
906906
}
907907

908-
void splitEdges() {
908+
void splitEdges(const grape::CommSpec& comm_spec) {
909909
auto& inner_vertices = InnerVertices();
910910
iespliter_.Init(inner_vertices);
911911
oespliter_.Init(inner_vertices);
912-
int inner_neighbor_count = 0;
913-
for (auto& v : inner_vertices) {
914-
inner_neighbor_count = 0;
915-
auto ie = GetIncomingAdjList(v);
916-
for (auto& e : ie) {
917-
if (IsInnerVertex(e.neighbor)) {
918-
++inner_neighbor_count;
919-
}
920-
}
921-
iespliter_[v] = get_ie_begin(v) + inner_neighbor_count;
922912

923-
inner_neighbor_count = 0;
924-
auto oe = GetOutgoingAdjList(v);
925-
for (auto& e : oe) {
926-
if (IsInnerVertex(e.neighbor)) {
927-
++inner_neighbor_count;
928-
}
929-
}
930-
oespliter_[v] = get_oe_begin(v) + inner_neighbor_count;
931-
}
913+
int concurrency =
914+
(std::thread::hardware_concurrency() + comm_spec.local_num() - 1) /
915+
comm_spec.local_num();
916+
vineyard::parallel_for(
917+
static_cast<vid_t>(0), static_cast<vid_t>(inner_vertices.size()),
918+
[this, &inner_vertices](const vid_t& offset) {
919+
vertex_t v = *(inner_vertices.begin() + offset);
920+
size_t inner_neighbor_count = 0;
921+
auto ie = GetIncomingAdjList(v);
922+
for (auto& e : ie) {
923+
if (IsInnerVertex(e.neighbor)) {
924+
++inner_neighbor_count;
925+
}
926+
}
927+
iespliter_[v] = get_ie_begin(v) + inner_neighbor_count;
928+
929+
inner_neighbor_count = 0;
930+
auto oe = GetOutgoingAdjList(v);
931+
for (auto& e : oe) {
932+
if (IsInnerVertex(e.neighbor)) {
933+
++inner_neighbor_count;
934+
}
935+
}
936+
oespliter_[v] = get_oe_begin(v) + inner_neighbor_count;
937+
},
938+
concurrency, 1024);
932939
}
933940

934941
vid_t parseOrAddOuterVertexGid(vid_t gid) {

analytical_engine/core/fragment/fragment_reporter.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
#ifdef NETWORKX
2020

21-
#include <glog/logging.h>
22-
2321
#include <cstddef>
2422
#include <cstdint>
2523
#include <memory>
@@ -29,6 +27,7 @@
2927

3028
#include "boost/leaf/error.hpp"
3129
#include "boost/leaf/result.hpp"
30+
#include "glog/logging.h"
3231
#include "grape/communication/communicator.h"
3332
#include "grape/serialization/in_archive.h"
3433
#include "grape/worker/comm_spec.h"

analytical_engine/core/server/rpc_utils.h

+11
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,17 @@ class GSParams {
131131

132132
const rpc::LargeAttrValue& GetLargeAttr() const { return large_attr_; }
133133

134+
const std::string DebugString() const {
135+
std::ostringstream ss;
136+
ss << "GSParams: {";
137+
for (auto const& kv : params_) {
138+
ss << rpc::ParamKey_Name(kv.first) << ": " << kv.second.DebugString()
139+
<< ", ";
140+
}
141+
ss << "}";
142+
return ss.str();
143+
}
144+
134145
private:
135146
const std::map<int, rpc::AttrValue> params_;
136147
const rpc::LargeAttrValue& large_attr_;

analytical_engine/test/run_vy_app.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
#include "lcc/lcc.h"
3636
#include "pagerank/pagerank.h"
3737
#include "pagerank/pagerank_auto.h"
38-
#include "pagerank/pagerank_local.h"
38+
#include "pagerank/pagerank_local_parallel.h"
3939
#include "sssp/sssp.h"
4040
#include "wcc/wcc.h"
4141

@@ -321,7 +321,7 @@ void RunProjectedPR(std::shared_ptr<ProjectedFragmentType> fragment,
321321
const std::string& out_prefix) {
322322
// using AppType = grape::PageRankAuto<ProjectedFragmentType>;
323323
using AppType = grape::PageRank<ProjectedFragmentType>;
324-
// using AppType = grape::PageRankLocal<ProjectedFragmentType>;
324+
// using AppType = grape::PageRankLocalParallel<ProjectedFragmentType>;
325325
auto app = std::make_shared<AppType>();
326326
auto worker = AppType::CreateWorker(app, fragment);
327327
auto spec = grape::DefaultParallelEngineSpec();

coordinator/gscoordinator/builtin/app/.gs_conf.yaml

+24-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
11
app:
22
- algo: pagerank
33
type: cpp_pie
4-
class_name: grape::PageRankLocal
5-
src: pagerank/pagerank_local.h
4+
class_name: grape::PageRankLocalParallel
5+
src: pagerank/pagerank_local_parallel.h
6+
compatible_graph:
7+
- grape::ImmutableEdgecutFragment
8+
- gs::ArrowProjectedFragment
9+
- gs::DynamicProjectedFragment
10+
- algo: pagerank_directed
11+
type: cpp_pie
12+
class_name: grape::PageRankDirected
13+
src: pagerank/pagerank_directed.h
14+
compatible_graph:
15+
- grape::ImmutableEdgecutFragment
16+
- gs::ArrowProjectedFragment
17+
- gs::DynamicProjectedFragment
18+
- algo: pagerank_opt
19+
type: cpp_pie
20+
class_name: grape::PageRankOpt
21+
src: pagerank/pagerank_opt.h
622
compatible_graph:
723
- grape::ImmutableEdgecutFragment
824
- gs::ArrowProjectedFragment
@@ -53,6 +69,12 @@ app:
5369
- grape::ImmutableEdgecutFragment
5470
- gs::ArrowProjectedFragment
5571
- gs::DynamicProjectedFragment
72+
- algo: lcc
73+
type: cpp_pie
74+
class_name: grape::LCC
75+
src: lcc/lcc.h
76+
compatible_graph:
77+
- gs::DynamicFragment
5678
- algo: sssp_path
5779
type: cpp_pie
5880
class_name: gs::SSSPPath
@@ -118,12 +140,6 @@ app:
118140
src: apps/kshell/kshell.h
119141
compatible_graph:
120142
- gs::DynamicFragment
121-
- algo: lcc
122-
type: cpp_pie
123-
class_name: grape::LCC
124-
src: lcc/lcc.h
125-
compatible_graph:
126-
- gs::DynamicFragment
127143
- algo: clustering
128144
type: cpp_pie
129145
class_name: gs::Clustering

interactive_engine/executor/assembly/v6d/src/bin/gaia_executor.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ fn sync_global_process_partition_lists(
169169
if servers.len() > 1 {
170170
assert_eq!(partitions.len() % servers.len(), 0);
171171
let nchunk = partitions.len() / servers.len();
172-
for (index, server) in servers.iter().enumerate() {
172+
let mut sorted_servers = servers.clone();
173+
sorted_servers.sort();
174+
for (index, server) in sorted_servers.iter().enumerate() {
173175
partition_lists.insert(*server, partitions[index * nchunk..(index + 1) * nchunk].to_vec());
174176
}
175177
}

0 commit comments

Comments
 (0)