
Commit 9688e06

Commit message: wip
1 parent 832b3ad commit 9688e06

File tree: 5 files changed, +182 −21 lines


python/mscclpp/language/internal/operations.py

Lines changed: 5 additions & 4 deletions
@@ -330,7 +330,8 @@ def to_json(self):
         result["dst_buff"] = []
         for chunk in self.dst_buff:
             result["dst_buff"].append(chunk.to_json())
-        result["channel_ids"] = self.channel_ids
+        if self.channel_ids == ChannelType.port:
+            result["channel_ids"] = self.channel_ids
         result["channel_type"] = self.channel_type.value
         return result

@@ -437,7 +438,7 @@ def __add__(self, other):
             remote_dst_buff=self.remote_dst_buff + other.dst_buff,
             channel_ids=self.channel_ids,
             put_channel_ids=self.put_channel_ids + other.channel_ids,
-            channel_type=self.channel_type,
+            channel_type=other.channel_type,
             reduce_operation=self.reduce_operation,
             packet=self.packet,
         )

@@ -460,10 +461,10 @@ def to_json(self):
         for chunk in self.remote_dst_buff:
             result["dst_buff"].append(chunk.to_json())

-        if len(self.channel_ids) > 0:
+        """ if len(self.channel_ids) > 0:
             result["channel_ids"] = self.channel_ids
         if len(self.put_channel_ids) > 0:
-            result["output_channel_ids"] = self.put_channel_ids
+            result["output_channel_ids"] = self.put_channel_ids """
         if self.channel_type != ChannelType.none:
             result["channel_type"] = self.channel_type.value
         result["reduce_op"] = self.reduce_operation.value

python/mscclpp/language/tests/allgather.py

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ def allgather_example(name, num_threads_per_block, min_message_size, max_message_size):
                 # Skip sending from a rank to itself
                 if src_rank != dst_rank:
                     # Define a channel from src_rank → dst_rank using memory channel
-                    ch = Channel(dst_rank, src_rank, ChannelType.memory)
+                    ch = Channel(dst_rank, src_rank)
                     # Step 1: source signals to indicate it is ready to receive data
                     ch.signal(tb=0, sync=None, relaxed=True)
                     # Step 2: wait for the destination rank to be ready
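
The only change here drops the explicit channel type. A one-line sketch, assuming Channel falls back to a memory channel when the type argument is omitted (the default is not shown in this diff):

# Assumed equivalent under a memory-channel default:
ch = Channel(dst_rank, src_rank, ChannelType.memory)  # before
ch = Channel(dst_rank, src_rank)                      # after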
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import argparse
+from mscclpp.language.channel import *
+from mscclpp.language.rank import *
+from mscclpp.language.general import *
+from mscclpp.language.program import *
+from mscclpp.language.collectives import *
+
+
+def allgather_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
+    chunksperloop = 1
+    collective = AllGather(gpu_size, chunksperloop, True)
+    with MSCCLPPProgram(
+        name,
+        collective,
+        gpu_size,
+        protocol="LL",
+        num_threads_per_block=num_threads_per_block,
+        use_double_scratch_buffer=True,
+        min_message_size=min_message_size,
+        max_message_size=max_message_size,
+    ):
+        # Creating Scratch Buffers
+        scratch_buffer = []
+        for gpu in range(gpu_size):
+            scratch_buffer.append(Buffer(gpu, gpu_size))
+
+        # Putting packet in the remote scratch buffer
+        for gpu in range(gpu_size):
+            rank = Rank(gpu)
+            output_buffer = rank.get_output_buffer()
+            for peer in range(1, gpu_size):
+                dst_rank = (gpu + peer) % gpu_size
+                ch = Channel(dst_rank, gpu)
+                tb = peer - 1
+                ch.put_packet(scratch_buffer[dst_rank][gpu : gpu + 1], output_buffer[gpu : gpu + 1], tb)
+
+        # Copying packet from local scratch buffer to local buffer
+        for gpu in range(gpu_size):
+            rank = Rank(gpu)
+            output_buffer = rank.get_output_buffer()
+            for peer in range(1, gpu_size):
+                dst_rank = (gpu + peer) % gpu_size
+                rank.copy(output_buffer[dst_rank: dst_rank + 1], scratch_buffer[gpu][dst_rank: dst_rank + 1], tb=gpu_size + peer - 2, from_packet=True)
+
+        print(JSON())
+
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--name", type=str, help="name of the program")
+parser.add_argument("--num_gpus", type=int, help="number of gpus")
+parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
+parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
+parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
+
+args = parser.parse_args()
+
+allgather_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)
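
The script prints the generated program JSON to stdout, so a typical invocation redirects it to a file. The script path below is illustrative; the capture does not show the new file's name:

python3 allgather_packet_example.py --name allgather_packet --num_gpus 8 --num_threads_per_block 1024 > allgather_packet.json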

python/mscclpp/language/tests/allreduce.py

Lines changed: 30 additions & 16 deletions
@@ -10,7 +10,7 @@


 def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
-    chunksperloop = 1
+    chunksperloop = gpu_size
     collective = AllReduce(gpu_size, chunksperloop, True)
     with MSCCLPPProgram(
         name,

@@ -25,7 +25,7 @@ def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
         # Creating Scratch Buffers
         scratch_buffer = []
         for gpu in range(gpu_size):
-            scratch_buffer.append(Buffer(gpu, 2 * gpu_size))
+            scratch_buffer.append(Buffer(gpu, 2 * (gpu_size - 1) * gpu_size))

         # Creating Channels
         channels = {}

@@ -40,33 +40,47 @@ def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
             input_buffer = rank.get_input_buffer()
             for peer in range(gpu_size):
                 if peer != gpu:
+                    scratch_index = gpu if gpu < peer else gpu - 1
+                    scratch_index *= gpu_size
+                    input_index = peer * gpu_size
+                    tb = peer if peer < gpu else peer - 1
                     channels[(peer, gpu)].put_packet(
-                        scratch_buffer[peer][gpu : gpu + 1], input_buffer[peer : peer + 1], 0
+                        scratch_buffer[peer][scratch_index : scratch_index + gpu_size], input_buffer[input_index : input_index + gpu_size], tb
                     )

         # Each rank performs a local reduction on the nth chunk
         for gpu in range(gpu_size):
-            chunks = []
-            for peer in range(gpu_size):
-                if peer != gpu:
-                    chunks.append(scratch_buffer[gpu][peer : peer + 1])
-            rank = Rank(gpu)
-            input_buffer = rank.get_input_buffer()
-            rank.reduce(input_buffer[gpu : gpu + 1], chunks, 0, packet=True)
-            for peer in range(gpu_size):
-                if peer != gpu:
-                    channels[(peer, gpu)].put_packet(
-                        scratch_buffer[peer][gpu_size + gpu : gpu_size + gpu + 1], input_buffer[gpu : gpu + 1], 0
-                    )
+            for index in range(gpu_size):
+                chunks = []
+                for peer in range(gpu_size):
+                    if peer != gpu:
+                        scratch_index = peer if peer < gpu else peer - 1
+                        scratch_index = scratch_index * gpu_size + index
+                        chunks.append(scratch_buffer[gpu][scratch_index : scratch_index + 1])
+                rank = Rank(gpu)
+                input_buffer = rank.get_input_buffer()
+                input_index = gpu * gpu_size + index
+                rank.reduce(input_buffer[input_index : input_index + 1], chunks, index, packet=True)
+
+                for peer in range(gpu_size):
+                    if peer != gpu:
+                        scratch_index = gpu if gpu < peer else gpu - 1
+                        scratch_index = gpu_size * (gpu_size - 1) + scratch_index * gpu_size + index
+                        channels[(peer, gpu)].put_packet(
+                            scratch_buffer[peer][scratch_index : scratch_index + 1], input_buffer[input_index : input_index + 1], index
+                        )

         # Each rank get final result from scratch space
         for gpu in range(gpu_size):
             rank = Rank(gpu)
             input_buffer = rank.get_input_buffer()
             for peer in range(gpu_size):
                 if peer != gpu:
+                    input_index = peer * gpu_size
+                    scratch_index = peer if peer < gpu else peer - 1
+                    scratch_index = gpu_size * (gpu_size - 1) + scratch_index * gpu_size
                     rank.copy(
-                        input_buffer[peer : peer + 1], scratch_buffer[gpu][gpu_size + peer : gpu_size + peer + 1], 0
+                        input_buffer[input_index : input_index + gpu_size], scratch_buffer[gpu][scratch_index : scratch_index + gpu_size], peer, from_packet=True
                     )

         print(JSON())
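
The new index arithmetic is easier to verify with concrete numbers. Below is a self-contained sketch (plain Python, no mscclpp imports) that replays the formulas from the hunk above for gpu_size = 4 and asserts that data chunks land in the first (gpu_size - 1) * gpu_size slots of each scratch buffer and reduced results in the second half:

# Scratch buffer now holds 2 * (gpu_size - 1) * gpu_size chunks per rank.
gpu_size = 4
data_region = (gpu_size - 1) * gpu_size  # first half: incoming data

# Phase 1: where rank `gpu` writes its gpu_size-chunk slice into peer's scratch.
for gpu in range(gpu_size):
    for peer in range(gpu_size):
        if peer != gpu:
            scratch_index = (gpu if gpu < peer else gpu - 1) * gpu_size
            input_index = peer * gpu_size          # slice of gpu's input sent to peer
            tb = peer if peer < gpu else peer - 1  # one thread block per peer
            assert 0 <= scratch_index < data_region
            assert 0 <= tb < gpu_size - 1

# Phase 2: reduced results go into the second half of peer's scratch buffer.
for index in range(gpu_size):
    for gpu in range(gpu_size):
        for peer in range(gpu_size):
            if peer != gpu:
                scratch_index = gpu if gpu < peer else gpu - 1
                scratch_index = data_region + scratch_index * gpu_size + index
                assert data_region <= scratch_index < 2 * data_region

print("index layout checks out for gpu_size =", gpu_size)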
Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import argparse
+from mscclpp.language.channel import *
+from mscclpp.language.rank import *
+from mscclpp.language.general import *
+from mscclpp.language.program import *
+from mscclpp.language.collectives import *
+
+
+def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
+    chunksperloop = 1
+    collective = AllReduce(gpu_size, chunksperloop, True)
+    with MSCCLPPProgram(
+        name,
+        collective,
+        gpu_size,
+        protocol="LL",
+        num_threads_per_block=num_threads_per_block,
+        use_double_scratch_buffer=False,
+        min_message_size=min_message_size,
+        max_message_size=max_message_size,
+    ):
+        # Creating Scratch Buffers
+        scratch_buffer = []
+        for gpu in range(gpu_size):
+            scratch_buffer.append(Buffer(gpu, 2 * gpu_size))
+
+        # Creating Channels
+        channels = {}
+        for gpu in range(gpu_size):
+            for peer in range(gpu_size):
+                if peer != gpu:
+                    channels[(peer, gpu)] = Channel(peer, gpu)
+
+        # Each rank sends the nth chunk to the nth rank into scratch space
+        for gpu in range(gpu_size):
+            rank = Rank(gpu)
+            input_buffer = rank.get_input_buffer()
+            for peer in range(gpu_size):
+                if peer != gpu:
+                    channels[(peer, gpu)].put_packet(
+                        scratch_buffer[peer][gpu : gpu + 1], input_buffer[peer : peer + 1], 0
+                    )
+
+        # Each rank performs a local reduction on the nth chunk
+        for gpu in range(gpu_size):
+            chunks = []
+            for peer in range(gpu_size):
+                if peer != gpu:
+                    chunks.append(scratch_buffer[gpu][peer : peer + 1])
+            rank = Rank(gpu)
+            input_buffer = rank.get_input_buffer()
+            rank.reduce(input_buffer[gpu : gpu + 1], chunks, 0, packet=True)
+            for peer in range(gpu_size):
+                if peer != gpu:
+                    channels[(peer, gpu)].put_packet(
+                        scratch_buffer[peer][gpu_size + gpu : gpu_size + gpu + 1], input_buffer[gpu : gpu + 1], 0
+                    )
+
+        # Each rank get final result from scratch space
+        for gpu in range(gpu_size):
+            rank = Rank(gpu)
+            input_buffer = rank.get_input_buffer()
+            for peer in range(gpu_size):
+                if peer != gpu:
+                    rank.copy(
+                        input_buffer[peer : peer + 1], scratch_buffer[gpu][gpu_size + peer : gpu_size + peer + 1], 0, from_packet=True
+                    )
+
+        print(JSON())
+
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--name", type=str, help="name of the program")
+parser.add_argument("--num_gpus", type=int, help="number of gpus")
+parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
+parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
+parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
+
+args = parser.parse_args()
+
+allreduce_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)
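
As with the other example scripts, this one writes the program JSON to stdout. An illustrative invocation (the new file's path is not shown in the capture):

python3 allreduce_packet_example.py --name allreduce_packet --num_gpus 8 > allreduce_packet.json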
