|
| 1 | +import torch |
| 2 | +import torch.nn.functional as F |
| 3 | +from torch.autograd import Function |
| 4 | +import torch.distributed as dist |
| 5 | + |
| 6 | +from einops import rearrange |
| 7 | + |
| 8 | +# helpers |
| 9 | + |
| 10 | +def exists(val): |
| 11 | + return val is not None |
| 12 | + |
| 13 | +def pad_dim_to(t, length, dim = 0): |
| 14 | + pad_length = length - t.shape[dim] |
| 15 | + zero_pairs = (-dim - 1) if dim < 0 else (t.ndim - dim - 1) |
| 16 | + return F.pad(t, (*((0, 0) * zero_pairs), 0, pad_length)) |
| 17 | + |
| 18 | +# distributed helpers |
| 19 | + |
| 20 | +def all_gather_variable_dim(t, dim = 0, sizes = None): |
| 21 | + device, world_size = t.device, dist.get_world_size() |
| 22 | + |
| 23 | + if not exists(sizes): |
| 24 | + size = torch.tensor(t.shape[dim], device = device, dtype = torch.long) |
| 25 | + sizes = [torch.empty_like(size, device = device, dtype = torch.long) for i in range(world_size)] |
| 26 | + dist.all_gather(sizes, size) |
| 27 | + sizes = torch.stack(sizes) |
| 28 | + |
| 29 | + max_size = sizes.amax().item() |
| 30 | + padded_t = pad_dim_to(t, max_size, dim = dim) |
| 31 | + |
| 32 | + gathered_tensors = [torch.empty(padded_t.shape, device = device, dtype = padded_t.dtype) for i in range(world_size)] |
| 33 | + dist.all_gather(gathered_tensors, padded_t) |
| 34 | + |
| 35 | + gathered_tensor = torch.cat(gathered_tensors, dim = dim) |
| 36 | + seq = torch.arange(max_size, device = device) |
| 37 | + |
| 38 | + mask = rearrange(seq, 'j -> 1 j') < rearrange(sizes, 'i -> i 1') |
| 39 | + mask = rearrange(mask, 'i j -> (i j)') |
| 40 | + seq = torch.arange(mask.shape[-1], device = device) |
| 41 | + indices = seq[mask] |
| 42 | + |
| 43 | + gathered_tensor = gathered_tensor.index_select(dim, indices) |
| 44 | + |
| 45 | + return gathered_tensor, sizes |
| 46 | + |
| 47 | +class AllGather(Function): |
| 48 | + @staticmethod |
| 49 | + def forward(ctx, x, dim, sizes): |
| 50 | + is_dist = dist.is_initialized() and dist.get_world_size() > 1 |
| 51 | + ctx.is_dist = is_dist |
| 52 | + |
| 53 | + if not is_dist: |
| 54 | + return x, None |
| 55 | + |
| 56 | + x, batch_sizes = all_gather_variable_dim(x, dim = dim, sizes = sizes) |
| 57 | + ctx.batch_sizes = batch_sizes.tolist() |
| 58 | + ctx.dim = dim |
| 59 | + return x, batch_sizes |
| 60 | + |
| 61 | + @staticmethod |
| 62 | + def backward(ctx, grads, _): |
| 63 | + if not ctx.is_dist: |
| 64 | + return grads, None, None |
| 65 | + |
| 66 | + batch_sizes, rank = ctx.batch_sizes, dist.get_rank() |
| 67 | + grads_by_rank = grads.split(batch_sizes, dim = ctx.dim) |
| 68 | + return grads_by_rank[rank], None, None |
| 69 | + |
| 70 | +all_gather = AllGather.apply |
0 commit comments