Skip to content

Commit 25036e9

Browse files
committed
re-enable passing in providers/domains to try an use
1 parent d944739 commit 25036e9

16 files changed

+485
-288
lines changed

src/api.c

+5-2
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,24 @@
2121
* state and to start the underlying OFI and runtime layer. Using any other APIs
2222
* without prior call to rofi_init() may result in failure and unexpected behavior.
2323
*
24+
* @param provs A priority list of providers to try and be used.
25+
* @param domains A priority list of domains to try and be used.
26+
*
2427
*
2528
* @return 0 on success, -1 in case of errors
2629
*
2730
* \b blocking: yes
2831
* \b thread-safe: no
2932
*
3033
*/
31-
int rofi_init(char *prov) {
34+
int rofi_init(char *provs, char *domains) {
3235
int ret = 0;
3336

3437
rofi.desc.status = ROFI_STATUS_NONE;
3538

3639
DEBUG_MSG("Initilizing ROFI runtime...");
3740

38-
ret = rofi_init_internal(prov);
41+
ret = rofi_init_internal(provs, domains);
3942

4043
if (ret) {
4144
ERR_MSG("Error initializing ROFI library");

src/core.c

+68-16
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void *rofi_get_remote_addr_internal(void *addr, unsigned int id) {
2525
return NULL;
2626
}
2727

28-
DEBUG_MSG("\t Found MR [0x%lx - 0x%lx] Addr: %p Key: 0x%lx ", el->start, el->start + el->size, (void *)(addr - (uintptr_t)el->start + el->iov[id].addr), el->mr_key);
28+
DEBUG_MSG("\t Found MR [0x%lx - 0x%lx] Addr: %p Key: 0x%lx ", el->start, el->start + el->size, (void *)(addr - (uintptr_t)el->start + el->iov[id].addr), el->iov[id].key);
2929
return (void *)(addr - (uintptr_t)el->start + el->iov[id].addr);
3030
}
3131

@@ -38,7 +38,7 @@ void *rofi_get_local_addr_from_remote_addr_internal(void *addr, unsigned int id)
3838
return NULL;
3939
}
4040

41-
DEBUG_MSG("\t Found MR [0x%lx - 0x%lx] Addr: %p Key: 0x%lx", el->start, el->start + el->size, (void *)(addr - el->iov[id].addr + (uintptr_t)el->start), el->mr_key);
41+
DEBUG_MSG("\t Found MR [0x%lx - 0x%lx] Addr: %p Key: 0x%lx", el->start, el->start + el->size, (void *)(addr - el->iov[id].addr + (uintptr_t)el->start), el->iov[id].key);
4242
return (void *)(addr - el->iov[id].addr + (uintptr_t)el->start);
4343
}
4444

@@ -206,10 +206,34 @@ int rofi_send_internal(unsigned long id, void *buf, size_t size, unsigned long f
206206
int rofi_recv_internal(unsigned long id, void *buf, size_t size, unsigned long flags) {
207207
}
208208

209-
int rofi_init_internal(char *prov) {
210-
if (!prov) {
211-
ERR_MSG("ROFI provider not specified. Currently ROFI only supports \"verbs\".");
209+
rofi_names_t *rofi_parse_names_internal(char *names_list) {
210+
char token = ';';
211+
int name_cnt = 0;
212+
for (int i = 0; i < strlen(names_list); i++) {
213+
if (names_list[i] == token) {
214+
name_cnt++;
215+
}
216+
}
217+
name_cnt += 1;
218+
char **name_strs = (char **)calloc(name_cnt, sizeof(char *));
219+
220+
int p = 0;
221+
int i = 0;
222+
for (int k = 0; k < strlen(names_list); k++) {
223+
if (names_list[k] == token) {
224+
name_strs[p] = strndup(&names_list[i], k - i);
225+
p++;
226+
i = k + 1;
227+
}
212228
}
229+
name_strs[p] = strndup(&names_list[i], strlen(names_list) - i);
230+
rofi_names_t *names = (rofi_names_t *)calloc(1, sizeof(rofi_names_t));
231+
names->num = name_cnt;
232+
names->names = name_strs;
233+
return names;
234+
}
235+
236+
int rofi_init_internal(char *provs, char *domains) {
213237
pthread_rwlock_init(&rofi.mr_lock, NULL);
214238
pthread_mutex_init(&rofi.lock, NULL);
215239
int ret = 0;
@@ -224,12 +248,7 @@ int rofi_init_internal(char *prov) {
224248
rofi.desc.nodes = rt_get_size();
225249
rofi.desc.nid = rt_get_rank();
226250

227-
rofi.info = (struct fi_info *)calloc(1, sizeof(struct fi_info));
228-
if (!rofi.info) {
229-
ERR_MSG("Error allocating memory for rofi. Aborting.");
230-
ret = EXIT_FAILURE;
231-
goto err;
232-
}
251+
rofi.info = NULL;
233252

234253
DEBUG_MSG("Initializing process %d/%d...", rofi.desc.nid, rofi.desc.nodes);
235254

@@ -238,7 +257,7 @@ int rofi_init_internal(char *prov) {
238257
return EXIT_FAILURE;
239258
}
240259

241-
hints->caps = FI_RMA | FI_ATOMIC | FI_COLLECTIVE; // eventually want FI_ATOMIC
260+
hints->caps = FI_RMA | FI_ATOMIC | FI_COLLECTIVE;
242261
hints->addr_format = FI_FORMAT_UNSPEC;
243262
hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
244263
hints->domain_attr->threading = FI_THREAD_DOMAIN;
@@ -249,9 +268,21 @@ int rofi_init_internal(char *prov) {
249268
hints->ep_attr->type = FI_EP_RDM;
250269
hints->tx_attr->op_flags = FI_DELIVERY_COMPLETE; // maybe need to change this to FI_INJECT_COMPLETE or FI_TRANSMIT_COMPLETE
251270

252-
if (prov) {
253-
hints->fabric_attr->prov_name = strdup(prov);
271+
rofi_names_t *prov_names = NULL;
272+
if (provs) {
273+
prov_names = rofi_parse_names_internal(provs);
254274
}
275+
// else {
276+
// names = rofi_parse_names_internal("verbs");
277+
// }
278+
279+
rofi_names_t *domain_names = NULL;
280+
if (domains) {
281+
domain_names = rofi_parse_names_internal(domains);
282+
}
283+
// else {
284+
// rofi.domains = rofi_parse_names_internal("ib");
285+
// }
255286

256287
// this isn't really needed for verbs since it is a connected endpoint
257288
rofi.remote_addrs = (fi_addr_t *)malloc(rofi.desc.nodes * sizeof(fi_addr_t));
@@ -264,11 +295,27 @@ int rofi_init_internal(char *prov) {
264295
rofi.remote_addrs[i] = i;
265296
}
266297

267-
rofi_transport_init(hints, &rofi);
298+
rofi_transport_init(hints, &rofi, prov_names, domain_names);
299+
300+
if (prov_names) {
301+
for (int i = 0; i < prov_names->num; i++) {
302+
free(prov_names->names[i]);
303+
}
304+
free(prov_names);
305+
}
306+
307+
if (domain_names) {
308+
for (int i = 0; i < domain_names->num; i++) {
309+
free(domain_names->names[i]);
310+
}
311+
free(domain_names);
312+
}
268313

269314
mr_init();
270315
uint64_t global_barrier_size = rofi.desc.nodes * sizeof(uint64_t);
271-
int rofi_mr_size = global_barrier_size;
316+
uint64_t sub_alloc_barrier_size = rofi.desc.nodes * sizeof(uint64_t);
317+
uint64_t sub_alloc_size = rofi.desc.nodes * sizeof(struct fi_rma_iov);
318+
int rofi_mr_size = global_barrier_size + sub_alloc_barrier_size + sub_alloc_size;
272319

273320
rofi.mr = mr_add(&rofi, rofi_mr_size, 0);
274321
if (!rofi.mr) {
@@ -284,9 +331,14 @@ int rofi_init_internal(char *prov) {
284331

285332
rofi.global_barrier_id = 0;
286333
rofi.global_barrier_buf = (uint64_t *)rofi.mr->start;
334+
rofi.sub_alloc_barrier_buf = (uint64_t *)(rofi.mr->start + global_barrier_size);
335+
rofi.sub_alloc_buf = (struct fi_rma_iov *)(rofi.mr->start + global_barrier_size + sub_alloc_barrier_size);
287336

288337
for (int i = 0; i < rofi.desc.nid; i++) {
289338
rofi.global_barrier_buf[i] = 0;
339+
rofi.sub_alloc_barrier_buf[i] = 0;
340+
rofi.sub_alloc_buf[i].key = 0;
341+
rofi.sub_alloc_buf[i].addr = 0;
290342
}
291343
fi_freeinfo(hints);
292344
return 0;

src/include/rofi.h

+15-15
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,21 @@
55

66
#define ROFI_ERR_ALLOC 0x01
77

8-
int rofi_init(char*);
9-
int rofi_finit(void);
8+
int rofi_init(char *, char *);
9+
int rofi_finit(void);
1010
unsigned int rofi_get_size(void);
1111
unsigned int rofi_get_id(void);
12-
int rofi_flush(void);
13-
int rofi_put(void*, void*, size_t, unsigned int, unsigned long);
14-
int rofi_iput(void*, void*, size_t, unsigned int, unsigned long);
15-
int rofi_get(void*, void*, size_t, unsigned int, unsigned long);
16-
int rofi_iget(void*, void*, size_t, unsigned int, unsigned long);
17-
int rofi_alloc(size_t, unsigned long, void**);
18-
int rofi_sub_alloc(size_t, unsigned long, void**,uint64_t*, uint64_t);
19-
int rofi_release(void*);
20-
int rofi_sub_release(void*,uint64_t*, uint64_t);
21-
void rofi_barrier(void);
22-
int rofi_wait(void);
23-
void* rofi_get_remote_addr(void*, unsigned int);
24-
void* rofi_get_local_addr_from_remote_addr(void*, unsigned int);
12+
int rofi_flush(void);
13+
int rofi_put(void *, void *, size_t, unsigned int, unsigned long);
14+
int rofi_iput(void *, void *, size_t, unsigned int, unsigned long);
15+
int rofi_get(void *, void *, size_t, unsigned int, unsigned long);
16+
int rofi_iget(void *, void *, size_t, unsigned int, unsigned long);
17+
int rofi_alloc(size_t, unsigned long, void **);
18+
int rofi_sub_alloc(size_t, unsigned long, void **, uint64_t *, uint64_t);
19+
int rofi_release(void *);
20+
int rofi_sub_release(void *, uint64_t *, uint64_t);
21+
void rofi_barrier(void);
22+
int rofi_wait(void);
23+
void *rofi_get_remote_addr(void *, unsigned int);
24+
void *rofi_get_local_addr_from_remote_addr(void *, unsigned int);
2525
#endif

src/include/rofi_debug.h

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
#ifndef ROFI_DEBUG_H
22
#define ROFI_DEBUG_H
33

4+
#include <sys/syscall.h>
45
#include <sys/types.h>
56
#include <unistd.h>
6-
#include <sys/syscall.h>
77

88
#ifdef _DEBUG
9-
#define DEBUG_MSG(fmt, ...) \
10-
do { fprintf(stderr, "[%d][%d][DEBUG][%s][%s:%d] " fmt "\n", rt_get_rank(),syscall(__NR_gettid),__func__, __FILE__, \
11-
__LINE__, ##__VA_ARGS__); } while (0)
9+
#define DEBUG_MSG(fmt, ...) \
10+
do { \
11+
fprintf(stderr, "[ROFI DEBUG][PE: %d][TID: %d][%s][%s:%d] " fmt "\n", rt_get_rank(), syscall(__NR_gettid), __func__, __FILE__, \
12+
__LINE__, ##__VA_ARGS__); \
13+
} while (0)
1214
#else
1315
#define DEBUG_MSG(fmt, args...)
1416
#endif
1517

16-
#define ERR_MSG(fmt, args...) fprintf(stderr,"[PE %d, TID: %d][ROFI ERR][%s][%s:%d] " fmt "\n", rt_get_rank(),syscall(__NR_gettid),__func__, __FILE__, __LINE__, ##args)
18+
#define WARN_MSG(fmt, args...) fprintf(stderr, "[ROFI WARNING][PE: %d][TID: %d][%s][%s:%d] " fmt "\n", rt_get_rank(), syscall(__NR_gettid), __func__, __FILE__, __LINE__, ##args)
19+
20+
#define ERR_MSG(fmt, args...) fprintf(stderr, "[ROFI ERROR][PE: %d][TID: %d][%s][%s:%d] " fmt "\n", rt_get_rank(), syscall(__NR_gettid), __func__, __FILE__, __LINE__, ##args)
1721

1822
#endif

src/include/rofi_internal.h

+11-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
#define ROFI_FI_VERSION FI_VERSION(1, 20)
1616
#endif
1717

18-
typedef struct rofi_transport_s rofi_transport_t;
18+
typedef struct rofi_transport_t rofi_transport_t;
1919

2020
#include "context.h"
2121
#include "mr.h"
@@ -54,7 +54,12 @@ typedef struct {
5454
uint64_t inject_size;
5555
} rofi_desc_t;
5656

57-
struct rofi_transport_s {
57+
typedef struct rofi_prov_names_t {
58+
char **names;
59+
int num;
60+
} rofi_names_t;
61+
62+
struct rofi_transport_t {
5863
struct fi_info *info;
5964
struct fid_fabric *fabric;
6065
struct fid_domain *domain;
@@ -71,13 +76,16 @@ struct rofi_transport_s {
7176
rofi_mr_desc *mr;
7277
uint64_t global_barrier_id;
7378
uint64_t *global_barrier_buf;
79+
uint64_t *sub_alloc_barrier_buf;
80+
struct fi_rma_iov *sub_alloc_buf;
7481
pthread_mutex_t lock;
7582
pthread_rwlock_t mr_lock;
83+
uint64_t fi_collective;
7684
};
7785

7886
extern rofi_transport_t rofi;
7987

80-
int rofi_init_internal(char *);
88+
int rofi_init_internal(char *, char *);
8189
int rofi_finit_internal(void);
8290
unsigned int rofi_get_size_internal(void);
8391
unsigned int rofi_get_id_internal(void);

src/include/transport.h

+16-16
Original file line numberDiff line numberDiff line change
@@ -31,39 +31,40 @@
3131
#define _TRANSPORT_H_
3232

3333
#if HAVE_CONFIG_H
34-
# include <config.h>
34+
#include <config.h>
3535
#endif /* HAVE_CONFIG_H */
3636

37-
#include <stdlib.h>
3837
#include <inttypes.h>
3938
#include <netinet/tcp.h>
40-
#include <sys/uio.h>
4139
#include <stdbool.h>
40+
#include <stdlib.h>
41+
#include <sys/uio.h>
4242

4343
#include <rdma/fabric.h>
44-
#include <rdma/fi_rma.h>
4544
#include <rdma/fi_domain.h>
45+
#include <rdma/fi_rma.h>
4646

47-
#define ROFI_TRANSPORT_ERR_MSG(call,retv) \
48-
do { fprintf(stderr, "[PE %d][ROFI TRANSPORT ERR][%s:%d] " call " failed: %s (%d)\n", \
49-
rt_get_rank(),__FILE__,__LINE__, fi_strerror(retv),(int)(retv)); } while(0)
47+
#define ROFI_TRANSPORT_ERR_MSG(call, retv) \
48+
do { \
49+
fprintf(stderr, "[PE %d][ROFI TRANSPORT ERR][%s:%d] " call " failed: %s (%d)\n", \
50+
rt_get_rank(), __FILE__, __LINE__, fi_strerror(retv), (int)(retv)); \
51+
} while (0)
5052

51-
#define MIN(a,b) \
52-
({ __typeof__ (a) _a = (a); \
53+
#define MIN(a, b) \
54+
({ __typeof__ (a) _a = (a); \
5355
__typeof__ (b) _b = (b); \
5456
_a < _b ? _a : _b; })
5557

5658
int rofi_transport_fini(rofi_transport_t *rofi);
57-
int rofi_transport_init(struct fi_info *hints, rofi_transport_t *rofi);
58-
int rofi_transport_init_fabric_resources( rofi_transport_t *rofi);
59-
int rofi_transport_init_endpoint_resources( rofi_transport_t *rofi);
59+
int rofi_transport_init(struct fi_info *hints, rofi_transport_t *rofi, rofi_names_t *prov_names, rofi_names_t *domain_names);
60+
int rofi_transport_init_fabric_resources(rofi_transport_t *rofi);
61+
int rofi_transport_init_endpoint_resources(rofi_transport_t *rofi);
6062
int rofi_transport_init_av(rofi_transport_t *rofi);
6163

62-
6364
int rofi_transport_progress(rofi_transport_t *rofi);
6465
int rofi_transport_ctx_check_err(rofi_transport_t *rofi, int err);
6566
int rofi_transport_check_rma_err(rofi_transport_t *rofi, int ret);
66-
int rofi_transport_wait_on_cntr(rofi_transport_t *rofi, uint64_t *pending_cntr, struct fid_cntr *cntr);
67+
int rofi_transport_wait_on_cntr(rofi_transport_t *rofi, uint64_t *pending_cntr, struct fid_cntr *cntr);
6768

6869
int rofi_transport_put_inject(rofi_transport_t *rofi, struct fi_rma_iov *rma_iov, uint64_t pe, const void *src_addr, size_t len);
6970
int rofi_transport_put_large(rofi_transport_t *rofi, struct fi_rma_iov *rma_iov, uint64_t pe, const void *src_addr, size_t len, void *desc, void *context);
@@ -75,10 +76,9 @@ int rofi_transport_get_large(rofi_transport_t *rofi, struct fi_rma_iov *rma_iov,
7576
int rofi_transport_get(rofi_transport_t *rofi, struct fi_rma_iov *rma_iov, uint64_t pe, void *dst_addr, size_t len, void *desc, void *context);
7677
int rofi_transport_get_wait_all(rofi_transport_t *rofi);
7778

78-
7979
int rofi_transport_exchange_mr_info(rofi_transport_t *rofi, rofi_mr_desc *mr);
8080
int rofi_transport_sub_exchange_mr_info(rofi_transport_t *rofi, rofi_mr_desc *mr, uint64_t *pes, uint64_t num_pes);
81+
int rofi_transport_inner_barrier(rofi_transport_t *rofi, uint64_t *barrier_id, uint64_t *barrier_buf, uint64_t *pes, uint64_t me, uint64_t num_pes);
8182
int rofi_transport_barrier(rofi_transport_t *rofi);
8283

83-
8484
#endif /* _TRANSPORT_H_ */

0 commit comments

Comments
 (0)